Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    text
    copied!<p>I can't admit Richard Szalay's solution as acceptable. If you start 100 requests and second request failed with server unavailable error (for example) remaining 98 requests will be aborted. The second problem is how UI will react on such observable? Too sad. </p> <p>Keeping in mind chapter 4.3 of <a href="http://blogs.msdn.com/b/rxteam/archive/2010/10/28/rx-design-guidelines.aspx" rel="nofollow">Rx Design Guidelines</a> I desired to express WebRequest observable via Observable.Using() operator. But WebRequest is not disposable! So I defined DisposableWebRequest: </p> <pre><code>public class DisposableWebRequest : WebRequest, IDisposable { private static int _Counter = 0; private readonly WebRequest _request; private readonly int _index; private volatile bool _disposed = false; public DisposableWebRequest (string url) { this._request = WebRequest.Create(url); this._index = ++DisposableWebRequest._Counter; } public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state) { return this._request.BeginGetResponse(callback, state); } public override WebResponse EndGetResponse(IAsyncResult asyncResult) { Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId)); Trace.Flush(); if (!this._disposed) { return this._request.EndGetResponse(asyncResult); } else { return null; } } public override WebResponse GetResponse() { return this._request.GetResponse(); } public override void Abort() { this._request.Abort(); } public void Dispose() { if(!this._disposed) { this._disposed = true; Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId )); Trace.Flush(); this.Abort(); } } } </code></pre> <p>Then I create WPF window and put two buttons on it (Start and Stop). </p> <p>So, let's create proper requests observable collection. At first, define URL's observable create function:</p> <pre><code> Func&lt;IObservable&lt;string&gt;&gt; createUrlObservable = () =&gt; Observable .Return("http://yahoo.com") .Repeat(100) .OnStartup(() =&gt; { this._failed = 0; this._successed = 0; }); </code></pre> <p>On every url we should create webrequest obervable, so: </p> <pre><code> Func&lt;string, IObservable&lt;WebResponse&gt;&gt; createRequestObservable = url =&gt; Observable.Using( () =&gt; new DisposableWebRequest("http://yahoo.com"), r =&gt; { Trace.WriteLine("Queued " + url); Trace.Flush(); return Observable .FromAsyncPattern&lt;WebResponse&gt;(r.BeginGetResponse, r.EndGetResponse)(); }); </code></pre> <p>In addition define two event observables which reacts on buttons "Start"/"Stop" clicks: </p> <pre><code> var startMouseDown = Observable.FromEvent&lt;RoutedEventArgs&gt;(this.StartButton, "Click"); var stopMouseDown = Observable.FromEvent&lt;RoutedEventArgs&gt;(this.StopButton, "Click"); </code></pre> <p>So bricks are ready, time to compose them (I do it in view constructor just after InitializeComponent()): </p> <pre><code> startMouseDown .SelectMany(down =&gt; createUrlObservable() .SelectMany(url =&gt; createRequestObservable(url) .TakeUntil(stopMouseDown) .Select(r =&gt; r.GetResponseStream()) .Do(s =&gt; { using (var sr = new StreamReader(s)) { Trace.WriteLine(sr.ReadLine()); Trace.Flush(); } }) .Do(r =&gt; this._successed++) .HandleError(e =&gt; this._failed++)) .ObserveOnDispatcher() .Do(_ =&gt; this.RefresLabels(), e =&gt; { }, () =&gt; this.RefresLabels()) ) .Subscribe(); </code></pre> <p>You may wonder on function "HandleError()". If exception occured in EndGetResponse() call (I turned off network connection to reproduce it) and not catched in request observable - it will crash the startMouseDown observable. HandleError catches exception silently, perfom provided action and instead of call OnError for next observer it calls OnCompleted: </p> <pre><code>public static class ObservableExtensions { public static IObservable&lt;TSource&gt; HandleError&lt;TSource&gt;(this IObservable&lt;TSource&gt; source, Action&lt;Exception&gt; errorHandler) { return Observable.CreateWithDisposable&lt;TSource&gt;(observer =&gt; { return source.Subscribe(observer.OnNext, e =&gt; { errorHandler(e); //observer.OnError(e); observer.OnCompleted(); }, observer.OnCompleted); }); } } </code></pre> <p>The last unexplained place is method RefreshLabels, which updates UI controls: </p> <pre><code> private void RefresLabels() { this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed); this.FailedLabel.Content = string.Format("Failed {0}", this._failed); } </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload