Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p>You could take a look at a Producer/Consumer pattern if you are unfortunate enough not to be using .NET 4.</p> <p>Here is my code, which I have disassembled. My apologies for the mess, but you should be able to use it by adding it to a project, recompiling, and then creating your process using the resulting DLL.</p> <p>Enum for ChannelState:</p> <pre><code>public enum ChannelState { WaitingForSend, WaitingForReceive, Open } </code></pre> <p>Interfaces:</p> <pre><code>public interface IChannel&lt;TMessage&gt; { // Methods TMessage Receive(); void Send(TMessage message); // Properties bool CanReceive { get; } bool CanSend { get; } ChannelState State { get; } } using System; public interface IReceiver&lt;TMessage&gt; { // Events event EventHandler&lt;MessageReceivedEventArgs&lt;TMessage&gt;&gt; MessageReceived; // Methods void Activate(); void Deactivate(); // Properties bool IsActive { get; } } </code></pre> <p>Concrete classes:</p> <pre><code>using System.Collections.Generic; using System.Threading; using System; public class BufferedChannel&lt;TMessage&gt; : IChannel&lt;TMessage&gt; { // Fields private int _blockedReceivers; private int _blockedSenders; private Queue&lt;TMessage&gt; _buffer; private int _capacity; private EventWaitHandle _capacityAvailableEvent; private EventWaitHandle _messagesAvailableEvent; // Methods public BufferedChannel() { this._buffer = new Queue&lt;TMessage&gt;(); this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset); this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset); this._capacity = 50; } public BufferedChannel(int bufferSize) { this._buffer = new Queue&lt;TMessage&gt;(); this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset); this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset); this._capacity = 50; if (bufferSize &lt;= 0) { throw new ArgumentOutOfRangeException("bufferSize", bufferSize, 
ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero); } this._capacity = bufferSize; } public TMessage Receive() { Interlocked.Increment(ref this._blockedReceivers); try { this._messagesAvailableEvent.WaitOne(); } catch { lock (this._buffer) { Interlocked.Decrement(ref this._blockedReceivers); } throw; } lock (this._buffer) { Interlocked.Decrement(ref this._blockedReceivers); this._capacityAvailableEvent.Set(); if ((this._buffer.Count - 1) &gt; this._blockedReceivers) { this._messagesAvailableEvent.Set(); } return this._buffer.Dequeue(); } } public void Send(TMessage message) { Interlocked.Increment(ref this._blockedSenders); try { this._capacityAvailableEvent.WaitOne(); } catch { lock (this._buffer) { Interlocked.Decrement(ref this._blockedSenders); } throw; } lock (this._buffer) { Interlocked.Decrement(ref this._blockedSenders); this._buffer.Enqueue(message); if (this._buffer.Count &lt; this.BufferSize) { this._capacityAvailableEvent.Set(); } this._messagesAvailableEvent.Set(); } } // Properties public int BufferCount { get { lock (this._buffer) { return this._buffer.Count; } } } public int BufferSize { get { lock (this._buffer) { return this._capacity; } } set { lock (this._buffer) { if (value &lt;= 0) { throw new ArgumentOutOfRangeException("BufferSize", value, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero); } this._capacity = value; if ((this._blockedSenders &gt; 0) &amp;&amp; (this._capacity &gt; this._buffer.Count)) { this._capacityAvailableEvent.Set(); } } } } public bool CanReceive { get { return true; } } public bool CanSend { get { return true; } } public ChannelState State { get { if (this._blockedSenders &gt; 0) { return ChannelState.WaitingForReceive; } if (this._blockedReceivers &gt; 0) { return ChannelState.WaitingForSend; } return ChannelState.Open; } } } using System; using System.Collections.Generic; using System.Threading; using System.ComponentModel; using System.Runtime.CompilerServices; public sealed class 
Receiver&lt;TMessage&gt; : Component, IReceiver&lt;TMessage&gt; { // Fields private volatile bool _continue; private object _controlLock; private volatile bool _disposed; private Thread _receiverThread; private bool _receiving; private object _receivingLock; private object _threadLock; [CompilerGenerated] private IChannel&lt;TMessage&gt; channel; // Events public event EventHandler&lt;MessageReceivedEventArgs&lt;TMessage&gt;&gt; MessageReceived; // Methods public Receiver(IChannel&lt;TMessage&gt; channel) { this._controlLock = new object(); this._threadLock = new object(); this._receivingLock = new object(); if (channel == null) { throw new ArgumentNullException("channel"); } this.Channel = channel; } public void Activate() { this.CheckDisposed(); lock (this._controlLock) { if (this._receiverThread != null) { throw new InvalidOperationException(); } this._continue = true; this._receiverThread = new Thread(new ThreadStart(this.RunAsync)); this._receiverThread.IsBackground = true; this._receiverThread.Start(); } } private void CheckDisposed() { if (this._disposed) { throw new ObjectDisposedException(base.GetType().Name); } } public void Deactivate() { lock (this._controlLock) { if (this._continue) { this._continue = false; lock (this._threadLock) { if (this._receiverThread != null) { this.SafeInterrupt(); this._receiverThread.Join(); this._receiverThread = null; } } } } } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) { this.Deactivate(); this._disposed = true; } } private void OnMessageReceived(TMessage message) { EventHandler&lt;MessageReceivedEventArgs&lt;TMessage&gt;&gt; messageReceived = this.MessageReceived; if (messageReceived != null) { messageReceived(this, new MessageReceivedEventArgs&lt;TMessage&gt;(message)); } } private void RunAsync() { while (this._continue) { TMessage message = default(TMessage); bool flag = false; try { lock (this._receivingLock) { this._receiving = true; } message = this.Channel.Receive(); 
flag = true; lock (this._receivingLock) { this._receiving = false; } Thread.Sleep(0); } catch (ThreadInterruptedException) { } if (!this._continue) { if (flag) { this.Channel.Send(message); return; } break; } this.OnMessageReceived(message); } } private void SafeInterrupt() { lock (this._receivingLock) { lock (this._threadLock) { if (this._receiving &amp;&amp; (this._receiverThread != null)) { this._receiverThread.Interrupt(); } } } } // Properties protected override bool CanRaiseEvents { get { return true; } } public IChannel&lt;TMessage&gt; Channel { [CompilerGenerated] get { return this.channel; } [CompilerGenerated] private set { this.channel = value; } } public bool IsActive { get { lock (this._controlLock) { return (this._receiverThread != null); } } } } using System; using System.Runtime.CompilerServices; public class MessageReceivedEventArgs&lt;TMessage&gt; : EventArgs { // Fields [CompilerGenerated] private TMessage message; // Methods public MessageReceivedEventArgs(TMessage message) { this.Message = message; } // Properties public TMessage Message { [CompilerGenerated] get { return this.message; } [CompilerGenerated] private set { this.message = value; } } } using System.Threading; public class BlockingChannel&lt;TMessage&gt; : IChannel&lt;TMessage&gt; { // Fields private TMessage _message; private EventWaitHandle _messageReceiveEvent; private EventWaitHandle _messageReceiveyEvent; private object _sendLock; private ChannelState _state; private object _stateLock; // Methods public BlockingChannel() { this._state = ChannelState.Open; this._stateLock = new object(); this._messageReceiveyEvent = new EventWaitHandle(false, EventResetMode.AutoReset); this._messageReceiveEvent = new EventWaitHandle(false, EventResetMode.AutoReset); this._sendLock = new object(); } public TMessage Receive() { this.State = ChannelState.WaitingForSend; this._messageReceiveyEvent.WaitOne(); this._messageReceiveEvent.Set(); this.State = ChannelState.Open; return this._message; } 
public void Send(TMessage message) { lock (this._sendLock) { this._message = message; this.State = ChannelState.WaitingForReceive; this._messageReceiveyEvent.Set(); this._messageReceiveEvent.WaitOne(); } } // Properties public bool CanReceive { get { return true; } } public bool CanSend { get { return true; } } public ChannelState State { get { lock (this._stateLock) { return this._state; } } private set { lock (this._stateLock) { this._state = value; } } } } </code></pre>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. VO
      singulars
      1. This table or related slice is empty.
    2. VO
      singulars
      1. This table or related slice is empty.
    3. VO
      singulars
      1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload