Thread Pool handling 'duplicate' tasks
    <p>I want to execute several different tasks in parallel, with the rule that a task which is already queued or currently processing will not be re-queued. I have read up a little on the Java API and have come up with the code below, which seems to work. Can anybody shed light on whether the method I am using is the best approach? Are there any dangers (thread safety?) or better ways to do this? The code is as follows:</p>
<pre><code>import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
    String key1;
    String key2;
    static HashMap&lt;TestExecution, Future&lt;?&gt;&gt; executions = new HashMap&lt;TestExecution, Future&lt;?&gt;&gt;();
    static LinkedBlockingQueue&lt;Runnable&gt; q = new LinkedBlockingQueue&lt;Runnable&gt;();
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

    public static void main(String[] args) {
        try {
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static boolean execute(TestExecution e) {
        System.out.println("Handling "+e.key1+":"+e.key2);
        if (executions.containsKey(e)) {
            Future&lt;?&gt; f = (Future&lt;?&gt;) executions.get(e);
            if (f.isDone()) {
                System.out.println("Previous execution has completed");
                executions.remove(e);
            } else {
                System.out.println("Previous execution still running");
                return false;
            }
        } else {
            System.out.println("No previous execution");
        }
        Future&lt;?&gt; f = tpe.submit(e);
        executions.put(e, f);
        return true;
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public boolean equals(Object obj) {
        if (obj instanceof TestExecution) {
            TestExecution t = (TestExecution) obj;
            return (key1.equals(t.key1) &amp;&amp; key2.equals(t.key2));
        }
        return false;
    }

    public int hashCode () {
        return key1.hashCode()+key2.hashCode();
    }

    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
</code></pre>
    <p>Follow up to comment below:<br> The plan is that task execution will be triggered by cron calling a RESTful web service. For example, below is the setup for one task scheduled every two minutes, plus another triggered at 9:30 every day.</p>
<pre><code>0/2 * * * * restclient.pl key11 key12
30 09 * * * restclient.pl key21 key22
</code></pre>
    <p>In this case, if task key11:key12 is running, or already queued to run, I don't want to queue another instance. I understand we have other options for scheduling; however, we tend to use cron for other tasks, so I want to try to keep it.</p>
    <p>Second update: in response to the comments so far, I have rewritten the code. Could you comment on any issues with the following updated solution?</p>
<pre><code>import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
    String key1;
    String key2;
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue&lt;Runnable&gt;());

    public static void main(String[] args) {
        try {
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            tpe.execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public boolean equals(Object obj) {
        if (obj instanceof TestExecution) {
            TestExecution t = (TestExecution) obj;
            return (key1.equals(t.key1) &amp;&amp; key2.equals(t.key2));
        }
        return false;
    }

    public int hashCode () {
        return key1.hashCode()+key2.hashCode();
    }

    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor extends ThreadPoolExecutor {
    Set&lt;Runnable&gt; executions = Collections.synchronizedSet(new HashSet&lt;Runnable&gt;());

    public TestThreadPoolExecutor(LinkedBlockingQueue&lt;Runnable&gt; q) {
        super(2, 5, 1, TimeUnit.MINUTES, q);
    }

    public void execute(Runnable command) {
        if (executions.contains(command)) {
            System.out.println("Previous execution still running");
            return;
        } else {
            System.out.println("No previous execution");
        }
        super.execute(command);
        executions.add(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        executions.remove(r);
    }
}
</code></pre>
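<p>One point that may be worth checking in the updated solution: <code>executions.contains(command)</code> and <code>executions.add(command)</code> are two separate steps, so two threads calling <code>execute</code> at the same time could both pass the check, and because the entry is added only after <code>super.execute(command)</code>, a task that finishes very quickly could be removed in <code>afterExecute</code> before it is added. The following is a minimal sketch of one possible alternative, not a definitive implementation. It assumes Java 8 or later, and the names <code>DedupSubmitter</code>, <code>submitIfAbsent</code> and <code>inFlightCount</code> are hypothetical, introduced only for illustration.</p>

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical helper (not from the original post): rejects a task whose key
// is already queued or running on the wrapped executor.
final class DedupSubmitter {
    // Keys of tasks that are queued or running. Keys must be non-null and
    // must implement equals/hashCode consistently.
    private final Set<Object> inFlight = ConcurrentHashMap.newKeySet();
    private final Executor executor;

    public DedupSubmitter(Executor executor) {
        this.executor = executor;
    }

    // Returns true if the task was handed to the executor, false if an equal
    // key is still queued or running.
    public boolean submitIfAbsent(Object key, Runnable task) {
        // add() on a concurrent set is a single atomic check-and-insert,
        // so only one caller can win for a given key.
        if (!inFlight.add(key)) {
            return false;
        }
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    // Release the key even if the task throws.
                    inFlight.remove(key);
                }
            });
        } catch (RuntimeException e) {
            // The executor refused the task (for example after shutdown),
            // so the key must not stay registered.
            inFlight.remove(key);
            throw e;
        }
        return true;
    }

    // Number of keys currently queued or running.
    public int inFlightCount() {
        return inFlight.size();
    }
}
```

<p>With the classes from the question, a call might look like <code>submitter.submitIfAbsent(task, task)</code>, where <code>submitter</code> wraps the existing <code>ThreadPoolExecutor</code>, since <code>TestExecution</code> already defines <code>equals</code> and <code>hashCode</code> on its two keys. The key is registered before the task is handed over and released in a <code>finally</code> block, so no subclass of <code>ThreadPoolExecutor</code> is needed in this sketch.</p>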