<blockquote>
<p>How can I work around this limitation in <code>ThreadPoolExecutor</code> where the queue needs to be bounded and full before more threads will be started?</p>
</blockquote>
<p>I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with <code>ThreadPoolExecutor</code>. It involves extending <code>LinkedBlockingQueue</code> so that <code>queue.offer(...)</code> returns <code>false</code> when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the <code>RejectedExecutionHandler</code> will be called. It is the handler which then does the <code>put(...)</code> into the queue.</p>
<p>It certainly is strange to write a queue where <code>offer(...)</code> can return <code>false</code> and <code>put()</code> never blocks, so that's the hack part. But this works well with TPE's usage of the queue, so I don't see any problem with doing this.</p>
<p>Here's the code:</p>
<pre><code>// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue&lt;Runnable&gt; queue = new LinkedBlockingQueue&lt;Runnable&gt;() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there are 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});
</code></pre>
<p>With this mechanism, when I submit tasks to the pool, the <code>ThreadPoolExecutor</code> will:</p>
<ol>
<li>Scale the number of threads up to the core size initially (here 1).</li>
<li>Offer the task to the queue. If the queue is empty, the task will be queued to be handled by the existing threads.</li>
<li>If the queue has 1 or more elements already, the <code>offer(...)</code> will return <code>false</code>.</li>
<li>If <code>false</code> is returned, scale up the number of threads in the pool until they reach the max number (here 50).</li>
<li>If at the max, call the <code>RejectedExecutionHandler</code>.</li>
<li>The <code>RejectedExecutionHandler</code> then puts the task into the queue to be processed by the first available thread in FIFO order.</li>
</ol>
<p>Although in my example code above, the queue is unbounded, you could also define it as a bounded queue.
For example, if you add a capacity of 1000 to the <code>LinkedBlockingQueue</code> then it will:</p>
<ol>
<li>scale the threads up to max</li>
<li>then queue up until it is full with 1000 tasks</li>
<li>then block the caller until space becomes available in the queue.</li>
</ol>
<p>In addition, if you really needed to use <code>offer(...)</code> in the <code>RejectedExecutionHandler</code> then you could use the <code>offer(E, long, TimeUnit)</code> method instead with <code>Long.MAX_VALUE</code> as the timeout.</p>
<p><strong>Edit:</strong></p>
<p>I've tweaked my <code>offer(...)</code> method override per @Ralf's feedback. This will only scale up the number of threads in the pool if they are not keeping up with the load.</p>
<p><strong>Edit:</strong></p>
<p>Another tweak to this answer could be to actually ask the TPE whether there are idle threads and only enqueue the item if there are. You would have to make a true class for this and add a <code>ourQueue.setThreadPoolExecutor(tpe);</code> method on it.</p>
<p>Then your <code>offer(...)</code> method might look something like:</p>
<ol>
<li>Check whether <code>tpe.getPoolSize() == tpe.getMaximumPoolSize()</code>, in which case just call <code>super.offer(...)</code>.</li>
<li>Else if <code>tpe.getPoolSize() &gt; tpe.getActiveCount()</code> then call <code>super.offer(...)</code> since there seem to be idle threads.</li>
<li>Otherwise return <code>false</code> to fork another thread.</li>
</ol>
<p>Maybe this:</p>
<pre><code>int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize &gt;= maximumPoolSize || poolSize &gt; tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}
</code></pre>
<p>Note that the get methods on TPE are expensive since they access <code>volatile</code> fields or (in the case of <code>getActiveCount()</code>) lock the TPE and walk the thread-list.
Also, there are race conditions here that may cause a task to be enqueued improperly or another thread to be forked when there was an idle thread.</p>
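<p>To make the last edit concrete, here is a minimal sketch of what that "true class" might look like. The names <code>ScalingQueueDemo</code>, <code>ScalingQueue</code>, <code>newPool</code>, <code>awaitAllBusy</code> and <code>runDemo</code> are hypothetical and chosen only for illustration; only <code>setThreadPoolExecutor</code> and the <code>offer(...)</code> logic come from the answer. The demo waits for every worker to become busy before submitting the next task, which sidesteps the race mentioned above so that the growth is easy to observe. Real callers would not do that, so treat it as a sketch under those assumptions.</p>

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScalingQueueDemo {

    /** Hypothetical queue class that asks the pool whether to enqueue or grow. */
    static class ScalingQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        // Set once the pool exists; volatile so all submitting threads see it.
        private transient volatile ThreadPoolExecutor tpe;

        void setThreadPoolExecutor(ThreadPoolExecutor tpe) {
            this.tpe = tpe;
        }

        @Override
        public boolean offer(Runnable e) {
            ThreadPoolExecutor pool = tpe;
            if (pool == null) {
                return super.offer(e); // not wired up yet: behave like a normal queue
            }
            int poolSize = pool.getPoolSize();
            // Enqueue if the pool cannot grow or if some thread looks idle.
            if (poolSize >= pool.getMaximumPoolSize() || poolSize > pool.getActiveCount()) {
                return super.offer(e);
            }
            return false; // makes the TPE try to start another thread
        }
    }

    /** Builds a pool wired to a ScalingQueue, with the put-on-reject handler. */
    static ThreadPoolExecutor newPool(int core, int max) {
        ScalingQueue queue = new ScalingQueue();
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue);
        queue.setThreadPoolExecutor(pool);
        pool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r); // covers offer() losing a race at max size
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        return pool;
    }

    /** Demo-only helper: spins (up to 5 s) until every pool thread is running a task. */
    static void awaitAllBusy(ThreadPoolExecutor pool) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getActiveCount() < pool.getPoolSize() && System.nanoTime() < deadline) {
            Thread.yield();
        }
    }

    /** Submits blocking tasks; returns {threads at peak, tasks queued at peak, tasks completed}. */
    static int[] runDemo(int core, int max, int tasks) {
        ThreadPoolExecutor pool = newPool(core, max);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                    completed.incrementAndGet();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            awaitAllBusy(pool);
        }
        int threads = pool.getPoolSize();
        int queued = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return new int[] {threads, queued, completed.get()};
    }

    public static void main(String[] args) {
        int[] r = runDemo(1, 3, 5);
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " completed=" + r[2]);
    }
}
```

<p>The handler is kept even though <code>offer(...)</code> already enqueues once the pool is at its maximum: with several submitting threads, the pool can reach the maximum between the size check and the attempt to start a thread, and the rejected task would otherwise be lost.</p>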