Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p>Given you have:</p> <pre><code>IObservable&lt;string&gt; UnparsedMessages = ...; Func&lt;string, Message&gt; ParseMessage = ...; </code></pre> <p>Then you could use a <code>SelectAsync</code> extension method like so:</p> <pre><code>IObservable&lt;Message&gt; ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage); </code></pre> <p>The <code>SelectAsync</code> extension method processes each unparsed message asynchronously and ensures that the results come back in the order they arrived.</p> <p>Let me know if this does what you need.</p> <p>Here's the code:</p> <pre><code>public static IObservable&lt;U&gt; SelectAsync&lt;T, U&gt;(this IObservable&lt;T&gt; source, Func&lt;T, U&gt; selector) { var subject = new Subject&lt;U&gt;(); var queue = new Queue&lt;System.Threading.Tasks.Task&lt;U&gt;&gt;(); var completing = false; var subscription = (IDisposable)null; Action&lt;Exception&gt; onError = ex =&gt; { queue.Clear(); subject.OnError(ex); subscription.Dispose(); }; Action dequeue = () =&gt; { lock (queue) { var error = false; while (queue.Count &gt; 0 &amp;&amp; queue.Peek().IsCompleted) { var task = queue.Dequeue(); if (task.Exception != null) { error = true; onError(task.Exception); break; } else { subject.OnNext(task.Result); } } if (!error &amp;&amp; completing &amp;&amp; queue.Count == 0) { subject.OnCompleted(); subscription.Dispose(); } } }; Action&lt;T&gt; enqueue = t =&gt; { if (!completing) { var task = new System.Threading.Tasks.Task&lt;U&gt;(() =&gt; selector(t)); queue.Enqueue(task); task.ContinueWith(tu =&gt; dequeue()); task.Start(); } }; subscription = source.Subscribe( t =&gt; { lock(queue) enqueue(t); }, x =&gt; { lock(queue) onError(x); }, () =&gt; { lock(queue) completing = true; }); return subject.AsObservable(); } </code></pre> <hr> <p>I ended up needing to revisit this for work and wrote a more robust version of this code (based also on Richard's answer.)</p> <p>The key advantage to this code is the absence of any explicit queue. I'm purely using task continuations to put the results back in order. Works like a treat!</p> <pre><code>public static IObservable&lt;U&gt; ForkSelect&lt;T, U&gt;(this IObservable&lt;T&gt; source, Func&lt;T, U&gt; selector) { return source.ForkSelect&lt;T, U&gt;(t =&gt; Task&lt;U&gt;.Factory.StartNew(() =&gt; selector(t))); } public static IObservable&lt;U&gt; ForkSelect&lt;T, U&gt;(this IObservable&lt;T&gt; source, Func&lt;T, Task&lt;U&gt;&gt; selector) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return Observable.CreateWithDisposable&lt;U&gt;(observer =&gt; { var gate = new object(); var onNextTask = Task.Factory.StartNew(() =&gt; { }); var sourceCompleted = false; var taskErrored = false; Action&lt;Exception&gt; onError = ex =&gt; { sourceCompleted = true; onNextTask = onNextTask.ContinueWith(t =&gt; observer.OnError(ex)); }; Action onCompleted = () =&gt; { sourceCompleted = true; onNextTask = onNextTask.ContinueWith(t =&gt; observer.OnCompleted()); }; Action&lt;T&gt; onNext = t =&gt; { var task = selector(t); onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts =&gt; { if (!taskErrored) { if (task.IsFaulted) { taskErrored = true; observer.OnError(task.Exception); } else { observer.OnNext(task.Result); } } }); }; var subscription = source .AsObservable() .Subscribe( t =&gt; { if (!sourceCompleted) lock (gate) onNext(t); }, ex =&gt; { if (!sourceCompleted) lock (gate) onError(ex); }, () =&gt; { if (!sourceCompleted) lock (gate) onCompleted(); }); var @return = new CompositeDisposable(subscription); return @return; }); } </code></pre> <p>And the <code>SelectMany</code> overloads to allow LINQ to be used are:</p> <pre><code>public static IObservable&lt;U&gt; SelectMany&lt;T, U&gt;(this IObservable&lt;T&gt; source, Func&lt;T, Task&lt;U&gt;&gt; selector) { return source.ForkSelect&lt;T, U&gt;(selector); } public static IObservable&lt;V&gt; SelectMany&lt;T, U, V&gt;(this IObservable&lt;T&gt; source, Func&lt;T, Task&lt;U&gt;&gt; taskSelector, Func&lt;T, U, V&gt; resultSelector) { if (source == null) throw new ArgumentNullException("source"); if (taskSelector == null) throw new ArgumentNullException("taskSelector"); if (resultSelector == null) throw new ArgumentNullException("resultSelector"); return source.Zip(source.ForkSelect&lt;T, U&gt;(taskSelector), (t, u) =&gt; resultSelector(t, u)); } </code></pre> <p>So these methods can now be used like this:</p> <pre><code>var observableOfU = observableOfT.ForkSelect(funcOfT2U); </code></pre> <p>Or:</p> <pre><code>var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU); </code></pre> <p>Or:</p> <pre><code>var observableOfU = from t in observableOfT from u in funcOfT2TaskOfU(t) select u; </code></pre> <p>Enjoy!</p>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. VO
      singulars
      1. This table or related slice is empty.
    2. VO
      singulars
      1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload