How to queue IObservable subjects, with cancellation support, for slow consumers using Reactive Extensions (Rx)
<p>First, some background: I've written an open source .NET library named <a href="https://github.com/slashdotdash/Duplicity/" rel="nofollow">Duplicity</a> (on GitHub) that uses the <code>FileSystemWatcher</code> to duplicate all file changes between two directories.</p>
<p>I have written a <a href="https://github.com/slashdotdash/Duplicity/blob/master/src/Duplicity/FileSystemObservable.cs" rel="nofollow"><code>FileSystemObservable</code></a> class that implements <code>IObservable&lt;FileSystemChange&gt;</code> (which uses <a href="https://github.com/acken/FSWatcher" rel="nofollow">FSWatcher</a> to wrap the actual <code>FileSystemWatcher</code>). When files or directories are created, modified or deleted, the changes are published via a <code>Subject&lt;FileSystemChange&gt;</code> using Reactive Extensions.</p>
<p>I then subscribe to this observable using the following subscription.</p>
<pre><code>return observable
    .Buffer(() =&gt; observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))
    .PrioritizeFileSystemChanges()
    .SelectMany(x =&gt; x);
</code></pre>
<p>Changes are buffered until there has been a period of at least 2 seconds without any change, up to a maximum of 1 minute. This is because, when a directory is deleted, the <code>FileSystemWatcher</code> notifies for all contained files and directories. We can optimise the behaviour by swallowing the changes contained within the directory and simply deleting the parent in our subscriber. This is handled by the <code>PrioritizeFileSystemChanges</code> filter. It also allows us to ignore files that are created and subsequently deleted within the buffer window, again reducing IO operations on the target.</p>
<p>This works, albeit in a naive manner at the moment, with no support for failure or retries.</p>
<p>However, my question concerns the fact that the subscriber to this observable may take a considerable amount of time to process each change, for example when copying a large file to a slow file system. When a new file system change occurs for the same file that is currently being copied, how can I abort the in-progress operation? Or, if the file is included in the buffered list but is still pending, how can it be removed or excluded?</p>
<p>I assume that there would need to be another subscription to the original observable, but I am unsure how best to share state or modify pending tasks. The changes must be processed in the order they were received, which suggests a queue. However, a new file system change may apply to a queued operation that needs to be cancelled or removed, and queues aren't designed for out-of-order removal.</p>
<p>For example, if we're currently copying the file <code>Foo\Bar.txt</code> and the <code>Foo</code> directory is deleted, then any in-progress or pending changes for the directory, and all of its subdirectories, must be cancelled. Could this be a use case for the Task Parallel Library, or is there some Reactive approach that I could take?</p>
<p>Any GitHub pull requests would be kindly received too!</p>
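<p>To make the buffer filtering described above concrete, here is a minimal sketch of the "swallow changes beneath a deleted directory" idea. It is not the library's actual <code>PrioritizeFileSystemChanges</code> implementation: the <code>Change</code> record, the <code>ChangeKind</code> enum and the <code>BufferFilter</code> class are hypothetical names introduced only for illustration, and the sketch assumes C# 9 or later and Windows-style path separators.</p>
<pre><code>using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration; not Duplicity's FileSystemChange type.
public enum ChangeKind { Created, Modified, Deleted }

public sealed record Change(ChangeKind Kind, string Path);

public static class BufferFilter
{
    // Returns the buffer without any change located beneath a path
    // that is deleted somewhere in the same buffer.
    public static IList&lt;Change&gt; DropChangesUnderDeletedPaths(IList&lt;Change&gt; buffer)
    {
        // Turn every deleted path into a prefix ending in a separator,
        // so that deleting "Foo" does not match a sibling such as "FooBar".
        var deletedPrefixes = buffer
            .Where(c =&gt; c.Kind == ChangeKind.Deleted)
            .Select(c =&gt; c.Path.TrimEnd('\\') + "\\")
            .ToList();

        // Keep a change only if it does not sit under any deleted prefix.
        // The deletion of "Foo" itself is kept, because "Foo" does not
        // start with "Foo\".
        return buffer
            .Where(c =&gt; !deletedPrefixes.Any(
                p =&gt; c.Path.StartsWith(p, StringComparison.OrdinalIgnoreCase)))
            .ToList();
    }
}
</code></pre>
<p>Note that this sketch ignores ordering inside the buffer: a file created under <code>Foo</code> after <code>Foo</code> was deleted and recreated within the same window would also be dropped, so a real filter would probably need to compare positions as well as paths.</p>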
 
