<p>There's a fairly straightforward way to do what you want with Rx, but you need to think in terms of observables only and not mix in enumerables.</p>
<p>The function signature that you really need to think in terms of is:</p>
<pre><code>IObservable&lt;IObservable&lt;ChannelState&gt;&gt; --&gt; IObservable&lt;ChannelSetState&gt;
</code></pre>
<p>Here's the function:</p>
<pre><code>Func&lt;IObservable&lt;IObservable&lt;ChannelState&gt;&gt;, IObservable&lt;ChannelSetState&gt;&gt; f =
    channelStates =&gt;
        channelStates
            .Merge()                                                 // flatten all channels into one stream
            .Select(cs =&gt; cs == ChannelState.Operational ? 1 : -1)   // +1 when a channel comes up, -1 otherwise
            .Scan(0, (cssn, csn) =&gt; cssn + csn)                      // running count of operational channels
            .Select(cssn =&gt; cssn &gt; 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();                                 // only report Up/Down transitions
</code></pre>
<p>It is important that each <code>IObservable&lt;ChannelState&gt;</code> in the <code>IObservable&lt;IObservable&lt;ChannelState&gt;&gt;</code> behaves properly for this to work.</p>
<p>I've assumed that the <code>ChannelState</code> enum has an <code>Idle</code> state and that each <code>IObservable&lt;ChannelState&gt;</code> will produce zero or more pairs of <code>Operational</code>/<code>Idle</code> values (<code>Operational</code> followed by <code>Idle</code>) before completing.</p>
<p>Also you said "the collection of channels can be added to and removed from" - thinking in terms of <code>IEnumerable&lt;IObservable&lt;ChannelState&gt;&gt;</code> this sounds reasonable - but in Rx you don't have to worry about removes because each observable can signal its own completion. Once it signals completion, it is as if it has been removed from the collection because it cannot produce any further values. So you only need to worry about adding to the collection - this is easy using subjects.</p>
<p>So now the function can be used like so:</p>
<pre><code>var channelStatesSubject = new Subject&lt;IObservable&lt;ChannelState&gt;&gt;();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css =&gt; { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable&lt;ChannelState&gt; */);
channelStatesSubject.OnNext(/* IObservable&lt;ChannelState&gt; */);
channelStatesSubject.OnNext(/* IObservable&lt;ChannelState&gt; */);
// etc
</code></pre>
<p>I ran this using some test code that used three random <code>ChannelState</code> observables, with a <code>Do</code> call in the <code>f</code> function for debugging, and got the following sequence:</p>
<pre><code>1 Up 2 3 2 1 2 1 0 Down 1 Up 0 Down
</code></pre>
<p>I think that's what you're after. Let me know if I've missed anything.</p>
<hr>
<p>As per the comments below, the <code>ChannelState</code> enum has multiple states, but only <code>Operational</code> means that the connection is up. So it's very easy to add a <code>DistinctUntilChanged</code> operator to hide multiple "down" states. Here's the code now:</p>
<pre><code>Func&lt;IObservable&lt;IObservable&lt;ChannelState&gt;&gt;, IObservable&lt;ChannelSetState&gt;&gt; f =
    channelStates =&gt;
        channelStates
            .Merge()
            .Select(cs =&gt; cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) =&gt; cssn + csn)
            .Select(cssn =&gt; cssn &gt; 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
</code></pre>
<hr>
<p>Added code to ensure that the first select query always starts with a <code>1</code>. Here's the code now:</p>
<pre><code>Func&lt;IObservable&lt;IObservable&lt;ChannelState&gt;&gt;, IObservable&lt;ChannelSetState&gt;&gt; f =
    channelStates =&gt;
        channelStates
            .Merge()
            .Select(cs =&gt; cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) =&gt; cssn + csn)
            .Select(cssn =&gt; cssn &gt; 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
</code></pre>
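<p>To make the arithmetic behind the test output above concrete, here is a minimal sketch that replays the <code>Scan</code>, <code>Select</code> and final <code>DistinctUntilChanged</code> steps of the first version of <code>f</code> over a hard-coded list of +1/-1 values, using plain C# with no Rx dependency. The <code>ChannelSetTrace</code> class, its <code>Replay</code> method, the <code>ChannelSetState</code> enum definition and the list of deltas are all assumptions made for illustration; the deltas were picked to be consistent with the counts in that output and are not taken from the original test code.</p>

```csharp
using System;
using System.Collections.Generic;

// Assumed definition: the answer only shows that Up and Down exist.
public enum ChannelSetState { Down, Up }

// Hypothetical helper, not part of Rx or of the original answer.
public static class ChannelSetTrace
{
    // Mimics Scan (running sum), Select (count > 0 ? Up : Down) and
    // DistinctUntilChanged (emit a state only when it differs from the last one).
    public static List<string> Replay(IEnumerable<int> deltas)
    {
        var trace = new List<string>();
        var count = 0;                    // Scan seed
        ChannelSetState? last = null;     // nothing emitted yet

        foreach (var delta in deltas)
        {
            count += delta;               // Scan accumulator
            trace.Add(count.ToString());  // stands in for the debugging Do call

            var state = count > 0 ? ChannelSetState.Up : ChannelSetState.Down;
            if (state != last)            // DistinctUntilChanged
            {
                trace.Add(state.ToString());
                last = state;
            }
        }

        return trace;
    }

    public static void Main()
    {
        // Assumed merged stream: +1 for Operational, -1 for any other state.
        var deltas = new[] { 1, 1, 1, -1, -1, 1, -1, -1, 1, -1 };
        Console.WriteLine(string.Join(" ", Replay(deltas)));
    }
}
```

<p>With those assumed deltas the program prints <code>1 Up 2 3 2 1 2 1 0 Down 1 Up 0 Down</code>, the same shape as the trace above: a state is reported only when the running count crosses between zero and a positive value.</p>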
 
