<p>Here is a stab at answering my own question; please let me know if you see any issues with it.</p>
<pre><code>public static class ObservableHelper
{
    /// &lt;summary&gt;
    /// Buffers entries that do not satisfy the &lt;paramref name="shouldFlush"/&gt; condition, using a circular buffer with a max
    /// capacity. When an entry that satisfies the condition occurs, it flushes the circular buffer and the new entry,
    /// and starts buffering again.
    /// &lt;/summary&gt;
    /// &lt;typeparam name="T"&gt;The type of entry.&lt;/typeparam&gt;
    /// &lt;param name="stream"&gt;The original stream of events.&lt;/param&gt;
    /// &lt;param name="shouldFlush"&gt;The condition that defines whether the item and the buffered entries are flushed.&lt;/param&gt;
    /// &lt;param name="bufferSize"&gt;The buffer size for accumulated entries.&lt;/param&gt;
    /// &lt;returns&gt;An observable that has this filtering capability.&lt;/returns&gt;
    public static IObservable&lt;T&gt; FlushOnTrigger&lt;T&gt;(this IObservable&lt;T&gt; stream, Func&lt;T, bool&gt; shouldFlush, int bufferSize)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        if (shouldFlush == null) throw new ArgumentNullException("shouldFlush");
        if (bufferSize &lt; 1) throw new ArgumentOutOfRangeException("bufferSize");

        return System.Reactive.Linq.Observable.Create&lt;T&gt;(observer =&gt;
        {
            // One buffer per subscription, so subscribers do not share state.
            var buffer = new CircularBuffer&lt;T&gt;(bufferSize);
            var subscription = stream.Subscribe(
                newItem =&gt;
                {
                    bool result;
                    try
                    {
                        result = shouldFlush(newItem);
                    }
                    catch (Exception ex)
                    {
                        // Report a throwing predicate to the observer instead of silently dropping the item.
                        observer.OnError(ex);
                        return;
                    }

                    if (result)
                    {
                        // Emit the buffered entries (oldest first), then the triggering entry.
                        foreach (var buffered in buffer.TakeAll())
                        {
                            observer.OnNext(buffered);
                        }
                        observer.OnNext(newItem);
                    }
                    else
                    {
                        buffer.Add(newItem);
                    }
                },
                observer.OnError,
                observer.OnCompleted);

            return subscription;
        });
    }
}
</code></pre>
<p>By the way, <code>CircularBuffer</code> does not exist out of the box, but the implementation is straightforward.</p>
<p>Then I just call:</p>
<pre><code>data
    .FlushOnTrigger(item =&gt; item == 'X', bufferSize: 2)
    .Subscribe(Console.WriteLine);
</code></pre>
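<p>Since <code>CircularBuffer</code> is not a framework type, here is a minimal sketch of what it could look like. The only members the code above needs are a capacity-taking constructor, <code>Add</code>, and <code>TakeAll</code>; everything else (the <code>Count</code> property, the array-backed layout, returning a snapshot from <code>TakeAll</code>) is my own assumption, not something the answer specifies.</p>

```csharp
using System;
using System.Collections.Generic;

/// Fixed-capacity FIFO buffer: once full, adding an item overwrites the oldest one.
/// Not thread-safe; it assumes the usual Rx guarantee that OnNext calls are serialized.
public sealed class CircularBuffer<T>
{
    private readonly T[] _items;
    private int _start; // index of the oldest stored item
    private int _count; // number of items currently stored

    public CircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        _items = new T[capacity];
    }

    public int Count
    {
        get { return _count; }
    }

    public void Add(T item)
    {
        // When the buffer is full this index wraps around to the oldest slot.
        int end = (_start + _count) % _items.Length;
        _items[end] = item;

        if (_count == _items.Length)
        {
            // The oldest item was overwritten, so the next-oldest becomes the start.
            _start = (_start + 1) % _items.Length;
        }
        else
        {
            _count++;
        }
    }

    /// Returns the stored items, oldest first, and empties the buffer.
    public IReadOnlyList<T> TakeAll()
    {
        // Copy into a snapshot so callers can enumerate it safely while the buffer is reused.
        var result = new T[_count];
        for (int i = 0; i < _count; i++)
        {
            result[i] = _items[(_start + i) % _items.Length];
        }

        // Clear the slots so the buffer does not keep references to flushed items alive.
        Array.Clear(_items, 0, _items.Length);
        _start = 0;
        _count = 0;
        return result;
    }
}
```

<p>With a capacity of 2, adding <code>A</code>, <code>B</code>, <code>C</code> and then calling <code>TakeAll</code> yields <code>B</code>, <code>C</code>: the oldest entry is dropped once the buffer is full, which is the "keep only the last few entries before the trigger" behaviour that <code>FlushOnTrigger</code> relies on.</p>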