Scala Actors instead of Java Futures
<p><strong>Problem</strong>: I need to write an application to process several hundred files, each of which takes several hundred megabytes of memory and several seconds to process. I wrote it using <code>Future[Report]</code> objects created using an <code>Executors.newFixedThreadPool()</code>, but got out-of-memory errors because the <code>List[Future[Report]]</code> object returned by <code>ExecutorService.invokeAll()</code> was holding on to the intermediate memory used by each process. I solved the problem by returning the <code>Report</code> objects from local methods in the processors after calculating the <code>Report</code> values (only a few hundred lines per <code>Report</code>) instead of doing the calculations in the <code>call</code> method (from interface <code>Callable</code>).</p> <p>I would like to try solving this using Scala Actors instead. I created a class that takes a sequence of jobs (parameterized types for the jobs, results, and processing function) and processes each in one of a configurable number of <code>Worker</code> instances (subclass of <code>Actor</code>). 
The code follows.</p>
<p><strong>Issues</strong>:</p>
<ul>
<li><p>I'm not sure that my processing is correct.</p></li>
<li><p>I don't like using the <code>CountDownLatch</code> to delay returning a result from the dispatcher.</p></li>
<li><p>I would prefer to write a more "functional" version of the dispatcher that does not modify the <code>jobsQueue</code> list or <code>workers</code> hashmap, perhaps borrowing the tail-recursive <code>loop</code> structure from Clojure (I've used a <code>@tailrec def loop</code> method in other Scala code).</p></li>
</ul>
<p>I am anxiously awaiting the publication of <a href="http://www.artima.com/shop/actors_in_scala" rel="nofollow">"Actors in Scala"</a> by Philipp Haller and Frank Sommers.</p>
<p>Here is the code:</p>
<pre><code>package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}

class MultiWorker[A, B](jobs: List[A], actorCount: Int)(process: (A) =&gt; B) {
  import MultiWorker._

  sealed abstract class Message
  // Dispatcher -&gt; Worker: Run this job and report results
  case class Process(job: A) extends Message
  // Worker -&gt; Dispatcher: Result of processing
  case class ReportResult(id: Int, result: B) extends Message
  // Worker -&gt; Dispatcher: I need work -- send me a job
  case class SendJob(id: Int) extends Message
  // Worker -&gt; Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message
  // Dispatcher -&gt; Worker: Stop working -- all jobs done
  case class StopWorking extends Message

  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger extends Actor {
    def act() {
      loop {
        react {
          case text: String =&gt; println(text)
          case StopWorking =&gt; exit()
        }
      }
    }
  }
  Logger.start()

  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int) extends Actor{
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)
      loop {
        react {
          case Process(job) =&gt;
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id, process(job))
            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory / megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime) / 1000000000.0) + " seconds using " +
              usedMemory + "MB out of total " + totalMemory + "MB"
            Logger ! message
            dispatcher ! SendJob(id)
          case StopWorking =&gt;
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }

  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the worker until the job queue
   * (jobs: TraversableOnce[A]) is empty. It then tells the workers to
   * stop working and returns the List[B] results to the caller.
   */
  val dispatcher = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id =&gt; (id, new Worker(id))).toMap
      workers.values.foreach(_.start())
      loop {
        react {
          case ReportResult(id, result) =&gt;
            res = result :: res
            if (jobQueue.isEmpty &amp;&amp; workers.isEmpty) {
              latch.countDown()
              exit()
            }
          case SendJob(id) =&gt;
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            }
          case Stopped(id) =&gt;
            workers = workers - id
        }
      }
    }
  }
  dispatcher.start()

  /**
   * Get the results of the processing -- wait for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}
</code></pre>
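<p>For comparison, the <code>Future</code>-based workaround described under <strong>Problem</strong> might look roughly like the sketch below. This is only an illustration under assumptions: the <code>Report</code> fields, <code>buildReport</code>, <code>runAll</code>, and the one-megabyte scratch buffer are hypothetical stand-ins, not the original application code. The point it tries to show is that the large intermediate data lives only inside a local method, so each <code>Callable</code> (and the <code>Future</code> wrapping it) references nothing bigger than the small <code>Report</code>.</p>

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for the Report type: small, a few hundred lines at most.
final case class Report(fileName: String, lines: Vector[String])

object FutureReports {
  // Hypothetical processor. The large buffer is local to this method, so it
  // becomes unreachable as soon as the method returns the small Report.
  def buildReport(fileName: String): Report = {
    val scratch = new Array[Byte](1024 * 1024) // stands in for hundreds of megabytes
    Report(fileName, Vector(fileName + ": " + scratch.length + " bytes scanned"))
  }

  def runAll(fileNames: List[String], threads: Int): List[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = new java.util.ArrayList[Callable[Report]]()
      fileNames.foreach { name =>
        // call() only delegates; it holds no intermediate state of its own.
        tasks.add(new Callable[Report] { def call(): Report = buildReport(name) })
      }
      // invokeAll blocks until every task has completed and returns the
      // futures in the same order as the submitted tasks.
      val futures = pool.invokeAll(tasks)
      val out = List.newBuilder[Report]
      val it = futures.iterator()
      while (it.hasNext) out += it.next().get()
      out.result()
    } finally {
      pool.shutdown()
    }
  }
}
```

<p>A call such as <code>FutureReports.runAll(List("a.dat", "b.dat"), 2)</code> would return one <code>Report</code> per file name, in input order.</p>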
 
