Modifying RX Computational Expression Builder to hold previous values
<p>I'm using a slightly modified version of the RX builder presented here:</p>
<p><a href="http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html" rel="nofollow">http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html</a></p>
<p>Rather than taking <code>IObservable&lt;'T&gt;</code> directly, my computation expression has the type:</p>
<pre><code>type MyType&lt;'a,'b&gt; = MyType of (IObservable&lt;'a&gt; -&gt; IObservable&lt;'b&gt;)
let extract (MyType t) = t
</code></pre>
<p>Combinators then take on the form:</p>
<pre><code>let where (f: 'b -&gt; bool) (m:MyType&lt;_,'b&gt;) =
    MyType(fun input -&gt; (extract m input).Where(f))
</code></pre>
<p>Within the expression itself, I often need to refer back to previous values that have been fed into the stream. To do so, I've defined a <code>MyType</code> which maintains a rolling immutable list of the <code>n</code> most recent values.</p>
<pre><code>let history n =
    MyType(fun input -&gt;
        Observable.Create(fun (o:IObserver&lt;_&gt;) -&gt;
            let buffer = new History&lt;_&gt;(n)
            o.OnNext(HistoryReadOnly(buffer))
            input.Subscribe(buffer.Push, o.OnError, o.OnCompleted)
        )
    )
</code></pre>
<p>With this, I can now do something like:</p>
<pre><code>let f = obs {
    let! history = history 20
    // Run some other types, and possibly do something with history
}
</code></pre>
<p>I find that I use this history quite frequently. Ideally I would want to have it embedded directly into <code>IObservable&lt;'a&gt;</code>. Obviously I can't do that. So my question is: what is a reasonable way to introduce this concept of history? Should I be extending <code>IObservable&lt;'T&gt;</code> (not sure how to do that), or wrapping the <code>IObservable&lt;'T&gt;</code>?</p>
<p>I appreciate any suggestions.</p>
<p>Edit: Added full example code.</p>
<pre><code>open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq

// Container function
type MyType&lt;'a,'b&gt; = MyType of (IObservable&lt;'a&gt; -&gt; IObservable&lt;'b&gt;)
let extract (MyType t) = t

// Mini Builder
let internal mbind (myTypeB:MyType&lt;'a,'b&gt;) (f:'b -&gt; MyType&lt;'a,'c&gt;) =
    MyType(fun input -&gt;
        let obsB = extract myTypeB input
        let myTypeC= fun resB -&gt; extract (f resB) input
        obsB.SelectMany(myTypeC)
    )

type MyTypeBuilder() =
    member x.Bind (m,f) = mbind m f
    member x.Combine (a,b) = MyType(fun input -&gt; (extract a input).Concat(extract b input))
    member x.Yield (r) = MyType(fun input -&gt; Observable.Return(r))
    member x.YieldFrom (m:MyType&lt;_,_&gt;) = m
    member x.Zero() = MyType(fun input -&gt; Observable.Empty())
    member x.Delay(f:unit -&gt; MyType&lt;'a,'b&gt;) = f()

let mtypeBuilder = new MyTypeBuilder()

// Combinators
let simplehistory =
    MyType(fun input -&gt;
        Observable.Create(fun (o:IObserver&lt;_&gt;) -&gt;
            let buffer = new List&lt;_&gt;()
            o.OnNext(buffer)
            input.Subscribe(buffer.Add, o.OnError, o.OnCompleted)
        )
    )

let where (f: 'b -&gt; bool) m = MyType(fun input -&gt; (extract m input).Where(f))
let take (n:int) m = MyType(fun input -&gt; (extract m input).Take(n))
let buffer m = MyType(fun input -&gt; (extract m input).Buffer(1))
let stream = MyType(id)

// Example
let myTypeResult (t:MyType&lt;'a,'b&gt;) (input:'a[]) =
    (extract t (input.ToObservable().Publish().RefCount())).ToArray().Single()

let dat = [|1 .. 20|]

let example = mtypeBuilder {
    let! history = simplehistory
    let! someEven = stream |&gt; where(fun v -&gt; v % 2 = 0) // Foreach Even
    let! firstValAfterPrevMatch = stream |&gt; take 1 // Potentially where a buffer operation would run, all values here are after i.e. we can't get values before last match
    let! odd = stream |&gt; where(fun v -&gt; v % 2 = 1) |&gt; take 2 // Take 2 odds that follow it
    yield (history.[history.Count - 1], history.[0], someEven,firstValAfterPrevMatch, odd) // Return the last visited item in our stream, the very first item, an even, the first value after the even and an odd
}

let result = myTypeResult example dat

val result : (int * int * int * int * int) [] =
  [|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
    (9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
    (13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
    (17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
    (19, 1, 16, 17, 19)|]
</code></pre>
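<p>The <code>History</code> and <code>HistoryReadOnly</code> types used by the <code>history</code> combinator are not shown above. For readers who want to try that snippet, here is a minimal sketch of what such a rolling buffer could look like. Everything in it is an assumption for illustration: the member names <code>Snapshot</code>, <code>Items</code>, <code>Count</code> and the newest-first ordering are hypothetical, and it makes no attempt at thread safety.</p>
<pre><code>// Hypothetical sketch only: these are assumed shapes, not the original types.
type History&lt;'T&gt;(capacity: int) =
    // Newest value first. The list itself is immutable; only the reference is swapped.
    let mutable items : 'T list = []
    // Add a value and drop anything beyond the most recent `capacity` entries.
    member x.Push(value: 'T) =
        items &lt;- List.truncate capacity (value :: items)
    // Current immutable snapshot of the retained values.
    member x.Snapshot = items

// Read-only view handed to the expression body, so it cannot push values itself.
type HistoryReadOnly&lt;'T&gt;(inner: History&lt;'T&gt;) =
    member x.Items = inner.Snapshot
    member x.Count = List.length inner.Snapshot
    member x.Item with get (i: int) = List.item i inner.Snapshot

// Usage: keep only the three most recent of four pushed values.
let h = History&lt;int&gt;(3)
[1; 2; 3; 4] |&gt; List.iter h.Push
let recent = HistoryReadOnly(h).Items  // [4; 3; 2]
</code></pre>
<p>Because each snapshot is an immutable list, a value captured earlier in the expression would not change when later items arrive, unlike the shared <code>List&lt;_&gt;</code> in <code>simplehistory</code>.</p>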