
  1. PO.net Rx: in-order batch-processing of messages
    <p>I am attempting to implement an asynchronous workflow using Rx and I seem to be doing it completely wrong.</p>
    <p>What I would like to do is this:</p>
    <pre><code>From an undefined asynchronous stream of un-parsed message strings (i.e. an IObservable&lt;string&gt;)
parse the message strings asynchronously, but preserve their order. (IObservable&lt;Message&gt;)
Batch up parsed Messages in groups of 100 or so (IObservable&lt;IEnumerable&lt;Message&gt;&gt;)
Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started.
</code></pre>
    <p>I can't seem to get the order-preservation, and also Rx doesn't appear to be doing things asynchronously when I expected them to.</p>
    <p>I made an attempt at order preservation by using an IEnumerable instead of an IObservable, and then calling the .AsParallel().AsOrdered() operators on it. Here is the code. See notes below for the issues I'm having:</p>
    <pre><code>private IObservable&lt;IEnumerable&lt;Message&gt;&gt; messageSource;
public IObservable&lt;IEnumerable&lt;Message&gt;&gt; MessageSource { get { return messageSource; } }

/// &lt;summary&gt;
/// Sub-classes of MessageProviderBase provide this IEnumerable to
/// generate unparsed message strings synchronously
/// &lt;/summary&gt;
protected abstract IEnumerable&lt;string&gt; UnparsedMessages { get; }

public MessageProviderBase()
{
    // individual parsed messages as a PLINQ query
    var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
                         select ParseMessage(unparsedMessage);

    // convert the above PLINQ query to an observable, buffering up to 100 messages at a time
    var batchedMessages = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

    // ISSUE #1:
    // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
    // If you convert the IObservable&lt;Message&gt; it generates to an enumerable, it blocks
    // when you try to enumerate it.

    // Convert each batch to an IEnumerable
    // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
    // it could still deliver messages out of order. Only, instead of delivering individual
    // messages out of order, the message batches themselves could arrive out of order.
    messageSource = from messageBatch in batchedMessages
                    select messageBatch.ToEnumerable().ToList();
}
</code></pre>
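One way to picture the ordering requirement listed above is to start every parse eagerly and then await the results in source order, so that completion order cannot reorder the output. The following is only a minimal sketch under stated assumptions, not the asker's code and not an Rx solution: it uses plain `Task`s from the base class library, assumes a finite input sequence, represents messages as strings, and `OrderedBatching`, `ParseInBatchesAsync`, and this `ParseMessage` body are hypothetical stand-ins. Delivery to the UI thread is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedBatching
{
    // Hypothetical stand-in for the question's ParseMessage.
    public static string ParseMessage(string raw) => raw.Trim().ToUpperInvariant();

    // Starts one parse task per input, then awaits the tasks in source order.
    // Parsing may finish in any order, but results are collected in input order.
    public static async Task<List<List<string>>> ParseInBatchesAsync(
        IEnumerable<string> unparsed, int batchSize)
    {
        // ToList forces every task to be started before any of them is awaited.
        var pending = unparsed
            .Select(raw => Task.Run(() => ParseMessage(raw)))
            .ToList();

        var batches = new List<List<string>>();
        var current = new List<string>(batchSize);

        foreach (var task in pending)
        {
            current.Add(await task);
            if (current.Count == batchSize)
            {
                // A batch is only emitted once all of its messages are parsed.
                batches.Add(current);
                current = new List<string>(batchSize);
            }
        }

        // Emit the final, possibly smaller, batch.
        if (current.Count > 0)
        {
            batches.Add(current);
        }
        return batches;
    }
}
```

Because each batch is filled by awaiting tasks in their original order, both the messages within a batch and the batches themselves come out in input order. An unbounded stream would need a different shape (for example, processing tasks as they are queued), which this sketch does not attempt.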