
ExecutorService-related design questions
I would like to validate the design of a multithreaded app I wrote and get clarification/reassurance on a few points. I apologize in advance for such a long post - I thought about splitting it into several questions, but they would all have to reference the same code and they all seemed inter-related, so I opted to put everything in one post. If this is not appropriate - please let me know and I will break it into multiple posts.

Here is what I have:

1. **BatchService** (a Spring singleton bean): accepts requests to upload a specified directory or a zip archive. For that, it holds an ExecutorService servicePool. On each request, it submits a new BatchUploader Callable task to the pool and stores the returned Future in a map - a @Transactional method. It provides methods to get the status of all uploads and to cancel all uploads. It also starts a new BatchMonitor thread to monitor the progress of the uploads and update the queues that hold completed and not-completed upload infos. It also cleans up all resources when the bean is about to be destroyed (using Spring's @PreDestroy callback).
2. **BatchUploader** is a Callable task, and it also has its own ExecutorService batchPool to upload individual files. In its call() method it scans the directory or zip archive, and for each file it submits a SingleFileUploader Callable task to its pool.
3. **SingleFileUploader** is a Callable task, and in its call() method it does all the work of uploading and processing the file and returns a status.

And here is some real and some pseudo code:

```java
public class BatchService {

    private ExecutorService servicePool;
    // Map<uploadId, Future result> - holds Futures for all batches still considered active
    private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures =
            new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>();
    // keep last 100 unsuccessful uploads
    private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches =
            new ConcurrentLinkedQueue<SingleBatchUploadResult>();
    // keep last 100 successful uploads
    private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>();
    private Thread monitorThread;

    public BatchService() {
        servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS);
        monitorThread = new Thread(new BatchMonitor());
        monitorThread.setDaemon(true);
        monitorThread.start();
    }

    @Transactional
    public void processUpload(String uploadId, String contentName) {
        Future<SingleBatchUploadResult> taskFuture =
                servicePool.submit(new BatchUploader(uploadId, contentName));
        activeBatchFutures.put(uploadId, taskFuture);
    }

    @PreDestroy
    public void preDestroy() {
        // stop the monitor thread
        monitorThread.interrupt();
        // stop all executors and their threads
        cancelAllTasks();
    }

    public void cancelAllTasks() {
        List<Runnable> waitingTasks = servicePool.shutdownNow();
        for (Runnable task : waitingTasks) {
            // examine which tasks are still waiting, if necessary
        }
    }

    public boolean cancelBatchById(String uploadId) {
        Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId);
        // isCancelled() implies isDone(), so checking isDone() alone is enough
        if (resultFuture != null && !resultFuture.isDone()) {
            resultFuture.cancel(true);
            return true;
        }
        // this task was either already finished, cancelled, not submitted or unknown
        return false;
    }

    public void getCurrentStatus() {
        // just print out the sizes of the queues for now
        System.out.println("number of active uploads: " + activeBatchFutures.size());
        System.out.println("number of successfully completed uploads: " + completedBatches.size());
        System.out.println("number of failed uploads: " + notCompletedBatches.size());
    }

    public class BatchMonitor implements Runnable {

        @Override
        public void run() {
            boolean cont = true;
            while (cont) {
                if (Thread.currentThread().isInterrupted()) {
                    // the thread is being shut down - get out
                    cont = false;
                    break;
                }
                Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator =
                        activeBatchFutures.entrySet().iterator();
                // remove completed Futures from the map,
                // add successfully completed batches to the completedBatches queue,
                // add all other batches to the notCompletedBatches queue
                while (iterator.hasNext() && cont) {
                    ...
                    if (batchUploadFuture.isCancelled()) {
                        addToNotCompleted(defaultResult);
                        // remove this future from the active list
                        activeBatchFutures.remove(uploadId);
                    } else if (batchUploadFuture.isDone()) {
                        try {
                            SingleBatchUploadResult result = batchUploadFuture.get();
                            if (UploadStatus.SUCCESS.equals(result.getUploadStatus()))
                                addToCompleted(uploadId);
                            else
                                addToNotCompleted(result);
                        } catch (InterruptedException e) {
                            // the thread is being shut down - stop processing
                            cont = false;
                            // preserve the interruption state of the thread
                            Thread.currentThread().interrupt();
                            break;
                        } catch (ExecutionException e) {
                            addToNotCompleted(defaultResult);
                        }
                        // remove this future from the active list
                        activeBatchFutures.remove(uploadId);
                    } else {
                        // the task has not finished yet - let it be
                        // TODO if a Future is not complete - see how old it is [how?]
                        // If older than the timeout - cancel it.
                        // For now, rely on the ExecutorService timeout set in the BatchUploader
                    }
                }
                // try to sleep for 5 sec, unless the thread is being shut down
                if (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        cont = false;
                        // preserve the interruption state of the thread
                        Thread.currentThread().interrupt();
                    }
                }
            }
            System.out.println("BatchMonitor.run() has terminated");
        }

        public void addToCompleted(String uploadId) {
            int currentSize = completedBatches.size();
            // bring the size of the queue below MAX
            if (currentSize > MAX_SUCCESSFUL_RESULTS) {
                int delta = currentSize - MAX_SUCCESSFUL_RESULTS;
                while (delta > 0) {
                    completedBatches.poll();
                    delta--;
                }
            }
            completedBatches.offer(uploadId);
        }

        public void addToNotCompleted(SingleBatchUploadResult result) {
            int currentSize = notCompletedBatches.size();
            // bring the size of the queue below MAX
            if (currentSize > MAX_UNSUCCESSFUL_RESULTS) {
                int delta = currentSize - MAX_UNSUCCESSFUL_RESULTS;
                while (delta > 0) {
                    notCompletedBatches.poll();
                    delta--;
                }
            }
            notCompletedBatches.offer(result);
        }
    }
}

public class BatchUploader implements Callable<SingleBatchUploadResult> {

    private ExecutorService executorService;
    // Map<fileName, Future result> - holds Futures for all files that were submitted
    // for upload (those that did not fail validation)
    private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures =
            new ConcurrentHashMap<String, Future<SingleFileUploadResult>>();
    private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles =
            new ConcurrentHashMap<String, SingleFileUploadResult>();
    private int totalFilesToUpload = 0;

    public BatchUploader(...) {
        executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH);
    }

    public SingleBatchUploadResult call() {
        // do some validation
        if ( this is a correct ZIP file ) {
            String errorMessage = processZipArchive(threadName, contentName);
            // the errorMessage will be non-null if exceptions happened during the zip archive read:
            // opening the ZIP archive, reading entries, or thread interruption exceptions
            if (errorMessage != null) {
                ...
                return errorBatchUploadResult;
            }
        }
        // all tasks are submitted - stop the service from accepting new requests
        // and shut down when done
        executorService.shutdown();
        // now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds
        try {
            executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // try to shut down all running tasks and stop waiting tasks from being scheduled
            executorService.shutdownNow();
            // preserve the interruption state of the thread
            Thread.currentThread().interrupt();
            return errorBatchUploadResult;
        }
        // at this point, we either finished all tasks (awaitTermination finished before the timeout),
        // or we timed out waiting. Get the latest status of each task
        List<String> successfullyUploadedFiles = new LinkedList<String>();
        for (String entryName : uploadTaskFutures.keySet()) {
            Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
            try {
                if (future.isCancelled()) {
                    ...
                    notUploadedFiles.putIfAbsent(entryName, taskResult);
                } else if (future.isDone()) {
                    // this task has finished
                    taskResult = future.get();
                    if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS))
                        successfullyUploadedFiles.add(entryName);
                    else
                        notUploadedFiles.putIfAbsent(entryName, taskResult);
                } else {
                    // this task is either not started yet or not finished yet
                    ...
                    notUploadedFiles.putIfAbsent(entryName, sometaskResult);
                }
            } catch (InterruptedException e) {
                // this is a signal to stop processing
                batchUploadResult.setTotalFilesToUpload(totalFilesToUpload);
                batchUploadResult.setNotUploadedFiles(notUploadedFiles);
                batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles);
                batchUploadResult.setStatusMessage(statusMessage);
                batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
                // cancel/stop all executing/waiting SingleFileUpload tasks
                executorService.shutdownNow();
                // preserve the interruption state of the thread
                Thread.currentThread().interrupt();
                return batchUploadResult;
            } catch (ExecutionException e) {
                // we do not know what the state of this task is
                ...
                notUploadedFiles.putIfAbsent(entryName, sometaskResult);
            }
        }
        ...
        return batchUploadResult;
    }

    private String processZipArchive(String threadName, String zipName) {
        // do all ZIP-reading work here
        while ( valid file found ) {
            if (Thread.currentThread().isInterrupted()) {
                // this batch uploader thread is being shut down - stop all SingleFileUpload tasks
                executorService.shutdownNow();
                return errorMessage;
            }
            // use a try block while processing individual files, to be able to gather
            // info about failed files but continue processing the good ones
            try {
                // read the file and pass it for processing to a SingleFileUploader
                Future<SingleFileUploadResult> taskFuture = executorService.submit(
                        new SingleFileUploader(uploadId, bytesContent, zipEntryName));
                uploadTaskFutures.put(zipEntryName, taskFuture);
                ...
            } catch (some exceptions) {
                notUploadedFiles.put(zipEntryName, taskResult);
            }
        }
        return errorMessage;
    }
}

public class SingleFileUploader implements Callable<SingleFileUploadResult> {
    ...
    @Override
    public SingleFileUploadResult call() {
        // check if there was a cancellation request
        if (Thread.currentThread().isInterrupted()) {
            // this file uploader thread is being shut down - get out
            return errorResult;
        }
        // do the real work here
        return result;
    }
}
```

All this works just fine in regular scenarios. However, I would still like to hear your opinion on whether there are better/more reliable ways to do what I want, especially in the following areas:

1. I am using a separate thread, BatchMonitor, to keep track of what is active, done, and not done yet, by periodically scanning the map of active Futures and moving them into the "successfully completed" or "notCompleted [failed]" queues. I wonder if there is a better way to do that? (See the first sketch after this list for the alternative I have in mind.)

2. I use unbounded concurrent queues for that - and bound them to a specified max size myself as I keep adding items to them. I could not find a "bounded concurrent queue" in the standard JDK libs - there are only unbounded ones - and I wish I could use the EvictingQueue from Guava, but it is bundled into the 15.0 release, which does not seem to be out yet… So I settled on limiting the size of the queues myself, at the expense of using the size() operation, which I know is a problem with concurrent queues, as it does a full scan of the queue… My reasoning is that it might be OK if I keep the size of the queues small - 100 in my case. (The second sketch below shows how I could avoid the size() calls.)

3. Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that will be reading the queues is the BatchService thread. The only time I can get into an out-of-sync situation is when BatchService tries to get the status of a particular upload. It is possible that that upload was already removed from the activeBatchFutures map but not yet placed into either the "completed" or "notCompleted" queue, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. But I am OK to live with occasional "not found" statuses returned for a particular upload - asking for the status a second time would get the correct result. (The third sketch below shows such a lookup.)

4. BatchService is a singleton bean - which brings its own scalability issues, since all requests to this bean will be throttled. Another option could be to make each BatchUploader a Spring bean and limit the number of beans, but then how would I do the overall monitoring?

5. Handling timeouts and cancellations: I am trying to make this app bullet-proof when it comes to resource cleanup - I am trying to handle all thread interruption cases and stop processing to allow the threads to be killed. I am relying on the InterruptedException being caught and handled in the BatchUploader to propagate this event to the individual FileUploader tasks, by calling batchPool.shutdownNow(). Can you see any potential cases where I might have runaway threads - when the JVM shuts down, when the app is re-deployed in a web container, …? (The last sketch below shows the shutdown sequence I am considering.)
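To make question 1 more concrete: one alternative I have been considering is to replace the polling BatchMonitor with a blocking consumer built on the JDK's ExecutorCompletionService, which hands back each Future as soon as its task completes. This is only an untested sketch - the result-handling logic is elided, and I would still need the result to carry its uploadId so the entry can be removed from activeBatchFutures:

```java
// Sketch only (not my current code): the CompletionService wraps the existing pool,
// so the monitor can block on take() instead of scanning the map every 5 seconds.
private CompletionService<SingleBatchUploadResult> completionService =
        new ExecutorCompletionService<SingleBatchUploadResult>(servicePool);

public void processUpload(String uploadId, String contentName) {
    // still keep the Future in the map so cancelBatchById() keeps working
    activeBatchFutures.put(uploadId,
            completionService.submit(new BatchUploader(uploadId, contentName)));
}

// the monitor body becomes a blocking loop - no Thread.sleep(5000) needed
public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            Future<SingleBatchUploadResult> finished = completionService.take(); // blocks
            // inspect finished.isCancelled() / finished.get() and move the result into
            // completedBatches or notCompletedBatches, as the current code already does
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // being shut down
    }
}
```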
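Regarding question 2, this is roughly how I could bound the queues without calling size() on the ConcurrentLinkedQueue: keep an AtomicInteger next to it and evict the oldest entry once the counter passes the cap. The class name is just for illustration, and the bound is only approximate under concurrent adds, which seems acceptable for a best-effort "last 100" history:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Keeps approximately the last maxSize elements; the counter makes the bound
// check O(1) instead of the O(n) scan that ConcurrentLinkedQueue.size() does.
public class LastNResults<E> {
    private final Queue<E> queue = new ConcurrentLinkedQueue<E>();
    private final AtomicInteger count = new AtomicInteger();
    private final int maxSize;

    public LastNResults(int maxSize) {
        this.maxSize = maxSize;
    }

    public void add(E element) {
        queue.offer(element);
        if (count.incrementAndGet() > maxSize) {
            // evict the oldest entry; under concurrent adds the bound is
            // approximate, which is fine for a best-effort history
            if (queue.poll() != null) {
                count.decrementAndGet();
            }
        }
    }

    public Iterable<E> snapshot() {
        return queue; // weakly consistent iteration, fine for status reporting
    }
}
```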
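For question 3, this is the kind of per-upload status lookup where the transient "not found" can occur; the getUploadId() accessor and the returned strings are hypothetical:

```java
public String getStatusById(String uploadId) {
    if (activeBatchFutures.containsKey(uploadId)) {
        return "IN_PROGRESS";
    }
    if (completedBatches.contains(uploadId)) {
        return "SUCCESS";
    }
    for (SingleBatchUploadResult result : notCompletedBatches) {
        if (uploadId.equals(result.getUploadId())) { // hypothetical accessor
            return "FAILED";
        }
    }
    // transient window: BatchMonitor removed the Future from the map but has
    // not placed the result into a queue yet - asking again would find it
    return "UNKNOWN";
}
```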
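And for question 5, my @PreDestroy cleanup would follow the two-phase shutdown pattern described in the ExecutorService javadoc (the 30-second grace periods are arbitrary):

```java
private void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // stop accepting new tasks
    try {
        // give the running uploads a grace period to finish
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // interrupt the running tasks
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("servicePool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        // (re-)cancel if the current thread is also interrupted
        pool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
```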
Thanks!

Marina