Note that there are some explanatory texts on larger screens.

plurals
  1. POSingle producer-multiple consumers: How to tell consumers that production is complete
    primarykey
    data
    text
    <p>I my program a producer thread reads lines of text from a text file ( have about 8000 lines of text) and loads the lines into a concurrent queue.</p> <p>Three consumer threads read the lines from the queue each writing to a separate file.</p> <p>When I run the program only the producer thread and only one of the consumer threads complete. The other two threads seem to hang. </p> <p>How do I reliably tell all consumer threads that the end of file has been reached so they should return but making sure the queue is completely empty.</p> <p>My platform is Windows 7 64-bit </p> <p>VC11.</p> <p>Code compiled as 64-bit and 32-bit got the same behavior.</p> <p>Here is the code. (It is self-contained and compilable)</p> <pre><code>#include &lt;queue&gt; #include&lt;iostream&gt; #include&lt;fstream&gt; #include &lt;atomic&gt; #include &lt;thread&gt; #include &lt;condition_variable&gt; #include &lt;mutex&gt; #include&lt;string&gt; #include&lt;memory&gt; template&lt;typename Data&gt; class concurrent_queue { private: std::queue&lt;Data&gt; the_queue; mutable std::mutex the_mutex; std::condition_variable the_condition_variable; public: void push(Data const&amp; data){ { std::lock_guard&lt;std::mutex&gt; lock(the_mutex); the_queue.push(data); } the_condition_variable.notify_one(); } bool empty() const{ std::unique_lock&lt;std::mutex&gt; lock(the_mutex); return the_queue.empty(); } const size_t size() const{ std::lock_guard&lt;std::mutex&gt; lock(the_mutex); return the_queue.size(); } bool try_pop(Data&amp; popped_value){ std::unique_lock&lt;std::mutex&gt; lock(the_mutex); if(the_queue.empty()){ return false; } popped_value=the_queue.front(); the_queue.pop(); return true; } void wait_and_pop(Data&amp; popped_value){ std::unique_lock&lt;std::mutex&gt; lock(the_mutex); while(the_queue.empty()){ the_condition_variable.wait(lock); } popped_value=the_queue.front(); the_queue.pop(); } }; std::atomic&lt;bool&gt; done(true); typedef std::vector&lt;std::string&gt; segment; concurrent_queue&lt;segment&gt; data; const int one_block = 15; void producer() { done.store(false); std::ifstream inFile("c:/sample.txt"); if(!inFile.is_open()){ std::cout &lt;&lt; "Can't read from file\n"; return; } std::string line; segment seg; int cnt = 0; while(std::getline(inFile,line)){ seg.push_back(line); ++cnt; if( cnt == one_block ){ data.push( seg ); seg.clear(); cnt = 0; } } inFile.close(); done.store(true); std::cout &lt;&lt; "all done\n"; } void consumer( std::string fname) { std::ofstream outFile(fname.c_str()); if(!outFile.is_open()){ std::cout &lt;&lt; "Can't write to file\n"; return; } do{ while(!data.empty()){ segment seg; data.wait_and_pop( seg ); for(size_t i = 0; i &lt; seg.size(); ++i) { outFile &lt;&lt; seg[i] &lt;&lt; std::endl; } outFile.flush(); } } while(!done.load()); outFile.close(); std::cout &lt;&lt; fname &lt;&lt; " done.\n"; } int main() { std::thread th0(producer); std::thread th1(consumer, "Worker1.txt"); std::thread th2(consumer, "Worker2.txt"); std::thread th3(consumer, "Worker3.txt"); th0.join(); th1.join(); th2.join(); th3.join(); return 0; } </code></pre>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload