<p>Honestly, I am not sure how 'right' all this is, but it feels pretty good based on my experience so far. It's F# code, but hopefully you get a sense of the flavor. It lets you 'new up' a source object, which you can then call Next/Completed/Error on, and it manages subscriptions and tries to Assert when the source or clients do bad things.</p>
<pre><code>open System
open System.Diagnostics

type ObservableSource&lt;'T&gt;() =     // '
    let protect f =
        let mutable ok = false
        try
            f()
            ok &lt;- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe,
    // so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map&lt;int,IObserver&lt;'T&gt;&gt;  // '
    let next(x) = subscriptions |&gt; Seq.iter (fun (KeyValue(_,v)) -&gt; protect (fun () -&gt; v.OnNext(x)))
    let completed() = subscriptions |&gt; Seq.iter (fun (KeyValue(_,v)) -&gt; protect (fun () -&gt; v.OnCompleted()))
    let error(e) = subscriptions |&gt; Seq.iter (fun (KeyValue(_,v)) -&gt; protect (fun () -&gt; v.OnError(e)))
    let thisLock = new obj()
    let obs =
        { new IObservable&lt;'T&gt; with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () -&gt;
                        let k = key
                        key &lt;- key + 1
                        subscriptions &lt;- subscriptions.Add(k, o)
                        k)
                { new IDisposable with
                    member this.Dispose() =
                        lock thisLock (fun () -&gt;
                            subscriptions &lt;- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished &lt;- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished &lt;- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs
</code></pre>
<p>I'll be interested in any thoughts about what's good or bad here; I haven't had a chance to look at all the new Rx stuff from devlabs yet...</p>
<p>My own experiences suggest that:</p>
<ul>
<li>Those who subscribe to observables should never throw from their subscriptions. There is nothing reasonable an observable can do when a subscriber throws. (This is similar to events.) Most likely the exception will just bubble up to a top-level catch-all handler or crash the app.</li>
<li>Sources probably should be "logically single threaded". I think it may be harder to write clients that can react to concurrent OnNext calls; even if each individual call comes from a different thread, it is helpful to avoid concurrent calls.</li>
<li>It's definitely useful to have a base/helper class that enforces some 'contracts'.</li>
</ul>
<p>I'm very curious if people can show more concrete advice along these lines.</p>
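<p>For what it's worth, here is a rough usage sketch of the source object described above. It assumes the <code>ObservableSource</code> type from this answer is already in scope; the names <code>received</code> and <code>subscription</code> are made up for illustration and are not part of any library.</p>
<pre><code>open System

// Assumes ObservableSource&lt;'T&gt; from the code above is in scope.
let source = new ObservableSource&lt;int&gt;()

// Hypothetical sink that just records the values it is given.
let received = ResizeArray&lt;int&gt;()

// Clients only ever see the IObservable exposed via .Value
let subscription =
    source.Value.Subscribe(
        { new IObserver&lt;int&gt; with
            member this.OnNext(x) = received.Add(x)
            member this.OnCompleted() = ()
            member this.OnError(e) = () })

source.Next(1)
source.Next(2)
subscription.Dispose()   // unsubscribe; later values are not delivered
source.Next(3)
source.Completed()       // after this, further Next/Completed/Error calls trip the Assert

// received now holds 1 and 2
</code></pre>
<p>The point of the split is that only the owner of the source object can push values, while subscribers get nothing more than the plain <code>IObservable</code> interface.</p>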
 
