Note that there are some explanatory texts on larger screens.

plurals
  1. POParallel.ForEach stalled when integrated with BlockingCollection
    text
    copied!<p>I adopted my implementation of parallel/consumer based on the code in <a href="https://codereview.stackexchange.com/questions/8972/is-this-a-valid-producer-consumer-implementation-for-parallel-processing-in-th">this question</a></p> <pre><code>class ParallelConsumer&lt;T&gt; : IDisposable { private readonly int _maxParallel; private readonly Action&lt;T&gt; _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection&lt;T&gt; _entries = new BlockingCollection&lt;T&gt;(); private Task _task; public ParallelConsumer(int maxParallel, Action&lt;T&gt; action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = new CancellationTokenSource(); _task = _factory.StartNew( () =&gt; { Parallel.ForEach( _entries.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, (item, loopState) =&gt; { Log("Taking" + item); if (!_tokenSource.IsCancellationRequested) { _action(item); Log("Finished" + item); } else { Log("Not Taking" + item); _entries.CompleteAdding(); loopState.Stop(); } }); }, _tokenSource.Token); } catch (OperationCanceledException oce) { System.Diagnostics.Debug.WriteLine(oce); } } private void Log(string message) { Console.WriteLine(message); } public void Stop() { Dispose(); } public void Enqueue(T entry) { Log("Enqueuing" + entry); _entries.Add(entry); } public void Dispose() { if (_task == null) { return; } _tokenSource.Cancel(); while (!_task.IsCanceled) { } _task.Dispose(); _tokenSource.Dispose(); _task = null; } } </code></pre> <p>And here is a test code </p> <pre><code>class Program { static void Main(string[] args) { TestRepeatedEnqueue(100, 1); } private static void TestRepeatedEnqueue(int itemCount, int parallelCount) { bool[] flags = new bool[itemCount]; var consumer = new ParallelConsumer&lt;int&gt;(parallelCount, (i) =&gt; { flags[i] = true; } ); consumer.Start(); for (int i = 0; i &lt; itemCount; i++) { consumer.Enqueue(i); } Thread.Sleep(1000); Debug.Assert(flags.All(b =&gt; b == true)); } } </code></pre> <p>The test always fails - it always stuck at around 93th-item from the 100 tested. Any idea which part of my code caused this issue, and how to fix it?</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload