Note that there are some explanatory texts on larger screens.

plurals
  1. POMulti-threading a stage in the pipeline pattern with ExecutorService
    primarykey
    data
    text
    <p>I have a multi-stage pipeline. Each stage runs in a separate thread with communication happening using bounded BlockingArrayQueues. I'm trying to multi-thread the slowest stage to improve throughput.</p> <p>Question: What is the recommended implementation for this? Is there a library that would make this implementation simpler &amp; easier to read?</p> <p>Input Queue -> Stage 1 (4 threads) -> Bounded Queue -> Stage 2</p> <p><strong>Requirements</strong>:</p> <ul> <li><p>Work units on the input queue are independent. </p></li> <li><p>Work units are strictly ordered -- output should be in the same order as input. </p></li> <li><p>Stage 1 must be throttled -- it must stop if the output exceeds a certain size. </p></li> <li><p>Exceptions in Stage 1 should result in a "poison pill" on the output queue and terminate the ExecutorService. Queued tasks should be discarded best effort.</p></li> </ul> <p>** My proposed implementation:**</p> <p>I'm thinking of using a ThreadPoolExecutor with a limited # of threads.</p> <p>Strict ordering will be enforced with a CountDown latch on each work unit. A thread can only push the result if the previous work unit's latch is 0 and there's space on the queue. This also takes care of throttling since the thread will block until there's room on the output queue.</p> <pre><code>class WorkUnit { CountDownLatch previousLatch; CountDownLatch myLatch; } class MyRunnable extends Runnable { public void run() { //do work... previousLatch.await(); ouputQueue.put( result ); myLatch.countDown(); } } </code></pre> <p>Exception handling is where I'm a little stumped. I'm thinking of overriding ThreadPoolExecutor.afterExecute() which will call shutdownNow() if there's an exception.</p> <pre><code>class MyThreadPoolExecutor extends ThreadPoolExecutor { protected void afterExecute(Runnable r, Throwable t) { if(t != null) { //record exection, log, alert, etc ouput.put(POISON_PILL); shutdownNow(); } } } </code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload