Note that there are some explanatory texts on larger screens.

plurals
  1. POCustom blocking queue locking issue
    primarykey
    data
    text
    <p>I am trying to do some custom implementation of blocking queue with fixed length array of byte arrays. I am not removing polled elements, therefore I adjusted put method to return byte array so that it can be written directly (producer thread uses MappedByteBuffer to write directly to this byte array). I added "commitPut()" method to simply increase counters and set "lengths" arrays. (if multiple threads would be writing this could be the concurrency problems, but I know that only one thread is writing).</p> <p>Below is what I currently have. It works if I debug through step by step, but if I "run" it looks like it encounters some locking problems. I copied, stripped down and adjusted ArrayBlockingQueue code. Can someone with better knowledge please look at the class and tell me what I am doing wrong, or how to do it better (like write directy to buffer and set lengths array and counters at the same step)?</p> <pre><code>public class ByteArrayBlockingQueue { private final int[] lens; // array to valid lengths private final byte[][] items; // array of byte arrays private int takeIndex = 0; private int putIndex = 0; private int count = 0; public volatile int polledLen = 0; // lenght of last polled byte array private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; final int inc(int i) { return (++i == items.length)? 0 : i; } public ByteArrayBlockingQueue(int capacity, int size, boolean fair) { if (capacity &lt;= 0) throw new IllegalArgumentException(); this.items = new byte[capacity][size]; this.lens = new int[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public byte[] put() throws InterruptedException { final byte[][] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } //insert(e, len); return items[putIndex]; } finally { lock.unlock(); } } public void commitPut(int lenBuf) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { lens[putIndex] = lenBuf; putIndex = inc(putIndex); ++count; notEmpty.signal(); } finally { lock.unlock(); } } public byte[] poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; final byte[][] items = this.items; final int[] lens = this.lens; byte[] e = items[takeIndex]; this.polledLen = lens[takeIndex]; //items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return e; } finally { lock.unlock(); } } } </code></pre>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload