Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p>Here is Circular Bounded Queue which is (supposed to be)thread safe and provides a blocking <code>take</code> operation.</p> <pre><code>public class CircularQueue&lt;T&gt; { private final int MAX_SIZE; private final AtomicReferenceArray&lt;T&gt; buffer; private final AtomicInteger start; private final AtomicInteger end; private final AtomicInteger len; private final ReentrantLock rwlock; private final Condition readCondition; public CircularQueue(int size) { MAX_SIZE = size; buffer = new AtomicReferenceArray&lt;T&gt;(size); start = new AtomicInteger(0); end = new AtomicInteger(0); len = new AtomicInteger(0); rwlock = new ReentrantLock(true); readCondition = rwlock.newCondition(); } /** * Adds to tail of the queue */ public void put(T val) { try { rwlock.lock(); buffer.set(end.get(), val); end.set((end.get() + 1) % MAX_SIZE); if (len.get() == MAX_SIZE) { // overwrite start.set((start.get() + 1) % MAX_SIZE); } else { len.incrementAndGet(); } readCondition.signal(); } finally { rwlock.unlock(); } } /** * Blocking removeFront operation * @return */ public T take() { T val = null; try { rwlock.lock(); while (len.get() == 0) { try { readCondition.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } val = buffer.get(start.get()); buffer.set(start.get(), null); start.set((start.get() + 1) % MAX_SIZE); len.decrementAndGet(); } finally { rwlock.unlock(); } return val; } public int size() { int curLen = 0; try { rwlock.lock(); curLen = len.get(); } finally { rwlock.unlock(); } return curLen; } } </code></pre> <p>There are many operations which are yet to be added like <code>poll</code>, <code>offer</code> etc. But you can test this out with some threads : </p> <p>It is going to hang your JVM if it runs correctly. </p> <pre><code>public static void main(String[] args) { final int MAX_QUEUE_SIZE = 4; final CircularQueue&lt;Integer&gt; q = new CircularQueue&lt;Integer&gt;(MAX_QUEUE_SIZE); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i &lt; MAX_QUEUE_SIZE; ++i) { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i); q.put(i); } for (int i = 0; i &lt; MAX_QUEUE_SIZE; ++i) { System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take()); } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 10; i &lt; 10 + MAX_QUEUE_SIZE; ++i) { try { Thread.sleep(1001); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i); q.put(i); } for (int i = 0; i &lt; MAX_QUEUE_SIZE; ++i) { System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take()); } } }).start(); } </code></pre> <p>Your output should probably match</p> <pre><code>Putting: from Thread-0 0 Putting: from Thread-1 10 Putting: from Thread-0 1 Putting: from Thread-1 11 Putting: from Thread-0 2 Putting: from Thread-1 12 Putting: from Thread-0 3 Trying to get from Thread-0 11 Trying to get from Thread-0 2 Trying to get from Thread-0 12 Trying to get from Thread-0 3 Putting: from Thread-1 13 Trying to get from Thread-1 13 </code></pre> <p>The other take operations from Thread-1 are waiting for a corresponding put operation since Thread-1 is slightly slower than Thread-0.</p>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload