How to consume a UDP stream of bytes using Rx extensions in .NET
<p>I've come up with this solution (not yet tested) after a lot of searching around the web.</p>

<pre><code>Private Function ObserveUDP() As IObservable(Of Byte())

    Dim f = Function(observer As IObserver(Of Byte()))

                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = Observable.Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                          ( Nothing _
                          , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                          , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                          , Function(task As Task(Of UdpReceiveResult)) task.Result)

                ' Project each received datagram to its payload bytes
                Dim buffers = obs.Select(Function(r) r.Buffer)

                Dim handle = buffers.Subscribe(observer)

                ' Close the socket and drop the inner subscription on unsubscribe
                Dim df = Sub()
                             client.Close()
                             handle.Dispose()
                         End Sub

                Return Disposable.Create(df)

            End Function

    Return Observable.Create(f)

End Function
</code></pre>

<p>My requirement is to make sure the UDP client is closed when the subscription is dropped. I'm fairly sure the code above is close, but I don't think it's quite right. Any input would be appreciated.</p>

<p><strong>* EDIT *</strong></p>

<p>Actually, the example above is completely wrong: it just creates a large number of task objects synchronously without awaiting them. After a bit of trial and error I've come up with the following generic function for unfolding an awaitable that is called over and over again. Any comments?</p>

<pre><code>''' initializer - a function that initializes and returns the state object
''' generator   - a function that asynchronously (using Await) generates each value
''' finalizer   - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I)) As IObservable(Of T)

    Dim q = Function(observer As IObserver(Of T))
                Dim go = True
                Try
                    Dim r = Async Sub()
                                Dim ii As I = initializer()
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                finalizer(ii)
                                observer.OnCompleted()
                            End Sub

                    Task.Run(r)
                Catch ex As Exception
                    observer.OnError(ex)
                End Try

                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False

            End Function

    Return Observable.Create(q)
End Function
</code></pre>

<p>And an example of its use with UDP:</p>

<pre><code>Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)

    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function

    ' A Sub, since the finalizer parameter is an Action(Of I)
    Dim finalizer = Sub(client As UdpClient)
                        client.Close()
                    End Sub

    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function

    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))

End Function
</code></pre>
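<p>For comparison, here is a minimal sketch of the same idea built on the <code>Observable.Create</code> overload that takes an asynchronous function and a <code>CancellationToken</code>. It is untested and rests on assumptions: the name <code>ObserveUdpSketch</code> and its <code>endpoint</code> parameter are hypothetical, and it assumes that closing the <code>UdpClient</code> causes a pending <code>ReceiveAsync</code> to fail with an <code>ObjectDisposedException</code> or a <code>SocketException</code>, which may vary between runtimes.</p>

<pre><code>Imports System
Imports System.Net
Imports System.Net.Sockets
Imports System.Reactive.Linq
Imports System.Threading
Imports System.Threading.Tasks

Module UdpObservableSketch

    ' Hypothetical helper (not from the original post): one UdpClient per subscription.
    Public Function ObserveUdpSketch(endpoint As IPEndPoint) As IObservable(Of Byte())
        Return Observable.Create(Of Byte())(
            Async Function(observer As IObserver(Of Byte()), ct As CancellationToken) As Task
                Using client As New UdpClient(endpoint)
                    ' The token is cancelled when the subscription is disposed.
                    ' Assumption: closing the client aborts a pending ReceiveAsync.
                    Using registration As CancellationTokenRegistration = ct.Register(Sub() client.Close())
                        Try
                            While Not ct.IsCancellationRequested
                                Dim result = Await client.ReceiveAsync()
                                observer.OnNext(result.Buffer)
                            End While
                        Catch ex As ObjectDisposedException When ct.IsCancellationRequested
                            ' Expected after unsubscribe: the socket was closed under the receive.
                        Catch ex As SocketException When ct.IsCancellationRequested
                            ' Same situation, reported as a socket error on some runtimes.
                        End Try
                    End Using
                End Using
                ' Any other exception faults the returned task, which Rx reports through OnError.
            End Function)
    End Function

End Module
</code></pre>

<p>A caller would keep the value returned by <code>Subscribe</code> and call <code>Dispose()</code> on it to stop receiving. In this sketch the socket is closed at that point, whereas a loop that only checks a flag between receives would not notice the unsubscribe until the next datagram arrives.</p>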
 
