<p>@GideonEngelberth makes a great point that my original solution (below) doesn't work. I've stolen some ideas from @Enigmativity, but where he stops short of actually using the 'switching stream' to retrieve values from the source, I have gone the extra step.</p>
<p>The result is this extension method:</p>
<pre><code>public static IObservable&lt;T&gt; WhereTimed&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T,bool&gt; pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
    var switches = published.Select(pred).DistinctUntilChanged();
    return published.Window(switches.Where(x =&gt; x), _ =&gt; switches.Where(x =&gt; !x))
                    .SelectMany(xs =&gt; xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}
</code></pre>
<p>This uses <code>Select(pred).DistinctUntilChanged()</code> to give us a stream that fires once whenever we go under the threshold (<code>true</code>) and once whenever we go above the threshold (<code>false</code>).</p>
<p>We then leverage this stream to create a window over the source values between each <code>true</code> and the following <code>false</code>. Within each window we skip everything that arrives before <code>minTime</code> has elapsed, and then take values while we're still below the threshold.</p>
<hr>
<p><strong>Original:</strong> (not working)</p>
<p>This problem breaks down to saying:</p>
<p>When the stream is below 50, skip the first 10 seconds of values, then return the stream. Do this until the stream is above 50. Then start all over again.</p>
<p>The beauty of Rx is that this can be translated as is to the relevant functions. Here's a simple extension method that does exactly this:</p>
<pre><code>public static IObservable&lt;T&gt; WhereTimed&lt;T&gt;(this IObservable&lt;T&gt; source, Func&lt;T,bool&gt; pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
    var openers = published.Where(pred); // start taking at this point
    var closers = published.Where(z =&gt; !pred(z)); // stop taking at this point
    return openers.SkipUntil(Observable.Timer(minTime))
                  .TakeUntil(closers)
                  .Repeat();
}
</code></pre>
<p>Here is a test to show it working:</p>
<pre><code>var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);
zs.WhereTimed(z =&gt; z &lt;= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
</code></pre>
<p>Here the <code>zs</code> stream is:</p>
<pre><code>(1 x 10) &lt;2 second pause&gt; (2 x 10) (100 x 10) [repeat 5 times] </code></pre>
<p>Based on our rules, this should result in the first 2-second pause triggering the stream and showing the <code>2</code> value 10 times. However, that should immediately reset when the <code>100</code> is hit. When the <code>1</code> is then played, there has been insufficient pause for any <code>1</code> to show. This then repeats a few times, and only the number <code>2</code> is ever shown.</p>
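<p>To make the <code>switches</code> stream from the updated solution more concrete, here is a minimal sketch of what <code>Select(pred).DistinctUntilChanged()</code> emits on its own. The sample readings and the class name <code>SwitchesDemo</code> are made up for illustration, and the snippet assumes the System.Reactive package is referenced:</p>
<pre><code>using System;
using System.Reactive.Linq;

class SwitchesDemo
{
    static void Main()
    {
        // Hypothetical sample readings; 50 is the threshold used in the test above.
        var source = new[] { 1, 2, 100, 100, 2, 1, 100 }.ToObservable();

        // Map each value to "is it at or below the threshold?" and drop
        // consecutive repeats, so we get one notification per crossing.
        var switches = source.Select(z =&gt; z &lt;= 50).DistinctUntilChanged();

        // Prints: True, False, True, False (one per line)
        switches.Subscribe(b =&gt; Console.WriteLine(b));
    }
}
</code></pre>
<p>Each <code>True</code> is what opens a window in <code>WhereTimed</code>, and the next <code>False</code> is what closes it.</p>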
 
