<p>If <code>Put</code> assigns <code>m_Head.Next</code> after <code>Take</code> has already read it to pass to <code>Interlocked.Exchange(ref this.m_Head, this.m_Head.Next)</code>, the new entry is lost, because the only way to reach it was through <code>m_Head</code>. The problematic interleaving is:</p>
<ul>
<li><code>Take</code> reads <code>m_Head.Next</code> (<code>==null</code>)</li>
<li><code>Put</code> writes <code>m_Head.Next</code> (<code>!=null</code>)</li>
<li><code>Take</code> writes <code>m_Head</code> (<code>==null</code>)</li>
</ul>
<p><strong>Edit:</strong> This should work. I used a non-null sentinel value and <code>Interlocked.CompareExchange</code> to make sure <code>Put</code> doesn't try to reuse an entry that <code>Take</code> is already removing.</p>
<p><strong>Edit 2:</strong> Tweaks to <code>Take</code>.</p>
<p><strong>Edit 3:</strong> I believe I still need to add a <code>goto retry;</code> in <code>Put</code> if the identified tail is <code>Entry.Sentinel</code>.</p>
<pre><code>using System;
using System.Threading;

#pragma warning disable 420

namespace Tychaia.Threading
{
    public class TaskPipeline&lt;T&gt;
    {
        private int? m_InputThread;
        private int? m_OutputThread;
        private volatile Entry m_Head;

        private sealed class Entry
        {
            public static readonly Entry Sentinel = new Entry(default(T));

            public readonly T Value;
            public Entry Next;

            public Entry(T value)
            {
                Value = value;
                Next = null;
            }
        }

        /// &lt;summary&gt;
        /// Creates a new TaskPipeline with the current thread being
        /// considered to be the input side of the pipeline. The
        /// output thread should call Connect().
        /// &lt;/summary&gt;
        public TaskPipeline()
        {
            this.m_InputThread = Thread.CurrentThread.ManagedThreadId;
            this.m_OutputThread = null;
        }

        /// &lt;summary&gt;
        /// Connects the current thread as the output of the pipeline.
        /// &lt;/summary&gt;
        public void Connect()
        {
            if (this.m_OutputThread != null)
                throw new InvalidOperationException("TaskPipeline can only have one output thread connected.");
            this.m_OutputThread = Thread.CurrentThread.ManagedThreadId;
        }

        /// &lt;summary&gt;
        /// Puts an item into the queue to be processed.
        /// &lt;/summary&gt;
        /// &lt;param name="value"&gt;Value.&lt;/param&gt;
        public void Put(T value)
        {
            if (this.m_InputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the input thread may place items into TaskPipeline.");

        retry:
            // Walk the queued items until we find one that
            // has Next set to null.
            var head = this.m_Head;
            while (head != null)
            {
                if (head.Next != null)
                    head = head.Next;
                if (head.Next == null)
                    break;
            }

            if (head == null)
            {
                if (Interlocked.CompareExchange(ref m_Head, new Entry(value), null) != null)
                    goto retry;
            }
            else
            {
                if (Interlocked.CompareExchange(ref head.Next, new Entry(value), null) != null)
                    goto retry;
            }
        }

        /// &lt;summary&gt;
        /// Takes the next item from the pipeline, or blocks until an item
        /// is received.
        /// &lt;/summary&gt;
        /// &lt;returns&gt;The next item.&lt;/returns&gt;
        public T Take()
        {
            if (this.m_OutputThread != Thread.CurrentThread.ManagedThreadId)
                throw new InvalidOperationException("Only the output thread may retrieve items from TaskPipeline.");

            // Wait until there is an item to take.
            var spin = new SpinWait();
            while (this.m_Head == null)
                spin.SpinOnce();

            // Return the item and exchange the current head with
            // the next item, all in an atomic operation.
            Entry head = m_Head;

        retry:
            Entry next = head.Next;
            // replace m_Head.Next with a non-null sentinel to ensure Put won't try to reuse it
            if (Interlocked.CompareExchange(ref head.Next, Entry.Sentinel, next) != next)
                goto retry;

            m_Head = next;
            return head.Value;
        }
    }
}
</code></pre>
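<p>The retry described in Edit 3 could be sketched roughly as below. This is a minimal, untuned sketch rather than a drop-in replacement: the class name <code>SentinelPipeline</code> and the <code>Demo</code> harness are hypothetical, the thread-ownership checks are left out for brevity, and it assumes exactly one producer thread and one consumer thread. <code>Put</code> stops walking as soon as it sees <code>Entry.Sentinel</code> and starts again from <code>m_Head</code>, so it never appends to an entry that <code>Take</code> is removing.</p>
<pre><code>using System;
using System.Threading;

// Hypothetical, trimmed-down variant of TaskPipeline (one producer, one consumer assumed).
public sealed class SentinelPipeline&lt;T&gt;
{
    private sealed class Entry
    {
        public static readonly Entry Sentinel = new Entry(default(T));
        public readonly T Value;
        public Entry Next;
        public Entry(T value) { Value = value; }
    }

    private Entry m_Head;

    public void Put(T value)
    {
        var entry = new Entry(value);
        while (true)
        {
            var tail = Volatile.Read(ref m_Head);
            if (tail == null)
            {
                // Empty queue: try to publish the entry as the new head.
                if (Interlocked.CompareExchange(ref m_Head, entry, null) == null)
                    return;
                continue;
            }

            // Walk to the last entry, stopping early if Take has marked one as removed.
            var next = Volatile.Read(ref tail.Next);
            while (next != null &amp;&amp; !ReferenceEquals(next, Entry.Sentinel))
            {
                tail = next;
                next = Volatile.Read(ref tail.Next);
            }

            // Saw Entry.Sentinel: the "goto retry" from Edit 3, restart from m_Head.
            if (next != null)
                continue;

            // Append only if the tail is still unclaimed; otherwise retry.
            if (Interlocked.CompareExchange(ref tail.Next, entry, null) == null)
                return;
        }
    }

    public T Take()
    {
        // Wait until there is an item to take.
        var spin = new SpinWait();
        Entry head;
        while ((head = Volatile.Read(ref m_Head)) == null)
            spin.SpinOnce();

        while (true)
        {
            var next = Volatile.Read(ref head.Next);
            // Mark head as removed so Put cannot append to it.
            if (Interlocked.CompareExchange(ref head.Next, Entry.Sentinel, next) == next)
            {
                Volatile.Write(ref m_Head, next);
                return head.Value;
            }
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var pipeline = new SentinelPipeline&lt;int&gt;();
        var producer = new Thread(() =&gt;
        {
            for (int i = 0; i &lt; 1000; i++)
                pipeline.Put(i);
        });
        producer.Start();

        // Consume on the main thread and check FIFO order.
        for (int i = 0; i &lt; 1000; i++)
        {
            int item = pipeline.Take();
            if (item != i)
                throw new InvalidOperationException("Out of order: " + item);
        }

        producer.Join();
        Console.WriteLine("All items received in order.");
    }
}
</code></pre>
<p>While <code>Take</code> sits between marking the head and updating <code>m_Head</code>, <code>Put</code> simply spins on the retry. That window is short, but a <code>SpinWait</code> could be added there if contention turns out to matter.</p>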