Note that there are some explanatory texts on larger screens.

plurals
  1. POImplementing IObservable<T> from scratch
    text
    copied!<p>The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables but how would you implement an IObservable&lt;T&gt; from scratch?</p> <p>IEnumerable has the lovely yield keyword to make it very simple to implement.</p> <p>What is the proper way of implementing IObservable&lt;T&gt;?</p> <p>Do I need to worry about thread safety? </p> <p>I know there is support for getting called back on a specific synchronization context but is this something I as an IObservable&lt;T&gt; author need to worry about or this somehow built-in?</p> <p><strong>update:</strong></p> <p>Here's my C# version of Brian's F# solution</p> <pre><code>using System; using System.Linq; using Microsoft.FSharp.Collections; namespace Jesperll { class Observable&lt;T&gt; : IObservable&lt;T&gt;, IDisposable where T : EventArgs { private FSharpMap&lt;int, IObserver&lt;T&gt;&gt; subscribers = FSharpMap&lt;int, IObserver&lt;T&gt;&gt;.Empty; private readonly object thisLock = new object(); private int key; private bool isDisposed; public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { if (disposing &amp;&amp; !isDisposed) { OnCompleted(); isDisposed = true; } } protected void OnNext(T value) { if (isDisposed) { throw new ObjectDisposedException("Observable&lt;T&gt;"); } foreach (IObserver&lt;T&gt; observer in subscribers.Select(kv =&gt; kv.Value)) { observer.OnNext(value); } } protected void OnError(Exception exception) { if (isDisposed) { throw new ObjectDisposedException("Observable&lt;T&gt;"); } if (exception == null) { throw new ArgumentNullException("exception"); } foreach (IObserver&lt;T&gt; observer in subscribers.Select(kv =&gt; kv.Value)) { observer.OnError(exception); } } protected void OnCompleted() { if (isDisposed) { throw new ObjectDisposedException("Observable&lt;T&gt;"); } foreach (IObserver&lt;T&gt; observer in subscribers.Select(kv =&gt; kv.Value)) { observer.OnCompleted(); } } public IDisposable Subscribe(IObserver&lt;T&gt; observer) { if (observer == null) { throw new ArgumentNullException("observer"); } lock (thisLock) { int k = key++; subscribers = subscribers.Add(k, observer); return new AnonymousDisposable(() =&gt; { lock (thisLock) { subscribers = subscribers.Remove(k); } }); } } } class AnonymousDisposable : IDisposable { Action dispose; public AnonymousDisposable(Action dispose) { this.dispose = dispose; } public void Dispose() { dispose(); } } } </code></pre> <p><strong>edit:</strong> Don't throw ObjectDisposedException if Dispose is called twice</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload