  1. Using SignalR with Redis messagebus failover using BookSleeve's ConnectionUtils.Connect()
<p>I am trying to create a Redis message bus failover scenario with a SignalR app.</p>
<p>At first, we tried a simple hardware load-balancer failover that monitored two Redis servers. The SignalR application pointed to the singular HLB endpoint. I then failed one server, but was unable to get any messages through on the second Redis server without recycling the SignalR app pool. Presumably this is because it needs to issue the setup commands to the new Redis message bus.</p>
<p>As of SignalR RC1, <code>Microsoft.AspNet.SignalR.Redis.RedisMessageBus</code> uses BookSleeve's <code>RedisConnection()</code> to connect to a single Redis server for pub/sub.</p>
<p>I created a new class, <code>RedisMessageBusCluster()</code>, that uses BookSleeve's <code>ConnectionUtils.Connect()</code> to connect to one in a cluster of Redis servers.</p>
<pre><code>using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// &lt;summary&gt;
    /// WIP: Getting scaleout for Redis working
    /// &lt;/summary&gt;
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable&lt;string&gt; keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;

            // Start the connection - TODO: can remove this Open as the connection is already opened,
            // but the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =&gt;
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }

        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =&gt;
            {
                var taskCompletionSource = new TaskCompletionSource&lt;object&gt;();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m =&gt; m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator&lt;IGrouping&lt;string, Message&gt;&gt; enumerator, TaskCompletionSource&lt;object&gt; taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping&lt;string, Message&gt; group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                    .Then((id, k) =&gt;
                    {
                        var message = new RedisMessage(id, group.ToArray());

                        return _connection.Publish(k, message.GetBytes());
                    }, key)
                    .Then((enumer, tcs) =&gt; SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                    .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() =&gt; OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }
            }

            base.Dispose(disposing);
        }
    }
}
</code></pre>
<p>BookSleeve has its own mechanism for determining a master and will automatically fail over to another server. I am now testing this with <code>SignalR.Chat</code>.</p>
<p>In <code>web.config</code>, I set the list of available servers:</p>
<pre><code>&lt;add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/&gt;
</code></pre>
<p>Then in <code>Application_Start()</code>:</p>
<pre><code>// Redis cluster server list
string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

List&lt;string&gt; eventKeys = new List&lt;string&gt;();
eventKeys.Add("SignalR.Redis.FailoverTest");
GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);
</code></pre>
<p>I added two additional methods to <code>Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions</code>:</p>
<pre><code>public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable&lt;string&gt; eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable&lt;string&gt; eventKeys)
{
    var bus = new Lazy&lt;RedisMessageBusCluster&gt;(() =&gt; new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () =&gt; bus.Value);

    return resolver;
}
</code></pre>
<p>Now the problem is that when I keep several breakpoints enabled
until after a user name has been added, and then disable all breakpoints, the application works as expected. However, with the breakpoints disabled from the beginning, there seems to be a race condition that causes a failure during the connection process.</p>
<p>Thus, in <code>RedisMessageBusCluster()</code>:</p>
<pre><code>// Start the connection
_connectTask = _connection.Open().Then(() =&gt;
{
    // Create a subscription channel in redis
    _channel = _connection.GetOpenSubscriberChannel();

    // Subscribe to the registered connections
    _channel.Subscribe(_keys, OnMessage);

    // Dirty hack but it seems like subscribe returns before the actual
    // subscription is properly setup in some cases
    while (_channel.SubscriptionCount == 0)
    {
        Thread.Sleep(500);
    }
});
</code></pre>
<p>I tried adding a <code>Task.Wait</code> and even an additional <code>Sleep()</code> (not shown above). Both did wait as intended, but I am still getting errors.</p>
<p>The recurring error seems to be in <code>Booksleeve.MessageQueue.cs</code>, around line 71:</p>
<pre><code>A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. 
---&gt; System.InvalidOperationException: The queue is closed at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.&lt;Send&gt;b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.&lt;&gt;c__DisplayClass57.&lt;ThenWithArgs&gt;b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.&lt;&gt;c__DisplayClass42.&lt;RunTask&gt;b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 --- End of inner exception stack trace --- ---&gt; (Inner Exception #0) 
System.InvalidOperationException: The queue is closed at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.&lt;Send&gt;b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.&lt;&gt;c__DisplayClass57.&lt;ThenWithArgs&gt;b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.&lt;&gt;c__DisplayClass42.&lt;RunTask&gt;b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821&lt;--- public void Enqueue(RedisMessage item, bool highPri) { lock (stdPriority) { if (closed) { throw new 
InvalidOperationException("The queue is closed"); } </code></pre>
<p>This is where the closed-queue exception is thrown.</p>
<p>I foresee another issue: since the Redis connection is made in <code>Application_Start()</code>, there may be some issues in "reconnecting" to another server. I think connecting there is fine when using the singular <code>RedisConnection()</code>, where there is only one connection to choose from. With the introduction of <code>ConnectionUtils.Connect()</code>, however, I'd like to hear from <code>@dfowler</code> or the other SignalR guys about how this scenario is handled in SignalR.</p>
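To make the reconnect concern concrete, here is a minimal sketch of the pattern in question: keep one live connection and, when it reports that it closed, build a new one and re-run the setup (subscriptions) before any further sends use it. Everything in it is an assumption for illustration. `IBusConnection`, `ReconnectingBus` and `FakeConnection` are hypothetical names and are not part of BookSleeve or SignalR; the `connect` delegate stands in for something like `ConnectionUtils.Connect(serverList)`, and `setup` stands in for re-issuing `Subscribe(_keys, OnMessage)`. It is not how SignalR actually handles this.

```csharp
using System;

// Hypothetical stand-in for a pub/sub connection. This is NOT BookSleeve's API;
// it only models what the sketch needs: a Closed event and a Publish call.
public interface IBusConnection
{
    event EventHandler Closed;
    void Publish(string key, string payload);
}

// Keeps one live connection and rebuilds it (including setup such as
// subscriptions) whenever the current connection reports that it closed.
public sealed class ReconnectingBus
{
    private readonly Func<IBusConnection> _connect;  // assumed to wrap a call like ConnectionUtils.Connect(serverList)
    private readonly Action<IBusConnection> _setup;  // assumed to re-issue the subscribe calls
    private readonly object _gate = new object();
    private IBusConnection _current;

    public ReconnectingBus(Func<IBusConnection> connect, Action<IBusConnection> setup)
    {
        if (connect == null) throw new ArgumentNullException("connect");
        if (setup == null) throw new ArgumentNullException("setup");
        _connect = connect;
        _setup = setup;
        Reconnect();
    }

    public void Publish(string key, string payload)
    {
        IBusConnection connection;
        lock (_gate) { connection = _current; }
        connection.Publish(key, payload);
    }

    private void OnClosed(object sender, EventArgs e)
    {
        lock (_gate)
        {
            // Ignore close events from connections that were already replaced.
            if (!ReferenceEquals(sender, _current)) return;
            Reconnect();
        }
    }

    private void Reconnect()
    {
        lock (_gate)
        {
            IBusConnection connection = _connect();
            connection.Closed += OnClosed;
            _setup(connection);     // setup completes before the connection is used for sends
            _current = connection;
        }
    }
}

// In-memory fake used only to exercise the sketch.
public sealed class FakeConnection : IBusConnection
{
    public event EventHandler Closed;
    public bool IsClosed { get; private set; }
    public int Published { get; private set; }

    public void Publish(string key, string payload)
    {
        if (IsClosed) throw new InvalidOperationException("The queue is closed");
        Published++;
    }

    public void Close()
    {
        IsClosed = true;
        EventHandler handler = Closed;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}
```

With the fake, closing the first connection and then publishing again goes to a freshly created second connection instead of throwing "The queue is closed". A real message bus would also have to decide what happens to sends that are already in flight while the swap is under way, which this sketch does not address.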