<p>Ok, so this is perhaps "cheating", but I suppose you could re-purpose my non-Rx answer and wrap it with <code>Observable.Create</code>.</p>
<p>I'm fairly sure that returning the socket as the <code>IDisposable</code> is the wrong semantics, but I'm not sure what the right semantics would be.</p>
<pre><code>static IObservable&lt;string&gt; GetMessages(Socket socket, IPAddress addr, int port)
{
    return Observable.CreateWithDisposable&lt;string&gt;(
        o =&gt;
        {
            byte[] buffer = new byte[1024];

            // Reads exactly `length` bytes into the buffer, then invokes the callback.
            Action&lt;int, Action&lt;int&gt;&gt; readIntoBuffer = (length, callback) =&gt;
            {
                var totalRead = 0;

                AsyncCallback receiveCallback = null;
                AsyncCallback temp = r =&gt;
                {
                    var read = socket.EndReceive(r);

                    // Zero bytes read means the remote side closed the connection.
                    if (read == 0)
                    {
                        socket.Close();
                        o.OnCompleted();
                        return;
                    }

                    totalRead += read;

                    if (totalRead &lt; length)
                    {
                        // Partial read: ask for the remaining bytes.
                        socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null);
                    }
                    else
                    {
                        callback(length);
                    }
                };
                receiveCallback = temp;

                socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null);
            };

            Action&lt;int&gt; sizeRead = null;

            // A full message body has arrived: publish it, then wait for the next size header.
            Action&lt;int&gt; messageRead = x =&gt;
            {
                var message = Encoding.UTF8.GetString(buffer, 0, x);
                o.OnNext(message);
                readIntoBuffer(4, sizeRead);
            };

            // A 4-byte size header has arrived: read that many bytes as the message body.
            Action&lt;int&gt; temp2 = x =&gt;
            {
                var size = BitConverter.ToInt32(buffer, 0);
                readIntoBuffer(size, messageRead);
            };
            sizeRead = temp2;

            AsyncCallback connectCallback = r =&gt;
            {
                socket.EndConnect(r);
                readIntoBuffer(4, sizeRead);
            };

            socket.BeginConnect(addr, port, connectCallback, null);

            return socket;
        });
}
</code></pre>
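<p>For anyone trying this out, the code above assumes each message on the wire is a 4-byte length (as read by <code>BitConverter.ToInt32</code>, so in the machine's byte order) followed by that many bytes of UTF-8 text. Here is a minimal synchronous sketch of that framing, which may help when building a test sender or checking the read loop. The <code>Framing</code> class and its <code>Encode</code>, <code>Decode</code> and <code>ReadExactly</code> methods are hypothetical names of mine, not part of Rx or the original answer.</p>
<pre><code>using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper illustrating the assumed wire format:
// [4-byte length][UTF-8 payload], repeated.
static class Framing
{
    // Prefixes the UTF-8 payload with its length as a 4-byte int.
    public static byte[] Encode(string message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message);
        byte[] frame = new byte[4 + payload.Length];
        BitConverter.GetBytes(payload.Length).CopyTo(frame, 0);
        payload.CopyTo(frame, 4);
        return frame;
    }

    // Yields messages until the stream ends on a frame boundary.
    public static IEnumerable&lt;string&gt; Decode(Stream stream)
    {
        byte[] header = new byte[4];
        while (ReadExactly(stream, header, 4))
        {
            int size = BitConverter.ToInt32(header, 0);
            byte[] payload = new byte[size];
            if (!ReadExactly(stream, payload, size))
                throw new EndOfStreamException("Stream ended inside a message.");
            yield return Encoding.UTF8.GetString(payload, 0, size);
        }
    }

    // Loops until `length` bytes have arrived, mirroring readIntoBuffer above.
    // Returns false if the stream ends before the first byte is read.
    static bool ReadExactly(Stream stream, byte[] buffer, int length)
    {
        int totalRead = 0;
        while (totalRead &lt; length)
        {
            int read = stream.Read(buffer, totalRead, length - totalRead);
            if (read == 0)
            {
                if (totalRead == 0) return false;
                throw new EndOfStreamException("Stream ended inside a frame.");
            }
            totalRead += read;
        }
        return true;
    }
}
</code></pre>
<p>Writing a few <code>Framing.Encode(...)</code> results into a <code>MemoryStream</code>, rewinding it, and enumerating <code>Framing.Decode(stream)</code> should round-trip the strings. Unlike this sketch, the original code reads into a fixed 1024-byte buffer, so it implicitly assumes no message is larger than that.</p>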
 
