TThreadedQueue not capable of multiple consumers?
<p>I am trying to use TThreadedQueue (Generics.Collections) in a single-producer, multiple-consumer scheme (Delphi XE). The idea is to push objects into a queue and let several worker threads drain it.</p>
<p>It does not work as expected, though. When two or more worker threads call PopItem, access violations are raised from within TThreadedQueue.</p>
<p>If the call to PopItem is serialized with a critical section, all is fine.</p>
<p>Surely TThreadedQueue should be able to handle multiple consumers, so am I missing something, or is this simply a bug in TThreadedQueue?</p>
<p>Here is a simple example that reproduces the error.</p>
<pre><code>program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type
  TThreadTaskMsg = class(TObject)
    private
      threadID  : integer;
      threadMsg : string;
    public
      Constructor Create( ID : integer; const msg : string);
  end;

type
  TThreadReader = class(TThread)
    private
      fPopQueue   : TThreadedQueue&lt;TObject&gt;;
      fSync       : TCriticalSection;
      fMsg        : TThreadTaskMsg;
      fException  : Exception;
      procedure DoSync;
      procedure DoHandleException;
    public
      Constructor Create( popQueue : TThreadedQueue&lt;TObject&gt;;
                          sync     : TCriticalSection);
      procedure Execute; override;
  end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue&lt;TObject&gt;;
                                  sync     : TCriticalSection);
begin
  fPopQueue:= popQueue;
  fMsg:= nil;
  fSync:= sync;
  Self.FreeOnTerminate:= FALSE;
  fException:= nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception -&gt;' + fException.Message);
end;

procedure TThreadReader.Execute;
var
  signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '&lt;Thread id :' +IntToStr( Self.threadId) + '&gt;';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
    except
      FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
    end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
  fSync        : TCriticalSection;
  fThreadQueue : TThreadedQueue&lt;TObject&gt;;
  fReaderArr   : array[1..4] of TThreadReader;
  i            : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:= TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue&lt;TObject&gt;.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads call PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
      WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.
</code></pre>
<p><strong>Update</strong>: The error in TMonitor that caused TThreadedQueue to crash is fixed in Delphi XE2.</p>
<p><strong>Update 2</strong>: The above test stressed the queue in the empty state. Darian Miller found that stressing the queue in the full state could still reproduce the error in XE2. The error is once again in TMonitor. See his answer below for more information and a link to QC101114.</p>
<p><strong>Update 3</strong>: With Delphi XE2 Update 4 there was an announced fix for <code>TMonitor</code> that would cure the problems in <code>TThreadedQueue</code>. My tests so far have not been able to reproduce any errors in <code>TThreadedQueue</code>. I tested a single producer with multiple consumer threads, with the queue both empty and full, and also multiple producers with multiple consumers. I varied the number of reader and writer threads from 1 to 100 without any glitch. But knowing the history, I dare others to break <code>TMonitor</code>.</p>
 
