Issue with threaded queue implementation in .NET
<p>I'm trying to implement a threaded queue in .NET, but I'm having some trouble when I run it through tests.</p>

<p>The implementation is permitted to forgo some of the complexities of threading because it enforces that only one thread will ever place items into the queue and only one thread will ever take them out (this is by design).</p>

<p>The problem is that sometimes Take() will skip an item as if it were never there, and in my tests I'll get "Expected: 736 But was: 737". I can't see anywhere in this code where that kind of effect could occur; Put will only ever place an item after the very last item (so it shouldn't be affecting this.m_Head directly), and Take uses Interlocked.Exchange to take the item from the head.</p>

<p>How does this implementation permit the issue to occur?</p>

<p><strong>Implementation:</strong></p>

<pre><code>using System;
using System.Threading;

#pragma warning disable 420

namespace Tychaia.Threading
{
    public class TaskPipeline&lt;T&gt;
    {
        private int? m_InputThread;
        private int? m_OutputThread;
        private volatile TaskPipelineEntry&lt;T&gt; m_Head;

        /// &lt;summary&gt;
        /// Creates a new TaskPipeline with the current thread being
        /// considered to be the input side of the pipeline. The
        /// output thread should call Connect().
        /// &lt;/summary&gt;
        public TaskPipeline()
        {
            this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
            this.m_OutputThread = null;
        }

        /// &lt;summary&gt;
        /// Connects the current thread as the output of the pipeline.
        /// &lt;/summary&gt;
        public void Connect()
        {
            if (this.m_OutputThread != null)
                throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
            this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
        }

        /// &lt;summary&gt;
        /// Puts an item into the queue to be processed.
        /// &lt;/summary&gt;
        /// &lt;param name="value"&gt;Value.&lt;/param&gt;
        public void Put(T value)
        {
            if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");

            // Walk the queued items until we find one that
            // has Next set to null.
            var head = this.m_Head;
            while (head != null)
            {
                if (head.Next != null)
                    head = head.Next;
                if (head.Next == null)
                    break;
            }
            if (head == null)
                this.m_Head = new TaskPipelineEntry&lt;T&gt; { Value = value };
            else
                head.Next = new TaskPipelineEntry&lt;T&gt; { Value = value };
        }

        /// &lt;summary&gt;
        /// Takes the next item from the pipeline, or blocks until an item
        /// is received.
        /// &lt;/summary&gt;
        /// &lt;returns&gt;The next item.&lt;/returns&gt;
        public T Take()
        {
            if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");

            // Wait until there is an item to take.
            var spin = new SpinWait();
            while (this.m_Head == null)
                spin.SpinOnce();

            // Return the item and exchange the current head with
            // the next item, all in an atomic operation.
            return Interlocked.Exchange(ref this.m_Head, this.m_Head.Next).Value;
        }
    }
}

#pragma warning restore 420
</code></pre>

<p><strong>Failing Test:</strong></p>

<pre><code>[Test]
public void TestPipelineParallelTo100()
{
    var random = new Random();
    var pipeline = new TaskPipeline&lt;int&gt;();
    var success = true;
    int expected = 0, actual = 0;
    ThreadStart processor = () =&gt;
    {
        pipeline.Connect();
        for (int i = 0; i &lt; 100; i++)
        {
            var v = pipeline.Take();
            if (v != i)
            {
                success = false;
                expected = i;
                actual = v;
                break;
            }
            Thread.Sleep(random.Next(1, 10));
        }
    };
    var thread = new Thread(processor);
    thread.Start();
    for (int i = 0; i &lt; 100; i++)
    {
        pipeline.Put(i);
        Thread.Sleep(random.Next(1, 10));
    }
    thread.Join();
    if (!success)
        Assert.AreEqual(expected, actual);
}
</code></pre>
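One interleaving that would be consistent with the reported symptom can be replayed deterministically on a single thread. The sketch below is an illustration under assumptions, not a confirmed diagnosis: `Entry` and `RaceReplay` are hypothetical names, `Entry` stands in for `TaskPipelineEntry<T>` (whose definition is not shown), and the steps of Put and Take are written out by hand in the order two threads might happen to run them. It relies on the fact that the value argument of `Interlocked.Exchange` is evaluated before the exchange itself runs, so only the store is atomic, not the read of `Next` plus the store.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for TaskPipelineEntry<T>; the original type is not shown.
class Entry
{
    public int Value;
    public Entry Next;
}

static class RaceReplay
{
    static Entry head;

    // Replays one possible interleaving of Put and Take on a single thread
    // and returns the two values the consumer would observe.
    public static int[] Replay()
    {
        // The queue currently holds a single item: 735.
        head = new Entry { Value = 735 };

        // Take, step 1: the argument head.Next is read first (it is null here).
        Entry nextSeenByTake = head.Next;

        // Put(736) runs in between: it walks to the tail, which is still
        // the 735 entry, and links the new entry behind it.
        Entry tail = head;
        while (tail.Next != null)
            tail = tail.Next;
        tail.Next = new Entry { Value = 736 };

        // Take, step 2: the exchange stores the stale null as the new head.
        // The 736 entry is now reachable only from the detached 735 entry.
        int first = Interlocked.Exchange(ref head, nextSeenByTake).Value;

        // Put(737): the head is null, so the new entry becomes the head.
        if (head == null)
            head = new Entry { Value = 737 };

        // The next Take returns 737; 736 was never observed.
        int second = Interlocked.Exchange(ref head, head.Next).Value;

        return new[] { first, second };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Replay())); // prints "735, 737"
    }
}
```

If this interleaving is what happens in the failing test, the consumer would see 735 followed by 737, which matches "Expected: 736 But was: 737". A similar loss would occur if Put captured the tail entry just before Take detached it.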