Using Reactive Extensions (Rx) for socket programming practical?
<p>What is the most succinct way of writing the <code>GetMessages</code> function with Rx?</p>
<pre><code>static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    var messages = GetMessages(socket, IPAddress.Loopback, 4000);
    messages.Subscribe(x =&gt; Console.WriteLine(x));

    Console.ReadKey();
}

static IObservable&lt;string&gt; GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern&lt;IPAddress, int&gt;(socket.BeginConnect, socket.EndConnect)(addr, port);

    // now will receive a stream of messages
    // each message is prefixed with a 4-byte Int32 indicating its length.
    // the rest of the message is a string

    // ????????????? Now What ?????????????
}
</code></pre>
<p>A simple server as a driver for the above sample: <a href="http://gist.github.com/452893#file_program.cs" rel="noreferrer">http://gist.github.com/452893#file_program.cs</a></p>
<h3>On Using Rx For Socket Programming</h3>
<p>I've been investigating using <a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" rel="noreferrer">Reactive Extensions</a> for some socket programming work I am doing. My motivation is that it would somehow make the code "simpler", whether that means less code, less nesting, or something along those lines.</p>
<p>However, so far that does not seem to be the case:</p>
<ol>
<li>I haven't found very many examples of using Rx with sockets.</li>
<li>The <a href="http://social.msdn.microsoft.com/Forums/en/rx/thread/ad211e98-19e3-4d69-a0f3-ac7acaea8dc0" rel="noreferrer">example</a><strike>s</strike> I have found don't seem less complicated than my existing BeginXXXX, EndXXXX code.</li>
<li>Although <code>Observable</code> has extension methods for <a href="http://rxwiki.wikidot.com/101samples#toc9" rel="noreferrer">FromAsyncPattern</a>, this does not cover the <code>SocketAsyncEventArgs</code> async API.</li>
</ol>
<h3>Current Non-Working Solution</h3>
<p>Here is what I have so far. It doesn't work: it fails with a stack overflow (heh). I haven't figured out the semantics for creating an <code>IObservable</code> that reads a specified number of bytes.</p>
<pre><code>static IObservable&lt;int&gt; GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern&lt;IPAddress, int&gt;(socket.BeginConnect, socket.EndConnect)(addr, port);

    // keep reading until we get the first 4 bytes
    byte[] buffer = new byte[1024];
    var readAsync = Observable.FromAsyncPattern&lt;byte[], int, int, SocketFlags, int&gt;(socket.BeginReceive, socket.EndReceive);

    IObservable&lt;int&gt; readBytes = null;
    var temp = from totalRead in Observable.Defer(() =&gt; readBytes)
               where totalRead &lt; 4
               select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
    readBytes = temp.SelectMany(x =&gt; x).Sum();

    var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}
</code></pre>
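<p>For reference, the framing described above (a 4-byte Int32 length followed by the string) can be sketched without Rx or sockets. This is only a synchronous illustration over a <code>Stream</code>, not an Rx solution. The names <code>Framing</code>, <code>ReadBytes</code> and <code>ReadMessages</code> are hypothetical, and it assumes UTF-8 text and a length prefix in the host's byte order, neither of which is stated above.</p>
<pre><code>using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

static class Framing
{
    // Reads exactly 'count' bytes, or returns null if the stream ends first.
    static byte[] ReadBytes(Stream stream, int count)
    {
        var buffer = new byte[count];
        int total = 0;
        while (total &lt; count)
        {
            // Request only the bytes still missing: count - total.
            int read = stream.Read(buffer, total, count - total);
            if (read == 0) return null; // end of stream
            total += read;
        }
        return buffer;
    }

    // Yields one string per frame: [4-byte length][payload].
    public static IEnumerable&lt;string&gt; ReadMessages(Stream stream)
    {
        while (true)
        {
            byte[] header = ReadBytes(stream, 4);
            if (header == null) yield break;

            // Assumption: the sender wrote the length in this machine's byte order.
            int length = BitConverter.ToInt32(header, 0);
            if (length &lt; 0) throw new InvalidDataException("Negative message length.");

            byte[] payload = ReadBytes(stream, length);
            if (payload == null) yield break;

            // Assumption: the payload is UTF-8 text.
            yield return Encoding.UTF8.GetString(payload);
        }
    }

    static void Main()
    {
        // Build two framed messages in memory, then read them back.
        var stream = new MemoryStream();
        foreach (var text in new[] { "hello", "world" })
        {
            byte[] payload = Encoding.UTF8.GetBytes(text);
            stream.Write(BitConverter.GetBytes(payload.Length), 0, 4);
            stream.Write(payload, 0, payload.Length);
        }
        stream.Position = 0;

        foreach (var message in ReadMessages(stream))
            Console.WriteLine(message);
    }
}
</code></pre>
<p>The same "read until N bytes have arrived" loop is what an observable-based version would need to express. Note that each read asks for <code>count - total</code> bytes, the number still outstanding.</p>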
 
