Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection?
<p>I have a quantifiable &amp; repeatable problem using the Task Parallel Library, <code>BlockingCollection&lt;T&gt;</code>, <code>ConcurrentQueue&lt;T&gt;</code> &amp; <code>GetConsumingEnumerable</code> while trying to create a simple pipeline.</p>
<p>In a nutshell, adding entries to a default <code>BlockingCollection&lt;T&gt;</code> (which under the hood relies on a <code>ConcurrentQueue&lt;T&gt;</code>) from one thread does not guarantee that they will be popped off the <code>BlockingCollection&lt;T&gt;</code> by another thread calling the <code>GetConsumingEnumerable()</code> method.</p>
<p>I've created a very simple WinForms application to reproduce/simulate this, which just prints integers to the screen.</p>
<ul>
<li><code>Timer1</code> is responsible for queueing up the work items. It uses a concurrent dictionary called <code>_tracker</code> so that it knows what it has already added to the blocking collection.</li>
<li><code>Timer2</code> just logs the count of both the <code>BlockingCollection</code> &amp; the <code>_tracker</code>.</li>
<li>The START button kicks off a <code>Parallel.ForEach</code>, which simply iterates over the blocking collection's <code>GetConsumingEnumerable()</code> and prints each entry to the second list box.</li>
<li>The STOP button stops <code>Timer1</code>, preventing more entries from being added to the blocking collection.</li>
</ul>
<pre><code>using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection&lt;int&gt; _entries;
    private ConcurrentDictionary&lt;int, int&gt; _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection&lt;int&gt;();
        _tracker = new ConcurrentDictionary&lt;int, int&gt;();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory();
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    {
        //ADDING TIMER -&gt; LISTBOX 1
        for (var i = 0; i &lt; 3; i++, Counter++)
        {
            // Only queue the entry if the tracker has not seen it yet.
            if (_tracker.TryAdd(Counter, Counter))
                _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    {
        //LOGGING TIMER -&gt; LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    {
        //START BUTTON -&gt; LOGS TO LIST BOX 2
        var options = new ParallelOptions
        {
            CancellationToken = _tokenSource.Token,
            MaxDegreeOfParallelism = 1
        };

        _factory.StartNew(() =&gt;
        {
            Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork);
        });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() =&gt; listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    {
        //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
}
</code></pre>
<p>Here's the sequence of events:</p>
<ul>
<li>Press START</li>
<li>Timer1 ticks &amp; ListBox1 is immediately updated with 3 messages (Adding 0, 1, 2)</li>
<li>ListBox2 is subsequently updated with 3 messages, 1 second apart <ul> <li>Processed 0</li> <li>Processed 1</li> <li>Processed 2</li> </ul></li>
<li>Timer1 ticks &amp; ListBox1 is immediately updated with 3 messages (Adding 3, 4, 5)</li>
<li>ListBox2 is subsequently updated with 2 messages, 1 second apart <ul> <li>Processed 3</li> <li>Processed 4</li> <li><strong>Processed 5</strong> is not printed; it appears to have gone "missing"</li> </ul></li>
<li>Press STOP to prevent more messages being added by Timer1</li>
<li>Wait... "Processed 5" still does not appear</li>
</ul>
<p><img src="https://i.stack.imgur.com/vud45.png" alt="Missing Entry"></p>
<p>You can see that the concurrent dictionary is still tracking 1 item that has not yet been processed &amp; subsequently removed from <code>_tracker</code>.</p>
<p>If I press START again, then Timer1 begins adding 3 more entries and the Parallel loop comes back to life, printing 5, 6, 7 &amp; 8.</p>
<p><img src="https://i.stack.imgur.com/o4VVC.png" alt="Entry returned after subsequent items shoved in behind it"></p>
<p>I'm at a complete loss as to why this occurs. Pressing START again obviously starts a new task, which calls <code>Parallel.ForEach</code> and re-executes <code>GetConsumingEnumerable()</code>, which magically finds the missing entry.</p>
<p><strong>Why does <code>BlockingCollection.GetConsumingEnumerable()</code> not guarantee that every item added to the collection is iterated over?</strong></p>
<p><strong>Why does the addition of more entries subsequently cause it to get "unstuck" and continue with its processing?</strong></p>
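For anyone who wants to try this without the WinForms designer, the same pipeline can probably be reduced to a console program. This is only a sketch under assumptions that are not part of the original application: the class name `Program`, the two bursts of three `Add` calls standing in for `Timer1`, the 5 second pauses, and the final `CompleteAdding()` call (added only so the program can exit) are all hypothetical. Whether the stall shows up may depend on the runtime version.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var entries = new BlockingCollection<int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };

        // Consumer: same shape as the START button handler in the question.
        var consumer = Task.Run(() =>
            Parallel.ForEach(entries.GetConsumingEnumerable(), options, entry =>
            {
                Thread.Sleep(1000); // simulate one second of work
                Console.WriteLine("Processed {0}", entry);
            }));

        // Producer: two bursts of three entries stand in for two Timer1 ticks.
        for (var burst = 0; burst < 2; burst++)
        {
            for (var i = 0; i < 3; i++)
            {
                var value = burst * 3 + i;
                entries.Add(value);
                Console.WriteLine("Adding {0}", value);
            }
            Thread.Sleep(5000); // give the consumer time to drain the burst
        }

        // Compare this count with the "Processed" lines printed so far.
        Console.WriteLine("Entries Count : {0}", entries.Count);

        // Not in the original app: mark the collection complete so the loop can end.
        entries.CompleteAdding();
        consumer.Wait();
    }
}
```

Comparing the "Adding" and "Processed" lines printed before the `Entries Count` line should show whether an entry has left the collection without reaching the loop body.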
 
