Note that there are some explanatory texts on larger screens.

plurals
  1. POJava Concurrency - Better Design approach to manage thread life cycle (start/stop)
    text
    copied!<p>I'm designing a concurrent Java application that reads data from various medical devices available on the hospital Intranet.</p> <p>I've read "Java concurrency in practice - Brian Goetz..." to understand how to do stuff, but I think I'm still missing something.</p> <p>Here's a <a href="https://docs.google.com/drawings/d/1C54rfxRL0MXykobGF39w-cSSJhxcbszNO8leMnEf4uM/edit?hl=en_US" rel="nofollow">quick simple diagram</a> of what I'm trying to do and there's some code snippet below..</p> <p>Worker threads (MedicalDeviceData instances) continuously read data from medical devices and make it available for the MedicalDeviceWorkManager, who in turn supplies it to the end user.<br> The worker threads keep reading data infinitely (ideally) and there's no "work completed" situation in my scenario. Moreover, user can choose to Start All devices or start a specific device or stop a device as and when he wishes.</p> <p>Below is code snippet (compiles but not tested) of how I would implement it. </p> <p>MedicalDeviceWorkManager - Spawns the worker threads and manages them.</p> <p>MedicalDeviceData - Worker thread gets data from medical devices infinitely and updates this instance of this class.</p> <p>Mainly look at startDevice, stopDevice and run methods.</p> <p>You'll obviously notice that I'm not using ThreadPoolExecutor and Future and that I just rolled my own implementation here. </p> <p>As future.get blocks till a work is completed, it doesnt make sense for my case, because my worker thread never "completes" the task...its just an infinitely ongoing task...</p> <p>QUESTION:How do I change the implementation shown below to a more standardized one so that I could make better use of java.util.concurrent package (ThreadPoolExecutor/Future). </p> <p>Any other better design pattern I should look at?</p> <pre><code>public class MedicalDeviceWorkManager { private ThreadGroup rootThreadGroup = null; Hashtable&lt;String, MedicalDeviceData&gt; deviceObjs = new Hashtable&lt;String, MedicalDeviceData&gt;(); public void manageMedicalDevices() throws InterruptedException { String[] allDevices={"Device1","Device2","Device3","Device4"}; //-- Start all threads to collect data for(String deviceToStart:allDevices){ this.startDevice(deviceToStart); } //-- Stop all threads for(String deviceToStop:allDevices){ this.stopDevice(deviceToStop); } //-- Start on request from user String deviceToStart="Device1"; this.startDevice(deviceToStart); //-- Stop on request from user. String deviceToStop="Device1"; this.stopDevice(deviceToStop); /* * Get Data and give it to client * This is happening via a separate TCP port * */ while(true){ for(String deviceName:allDevices){ if(deviceObjs.get(deviceName)!=null){ ConcurrentHashMap&lt;String,BigDecimal&gt; devData=deviceObjs.get(deviceName).getCollectedData(); //--Loop and send data to client on TCP stream ; } }//-- loop the devices }//-- infinite } //-- Start the device to start acquiring data using a worker thread private void startDevice(String deviceName){ //-- Get Device instance MedicalDeviceData thisDevice=deviceObjs.get(deviceName); if(thisDevice==null){ thisDevice=new MedicalDeviceData(deviceName); deviceObjs.put(deviceName, thisDevice); } //-- Create thread to start data acquisition //-- Start if not being processed already (Handle what if thread hung scenario later) if(this.getThread(deviceName)==null){ Thread t=new Thread(thisDevice); t.setName(deviceName); t.start(); } } //-- Stop the worker thread thats collecting the data. private void stopDevice(String deviceName) throws InterruptedException { deviceObjs.get(deviceName).setShutdownRequested(true); Thread t=this.getThread(deviceName); t.interrupt(); t.join(1000); } private Thread getThread( final String name ) { if ( name == null ) throw new NullPointerException( "Null name" ); final Thread[] threads = getAllThreads( ); for ( Thread thread : threads ) if ( thread.getName( ).equals( name ) ) return thread; return null; } private ThreadGroup getRootThreadGroup( ) { if ( rootThreadGroup != null ) return rootThreadGroup; ThreadGroup tg = Thread.currentThread( ).getThreadGroup( ); ThreadGroup ptg; while ( (ptg = tg.getParent( )) != null ) tg = ptg; return tg; } private Thread[] getAllThreads( ) { final ThreadGroup root = getRootThreadGroup( ); final ThreadMXBean thbean = ManagementFactory.getThreadMXBean( ); int nAlloc = thbean.getThreadCount( ); int n = 0; Thread[] threads; do { nAlloc *= 2; threads = new Thread[ nAlloc ]; n = root.enumerate( threads, true ); } while ( n == nAlloc ); return java.util.Arrays.copyOf( threads, n ); } }//-- MedicalDeviceWorkManager public class MedicalDeviceData implements Runnable{ //-- Data Collected from medical device private final ConcurrentHashMap&lt;String,BigDecimal&gt; collectedData=new ConcurrentHashMap&lt;String,BigDecimal&gt;(); //-- Set by Thread Manager to request a shutdown..after which it should interrupt the thread private AtomicBoolean shutdownRequested; //-- Simple data Counter private AtomicInteger dataCounter=new AtomicInteger(0); //-- Device Name private String thisDeviceName; public void run() { //-- Initialize I/O for the device ; while(!this.getShutdownRequested()){ try{ //-- just to compile the code Thread.sleep(0); //-- perform I/O operation to get data from medical device ; //-- Add data into the ConcurrentHashMap...Both key and value are immutable. collectedData.put("DataKey", new BigDecimal("9999")); //-- data counter dataCounter.getAndIncrement(); } catch(InterruptedException ie){ if(this.getShutdownRequested()){ return; } //throw new InterruptedException(); } } }//-- run public MedicalDeviceData(String thisDeviceName){ this.thisDeviceName=thisDeviceName; } /** * @return the shutdownRequested */ public boolean getShutdownRequested() { return this.shutdownRequested.get(); } /** * @param shutdownRequested the shutdownRequested to set */ public void setShutdownRequested(boolean shutdownRequested) { this.shutdownRequested.set(shutdownRequested); } /** * Both key and value are immutable, so ok to publish reference. * * @return the collectedData */ public ConcurrentHashMap&lt;String, BigDecimal&gt; getCollectedData() { return collectedData; } /** * @return the dataCounter */ public AtomicInteger getDataCounter() { return dataCounter; } } </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload