<p>Have you tried the Reactive Extensions?</p>
<p><a href="http://msdn.microsoft.com/en-us/data/gg577609.aspx" rel="nofollow noreferrer">http://msdn.microsoft.com/en-us/data/gg577609.aspx</a></p>
<p>Rx is a new technology from Microsoft. Its focus, as stated on the official site:</p>
<blockquote>
<p>The Reactive Extensions (Rx)... ...is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.</p>
</blockquote>
<p>You can download it as a NuGet package:</p>
<p><a href="https://nuget.org/packages/Rx-Main/1.0.11226" rel="nofollow noreferrer">https://nuget.org/packages/Rx-Main/1.0.11226</a></p>
<p>Since I am currently learning Rx, I took this example and wrote code for it. The code I ended up with is not actually executed in parallel, but it is completely asynchronous and guarantees that the source lines are executed in order.</p>
<p>Perhaps this is not the best implementation, but as I said, I am still learning Rx (making it thread-safe would be a good improvement).</p>
<p>This is a DTO that I use to return data from the background threads:</p>
<pre><code>class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}
</code></pre>
<p>These are the basic methods doing the real work. I simulate the elapsed time with a simple <code>Thread.Sleep</code>, and each method returns the ID of the thread that executed it, <code>Thread.CurrentThread.ManagedThreadId</code>.
Note that <code>ProcessLine</code> sleeps for 4 seconds, which makes it the most time-consuming operation.</p>
<pre><code>private IEnumerable&lt;MyItem&gt; ReadLinesFromFile(string fileName)
{
    // Simulated file contents: the strings "1" through "10" (fileName is not used).
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem
        {
            CurrentThread = Thread.CurrentThread.ManagedThreadId,
            Line = item
        };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem
    {
        Line = "s" + processedLine,
        CurrentThread = Thread.CurrentThread.ManagedThreadId
    };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem
    {
        Line = "p" + line,
        CurrentThread = Thread.CurrentThread.ManagedThreadId
    };
}
</code></pre>
<p>I use the following method just to update the UI:</p>
<pre><code>private void DisplayResults(MyItem myItem, Color color, string message)
{
    // Columns: message, line, worker thread ID, UI thread ID.
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message,
                myItem.Line,
                myItem.CurrentThread.ToString(),
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}
</code></pre>
<p><strong>And finally, this is the method that calls the Rx API:</strong></p>
<pre><code>private void PlayWithRx()
{
    // Initialize the observable with the lines read from the file.
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =&gt;
    {
        // For each line read, update the UI.
        this.DisplayResults(x, Color.Red, "Read");

        // For each line read, run ProcessLine on the task pool.
        var process = Observable.Start(() =&gt; this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =&gt;
            {
                // For each line processed, update the UI.
                this.DisplayResults(c, Color.Blue, "Processed");

                // For each line processed, run the final step, UpdateResultToDatabase,
                // and update the UI once the processed line has been saved.
                var persist = Observable.Start(() =&gt; this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z =&gt; this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}
</code></pre>
<p>This process runs entirely in the background. This is the output generated:</p>
<p><img src="https://i.stack.imgur.com/hUfzl.png" alt="enter image description here"></p>
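<p>As a possible variation on the nested subscriptions, the same read, process, and save stages could be composed as a single query. The following is only a minimal console sketch under some assumptions, not the code from the answer: the class name <code>RxPipelineSketch</code> and the helper <code>RunPipeline</code> are hypothetical, the delays are shortened, the UI updates are replaced by <code>Console.WriteLine</code>, and the scheduler-less overloads of <code>ToObservable</code> and <code>Observable.Start</code> are used so that the library's default schedulers apply.</p>
<pre><code>using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

static class RxPipelineSketch // hypothetical name, for illustration only
{
    // Simulated work; delays are shortened compared with the answer.
    static MyItem ProcessLine(string line)
    {
        Thread.Sleep(400);
        return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
    }

    static MyItem UpdateResultToDatabase(string processedLine)
    {
        Thread.Sleep(70);
        return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
    }

    // Hypothetical helper: runs every line through both stages and
    // returns the saved lines in source order.
    public static List&lt;string&gt; RunPipeline(IEnumerable&lt;string&gt; lines)
    {
        var saved = lines
            .ToObservable()
            // Observable.Start schedules the work right away and caches its single result.
            .Select(line =&gt; Observable.Start(() =&gt; ProcessLine(line)))
            // Concat subscribes to the inner sequences one at a time,
            // so results are emitted in source order.
            .Concat()
            .Select(p =&gt; Observable.Start(() =&gt; UpdateResultToDatabase(p.Line)))
            .Concat();

        // Blocks until the pipeline completes; acceptable in a console demo,
        // but not something to do on a UI thread.
        return saved.ToEnumerable().Select(item =&gt; item.Line).ToList();
    }

    static void Main()
    {
        foreach (var line in RunPipeline(new[] { "1", "2", "3" }))
        {
            Console.WriteLine(line); // sp1, sp2, sp3
        }
    }
}
</code></pre>
<p>In this sketch the processing of different lines can overlap, because <code>Observable.Start</code> begins each call as soon as the line arrives, while <code>Concat</code> keeps the results in source order. Wrapping each <code>Observable.Start</code> call in <code>Observable.Defer</code> would instead run the calls strictly one after another.</p>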
 
