<p>Something along these lines could work. It is untested and does not handle exceptions or the case where a message arrives only partially, but otherwise I believe it is the right direction.</p>
<pre><code>// Reads up to sizeToRead bytes with a single receive and maps the buffer to a value.
public static IObservable&lt;T&gt; GetSocketData&lt;T&gt;(this Socket socket,
    int sizeToRead, Func&lt;byte[], T&gt; valueExtractor)
{
    return Observable.CreateWithDisposable&lt;T&gt;(observer =&gt;
    {
        var readSize = Observable
            .FromAsyncPattern&lt;byte[], int, int, SocketFlags, int&gt;(
                socket.BeginReceive,
                socket.EndReceive);
        var buffer = new byte[sizeToRead];
        return readSize(buffer, 0, sizeToRead, SocketFlags.None)
            .Subscribe(
                x =&gt; observer.OnNext(valueExtractor(buffer)),
                observer.OnError,
                observer.OnCompleted);
    });
}

// Reads the 4-byte length prefix of a message.
public static IObservable&lt;int&gt; GetMessageSize(this Socket socket)
{
    return socket.GetSocketData(4, buf =&gt; BitConverter.ToInt32(buf, 0));
}

// Reads a UTF-8 message body of the given size.
public static IObservable&lt;string&gt; GetMessageBody(this Socket socket, int messageSize)
{
    return socket.GetSocketData(messageSize,
        buf =&gt; Encoding.UTF8.GetString(buf, 0, messageSize));
}

// Reads one message: the size first, then the body (null when the size is 0).
public static IObservable&lt;string&gt; GetMessage(this Socket socket)
{
    return
        from size in socket.GetMessageSize()
        from message in Observable.If(
            () =&gt; size != 0,
            socket.GetMessageBody(size),
            Observable.Return&lt;string&gt;(null))
        select message;
}

// Keeps reading messages until a null or empty one arrives.
public static IObservable&lt;string&gt; GetMessagesFromConnected(this Socket socket)
{
    return socket
        .GetMessage()
        .Repeat()
        .TakeWhile(msg =&gt; !string.IsNullOrEmpty(msg));
}

// Connects on subscription, streams messages, and closes the socket at the end.
public static IObservable&lt;string&gt; GetMessages(this Socket socket,
    IPAddress addr, int port)
{
    return Observable.Defer(() =&gt;
    {
        var whenConnect = Observable
            .FromAsyncPattern&lt;IPAddress, int&gt;(
                socket.BeginConnect, socket.EndConnect);
        return
            from _ in whenConnect(addr, port)
            from msg in socket.GetMessagesFromConnected()
                .Finally(socket.Close)
            select msg;
    });
}
</code></pre>
<p>Edit: To handle incomplete reads, Observable.While can be used (within GetSocketData), as proposed by Dave Sexton in the <a href="http://social.msdn.microsoft.com/Forums/en-US/rx/thread/5c62e690-2c8d-4f32-8ec4-5e9b5ea6d2a0" rel="noreferrer">same thread on the Rx forum</a>.</p>
<p>Edit: Also, take a look at Jeffrey Van Gogh's article: <a href="http://blogs.msdn.com/b/jeffva/archive/2010/08/30/rx-on-the-server-part-4-of-n-buffering-output-to-a-stream.aspx" rel="noreferrer">Asynchronous System.IO.Stream reading</a></p>
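<p>As a rough illustration of the incomplete-read idea, here is a minimal, untested sketch of how Observable.While might be used to keep receiving until the buffer is full. It is not the code from the forum thread. The helper name <code>ReadExactly</code> and the class name are hypothetical, the sketch assumes the current System.Reactive package (where the operators live in <code>System.Reactive.Linq</code>), and treating a zero-byte receive as an error is a design assumption of this sketch.</p>
<pre><code>using System;
using System.IO;
using System.Net.Sockets;
using System.Reactive.Linq;

public static class SocketReadSketch
{
    // Hypothetical helper: emits the buffer once exactly sizeToRead bytes have arrived.
    public static IObservable&lt;byte[]&gt; ReadExactly(this Socket socket, int sizeToRead)
    {
        return Observable.Defer(() =&gt;
        {
            var buffer = new byte[sizeToRead];
            var offset = 0;
            var receive = Observable
                .FromAsyncPattern&lt;byte[], int, int, SocketFlags, int&gt;(
                    socket.BeginReceive,
                    socket.EndReceive);

            // One receive into the unfilled tail of the buffer.
            // Defer makes each new subscription see the current offset.
            var readChunk = Observable
                .Defer(() =&gt; receive(buffer, offset, sizeToRead - offset, SocketFlags.None))
                .Do(read =&gt;
                {
                    // Assumption: zero bytes means the peer closed the connection.
                    if (read == 0)
                        throw new EndOfStreamException("Connection closed mid-message.");
                    offset += read;
                });

            // Resubscribe to readChunk while bytes are still missing,
            // then emit the filled buffer exactly once.
            return Observable
                .While(() =&gt; offset &lt; sizeToRead, readChunk)
                .IgnoreElements()
                .Select(_ =&gt; buffer)
                .Concat(Observable.Return(buffer));
        });
    }
}
</code></pre>
<p>With a helper like this, GetSocketData could presumably be expressed as <code>socket.ReadExactly(sizeToRead).Select(valueExtractor)</code>, so that a short receive no longer yields a half-filled buffer.</p>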
 
