Note that there are some explanatory texts on larger screens.

plurals
  1. POFirstOrLast IObservable Extension
    text
    copied!<p>I want to run through an <code>IObservable&lt;T&gt;</code> looking for an element that matches a predicate, and if not found, return the last element of the <code>IObservable&lt;T&gt;</code>. I don't want to have to store the entire contents of the <code>IObservable&lt;T&gt;</code>, and I don't want to loop through the <code>IObservable</code> twice, so I've set up an extension method</p> <pre><code>public static class ObservableExtensions { public static IObservable&lt;T&gt; FirstOrLastAsync&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T, bool&gt; pred) { return Observable.Create&lt;T&gt;(o =&gt; { var hot = source.Publish(); var store = new AsyncSubject&lt;T&gt;(); var d1 = hot.Subscribe(store); var d2 = hot.FirstAsync(x =&gt; pred(x)).Amb(store).Subscribe(o); var d3 = hot.Connect(); return new CompositeDisposable(d1, d2, d3); }); } public static T FirstOrLast&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T, bool&gt; pred) { return source.FirstOrLastAsync(pred).Wait(); } } </code></pre> <p>The Async method creates a hot observable from a potentially cold one passed in. It subscribes an <code>AsyncSubject&lt;T&gt;</code> to remember the last element, and an <code>IObservable&lt;T&gt;</code> that looks for the element. It then takes the first element from either of those <code>IObservable&lt;T&gt;</code>s, which ever returns a value first via <code>.Amb</code> (<code>AsyncSubject&lt;T&gt;</code> doesn't return a value until it gets an <code>.OnCompleted</code> message).</p> <p>My questions are the following: </p> <ul> <li>Can this be written better or more concisely using different Observable methods? </li> <li>Do all of those disposables need to be included in the CompositeDisposable? </li> <li>When the hot observable is completed without finding a matching element, is there a race condition between FirstAsync throwing an exception, and the AsyncSubject propagating its value? </li> <li>If so, do I need to change the line to:</li> </ul> <p><code>var d2 = hot.Where(x =&gt; pred(x)).Take(1).Amb(store).Subscribe(o);</code></p> <p>I'm pretty new to RX, and this is my first extension on IObservable.</p> <p><em>EDIT</em></p> <p>I ended up going with</p> <pre><code>public static class ObservableExtensions { public static IObservable&lt;T&gt; FirstOrLastAsync&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T, bool&gt; pred) { var hot = source.Publish().RefCount(); return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never&lt;T&gt;())); } public static T FirstOrLast&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T, bool&gt; pred) { return source.FirstOrLastAsync(pred).First(); } } </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload