<p>Your code is almost at the point of running the observable in parallel. If you write your observer like this:</p>
<pre><code>public class Subscriber : IObserver&lt;int&gt;
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e) { }
    public void OnCompleted() { }
}
</code></pre>
<p>Then running this code:</p>
<pre><code>var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x =&gt; (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);

var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
</code></pre>
<p>will produce output like the following:</p>
<pre><code>0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
</code></pre>
<p>It's already running the subscriptions in parallel on different threads.</p>
<p>The important thing that I used was the <code>.ObserveOn</code> extension method - that's what made this work.</p>
<p>Keep in mind that observers don't generally share the same instance of an observable pipeline. Subscribing to an observable effectively wires up a unique "chain" of observable operators from the source of the observable to the observer. This is much the same as calling <code>GetEnumerator</code> twice on an enumerable: you do not share one enumerator instance, you get two unique instances.</p>
<p>Now, I want to describe what I mean by a chain.
I'll use the code that Reflector.NET extracts from <code>Observable.Generate</code> and <code>Observable.Where</code> to illustrate the point.</p>
<p>Take this code for example:</p>
<pre><code>var xs = Observable.Generate(0, x =&gt; x &lt; 10, x =&gt; x + 1, x =&gt; x);
var ys = xs.Where(x =&gt; x % 2 == 0);
ys.Subscribe(y =&gt; { /* produces 0, 2, 4, 6, 8 */ });
</code></pre>
<p>Under the hood, <code>Generate</code> and <code>Where</code> each create a new instance of the internal Rx class <code>AnonymousObservable&lt;T&gt;</code>. The constructor for <code>AnonymousObservable&lt;T&gt;</code> takes a <code>Func&lt;IObserver&lt;T&gt;, IDisposable&gt;</code> delegate, which it invokes whenever it receives a call to <code>Subscribe</code>.</p>
<p>The slightly cleaned up code for <code>Observable.Generate&lt;TState, TResult&gt;(...)</code> from Reflector.NET is:</p>
<pre><code>public static IObservable&lt;TResult&gt; Generate&lt;TState, TResult&gt;(
    TState initialState,
    Func&lt;TState, bool&gt; condition,
    Func&lt;TState, TState&gt; iterate,
    Func&lt;TState, TResult&gt; resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable&lt;TResult&gt;((IObserver&lt;TResult&gt; observer) =&gt;
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =&gt;
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}
</code></pre>
<p>The <code>Action self</code> parameter lets the scheduled action call itself recursively, which is how it iterates the output values. Notice that nowhere in this code is the <code>observer</code> stored, nor are the values passed to more than one observer.
This code runs once for each new observer.</p>
<p>The slightly cleaned up code for <code>Observable.Where&lt;T&gt;(...)</code> from Reflector.NET is:</p>
<pre><code>public static IObservable&lt;TSource&gt; Where&lt;TSource&gt;(
    this IObservable&lt;TSource&gt; source,
    Func&lt;TSource, bool&gt; predicate)
{
    return new AnonymousObservable&lt;TSource&gt;(observer =&gt;
        source.Subscribe(x =&gt;
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex =&gt; observer.OnError(ex), () =&gt; observer.OnCompleted()));
}
</code></pre>
<p>Again, this code doesn't track multiple observers. It calls <code>Subscribe</code>, effectively passing its own code as the observer to the underlying <code>source</code> observable.</p>
<p>You should see that, in my example code above, subscribing to <code>Where</code> creates a subscription to <code>Generate</code>, and hence this is a chain of observables. In fact, it's chaining <code>Subscribe</code> calls on a series of <code>AnonymousObservable</code> objects.</p>
<p>If you have two subscriptions you have two chains. If you have 1,000 subscriptions you have 1,000 chains.</p>
<p>Now, just as a side note - even though there are <code>IObservable&lt;T&gt;</code> and <code>IObserver&lt;T&gt;</code> interfaces - you should very rarely implement these in your own classes. The built-in classes and operators handle 99.99% of all cases. It's a bit like <code>IEnumerable&lt;T&gt;</code> - how often do you need to implement this interface yourself?</p>
<p>Let me know if this helps and if you need any further explanation.</p>
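<p>To make the "one chain per subscription" point concrete, here is a minimal sketch that uses only the BCL interfaces, without Rx. The names <code>SketchObservable&lt;T&gt;</code>, <code>SketchObserver&lt;T&gt;</code>, <code>NoopDisposable</code>, <code>Range</code>, <code>WhereSketch</code> and <code>ChainDemo</code> are all hypothetical stand-ins invented for this illustration; they are not the Rx internals, and the sketch omits scheduling and the predicate error handling shown above. It implements the interfaces by hand purely to expose the mechanics, which is not something you would normally do in real code.</p>

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an AnonymousObservable-style class (illustration only).
public sealed class SketchObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public SketchObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    // Each Subscribe call re-runs the delegate: nothing is stored or shared.
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Hypothetical observer built from three delegates.
public sealed class SketchObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public SketchObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

public static class ChainDemo
{
    // Counts how many times the source's subscribe delegate has run.
    public static int SourceRuns;

    // Synchronous source: emits count integers starting at start, then completes.
    public static IObservable<int> Range(int start, int count) =>
        new SketchObservable<int>(observer =>
        {
            SourceRuns++;
            for (int i = start; i < start + count; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return new NoopDisposable();
        });

    // Filter operator: subscribes to the source on behalf of each downstream observer.
    public static IObservable<T> WhereSketch<T>(this IObservable<T> source, Func<T, bool> predicate) =>
        new SketchObservable<T>(observer =>
            source.Subscribe(new SketchObserver<T>(
                x => { if (predicate(x)) observer.OnNext(x); },
                observer.OnError,
                observer.OnCompleted)));

    public static void Main()
    {
        var ys = Range(0, 10).WhereSketch(x => x % 2 == 0);

        var first = new List<int>();
        var second = new List<int>();
        ys.Subscribe(new SketchObserver<int>(first.Add, _ => { }, () => { }));
        ys.Subscribe(new SketchObserver<int>(second.Add, _ => { }, () => { }));

        Console.WriteLine(string.Join(",", first));   // 0,2,4,6,8
        Console.WriteLine(string.Join(",", second));  // 0,2,4,6,8
        Console.WriteLine(SourceRuns);                // 2: one source run per subscription
    }
}
```

<p>Both observers receive the full sequence, and the source delegate runs twice, once per subscription. In this sketch that is the two-chains behaviour described above: subscribing to the filter subscribes to the source, and nothing is shared between the two subscriptions.</p>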