<p>I have not used <code>UdpClient</code> before, but it appears that you are using Tasks (cardinality = 1) to try to receive streams of data (cardinality = many). To work around this, you seem to have added a <code>Repeat</code> to your query. This means that your query will do the following:</p>
<ol>
<li>Create a UdpClient</li>
<li>Invoke the request for data</li>
<li>Receive the first data it gets</li>
<li>Push the data onto the sequence</li>
<li>Close the sequence</li>
<li>Dispose the UdpClient</li>
<li>Create a UdpClient (back to step 1)</li>
<li>Invoke the request for data</li>
<li>Receive the first data it gets</li>
<li>...and so on, until you dispose the connection.</li>
</ol>
<p>I think you should be able to read directly off the socket/network connection by pulling in a stream of bytes. I show how to do this in my blog post:</p>
<p><a href="http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator" rel="nofollow">http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator</a></p>
<p>This way you hold just one connection open and push bytes as you receive them.</p>
<p>After a quick search, I also found that there are concerns about the reliability of the UdpClient implementation:
<a href="http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive" rel="nofollow">http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive</a></p>
<p>HTH</p>
<p>Lee</p>
<pre><code>using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace MyLib
{
    public static class ObservableExtensions
    {
        //TODO: Could potentially upgrade to using tasks/Await-LC
        public static IObservable&lt;byte&gt; ToObservable(
            this Stream source,
            int buffersize,
            IScheduler scheduler)
        {
            var bytes = Observable.Create&lt;byte&gt;(o =&gt;
            {
                var initialState = new StreamReaderState(source, buffersize);
                var currentStateSubscription = new SerialDisposable();

                // Recursive iterator: read a chunk, push each byte, then read again.
                Action&lt;StreamReaderState, Action&lt;StreamReaderState&gt;&gt; iterator =
                    (state, self) =&gt;
                        currentStateSubscription.Disposable = state.ReadNext()
                            .Subscribe(
                                bytesRead =&gt;
                                {
                                    for (int i = 0; i &lt; bytesRead; i++)
                                    {
                                        o.OnNext(state.Buffer[i]);
                                    }
                                    // Zero bytes read means the end of the stream.
                                    if (bytesRead &gt; 0)
                                        self(state);
                                    else
                                        o.OnCompleted();
                                },
                                o.OnError);

                var scheduledWork = scheduler.Schedule(initialState, iterator);
                return new CompositeDisposable(currentStateSubscription, scheduledWork);
            });

            // Dispose the stream when the sequence terminates or is unsubscribed.
            return Observable.Using(() =&gt; source, _ =&gt; bytes);
        }

        private sealed class StreamReaderState
        {
            private readonly int _bufferSize;
            private readonly Func&lt;byte[], int, int, IObservable&lt;int&gt;&gt; _factory;

            public StreamReaderState(Stream source, int bufferSize)
            {
                _bufferSize = bufferSize;
                _factory = Observable.FromAsyncPattern&lt;byte[], int, int, int&gt;(
                    source.BeginRead,
                    source.EndRead);
                Buffer = new byte[bufferSize];
            }

            // Starts one asynchronous read into Buffer and yields the number of bytes read.
            public IObservable&lt;int&gt; ReadNext()
            {
                return _factory(Buffer, 0, _bufferSize);
            }

            public byte[] Buffer { get; set; }
        }
    }
}
</code></pre>
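<p>If you would rather stay with UdpClient, the create/receive/dispose cycle listed above can probably be avoided by moving the repeat inside the resource scope, so that only the receive call is repeated. The following is a minimal sketch rather than tested code: it assumes .NET 4.5 or later (for <code>UdpClient.ReceiveAsync</code>) and Rx 2.x, and the names <code>UdpObservable</code> and <code>Datagrams</code> are hypothetical.</p>

```csharp
using System;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

namespace MyLib
{
    // Hypothetical helper, not part of Rx or of the original question.
    public static class UdpObservable
    {
        public static IObservable<UdpReceiveResult> Datagrams(int port)
        {
            // Using creates one UdpClient per subscription and disposes it
            // when the subscription ends.
            return Observable.Using(
                () => new UdpClient(port),
                client => Observable
                    // Defer starts a fresh ReceiveAsync call on each repetition.
                    .Defer(() => client.ReceiveAsync().ToObservable())
                    // Only the receive is repeated; the client stays open.
                    .Repeat());
        }
    }
}
```

<p>Subscribing with something like <code>UdpObservable.Datagrams(11000).Subscribe(r =&gt; Console.WriteLine(r.Buffer.Length))</code> should then keep a single socket open for the lifetime of the subscription. The port number here is only an example.</p>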