Can a C# blocking FIFO queue leak messages?
<p>I'm working on an academic open source project and now I need to create a fast blocking FIFO queue in C#. My first implementation simply wrapped a synchronized queue (with dynamic expansion) within a reader's semaphore, then I decided to re-implement it in the following (theoretically faster) way:</p>
<pre><code>public class FastFifoQueue&lt;T&gt;
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// &lt;summary&gt;
    /// Initializes FastFifoQueue with the specified capacity
    /// &lt;/summary&gt;
    /// &lt;param name="size"&gt;Maximum number of elements to store&lt;/param&gt;
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size &amp; (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index &lt; 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index &lt; 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count { get { return _count; } }
}
</code></pre>
<p>This is the classic FIFO queue implementation with a static array that we find in textbooks.
It is designed to atomically increment pointers, and since I can't make the pointer go back to zero when it reaches (capacity-1), I compute the modulo separately. In theory, using Interlocked is the same as locking before doing the increment, and since there are semaphores, multiple producers/consumers may enter the queue but only one at a time is able to modify the queue pointers. First, because Interlocked.Increment increments and then returns, I already understand that I am limited to using the post-increment value and that I start storing items from position 1 in the array. It's not a problem, I'll go back to 0 when I reach a certain value.</p>
<p>What's the problem with it? You wouldn't believe that, running under heavy load, the queue sometimes returns a NULL value. I am SURE, repeat, I AM SURE, that no method enqueues <em>null</em> into the queue. This is definitely true because I tried putting a null check in Enqueue to be sure, and no error was thrown. I created a test case for that with Visual Studio (by the way, I use a dual core CPU like many people):</p>
<pre><code>private int _errors;

[TestMethod()]
public void ConcurrencyTest()
{
    const int size = 3; //Perform more tests changing it
    _errors = 0;
    IFifoQueue&lt;object&gt; queue = new FastFifoQueue&lt;object&gt;(2048);
    Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
    Thread[] producers = new Thread[size], consumers = new Thread[size];

    for (int i = 0; i &lt; size; i++)
    {
        producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
        consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
        producers[i].Start(queue);
        consumers[i].Start(queue);
    }

    Thread.Sleep(new TimeSpan(0, 0, 1, 0));

    for (int i = 0; i &lt; size; i++)
    {
        producers[i].Abort();
        consumers[i].Abort();
    }

    Assert.AreEqual(0, _errors);
}

private void LoopProducer(object queue)
{
    try
    {
        IFifoQueue&lt;object&gt; q = (IFifoQueue&lt;object&gt;)queue;
        while (true)
        {
            try
            {
                q.Enqueue(new object());
            }
            catch
            { }
        }
    }
    catch (ThreadAbortException) { }
}

private void LoopConsumer(object queue)
{
    try
    {
        IFifoQueue&lt;object&gt; q = (IFifoQueue&lt;object&gt;)queue;
        while (true)
        {
            object item = q.Dequeue();
            if (item == null) Interlocked.Increment(ref _errors);
        }
    }
    catch (ThreadAbortException) { }
}
</code></pre>
<p>Each time a consumer thread gets a null, an error is counted. When performing the test with 1 producer and 1 consumer, it succeeds. When performing the test with 2 producers and 2 consumers, or more, a disaster happens: as many as 2000 leaks are detected. I found that the problem may be in the Enqueue method. By design contract, a producer can write only into a cell that is empty (<em>null</em>), but after modifying my code with some diagnostics I found that sometimes a producer tries to write to a non-empty cell, one that is already occupied by "good" data.</p>
<pre><code>public void Enqueue(T item)
{
    _writeSema.WaitOne();
    int index = Interlocked.Increment(ref _head);
    index %= _capacity;
    if (index &lt; 0) index += _capacity;
    //_array[index] = item;
    T leak = Interlocked.Exchange(ref _array[index], item);

    //Diagnostic code
    if (leak != null)
    {
        throw new InvalidOperationException("Too bad...");
    }
    Interlocked.Increment(ref _count);

    _readSema.Release();
}
</code></pre>
<p>The "Too bad..." exception is then thrown often. But it seems very strange that a conflict arises from concurrent writes, because increments are atomic and the writer's semaphore admits only as many writers as there are free array cells.</p>
<p>Can somebody help me with that? I would really appreciate it if you shared your skills and experience with me.</p>
<p>Thank you.</p>
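<p>For reference, the index arithmetic described above can be isolated in a small sketch. The names <code>RingIndexSketch</code> and <code>NextSlot</code> are hypothetical and not part of the queue. The sketch only shows how an atomically incremented counter that starts at <code>int.MinValue</code> maps to array slots; it does not try to reproduce or explain the leak.</p>

```csharp
using System;
using System.Threading;

public static class RingIndexSketch
{
    // Hypothetical helper (not in the original class): atomically advances
    // the counter and maps the new value to a slot in [0, capacity).
    public static int NextSlot(ref int counter, int capacity)
    {
        // Interlocked.Increment returns the incremented value and wraps
        // from int.MaxValue to int.MinValue without throwing.
        int index = Interlocked.Increment(ref counter);

        // The C# remainder keeps the sign of the dividend, so it can be negative.
        index %= capacity;
        if (index < 0) index += capacity; // shift negative remainders into range
        return index;
    }

    public static void Main()
    {
        int counter = int.MinValue; // same starting value as _head and _tail
        const int capacity = 8;     // assumed power of 2, as the constructor requires

        for (int i = 0; i < 10; i++)
            Console.Write(NextSlot(ref counter, capacity) + " ");
        Console.WriteLine();
    }
}
```

<p>With a capacity of 8 this prints <code>1 2 3 4 5 6 7 0 1 2</code>, which matches the remark that items are stored starting from position 1. Because the capacity is a power of 2, the slot sequence also stays contiguous when the counter overflows: <code>int.MaxValue</code> maps to slot 7 and the following <code>int.MinValue</code> maps to slot 0.</p>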
 
