Note that there are some explanatory texts on larger screens.

plurals
  1. PODjango: Cleaning up redis connection after client disconnects from stream
    text
    copied!<p>I've implemented a <a href="http://www.html5rocks.com/en/tutorials/eventsource/basics/" rel="nofollow">Server Sent Event</a> API in my Django app to stream realtime updates from my backend to the browser. The backend is a Redis pubsub. My Django view looks like this:</p> <pre><code>def event_stream(request): """ Stream worker events out to browser. """ listener = events.Listener( settings.EVENTS_PUBSUB_URL, channels=[settings.EVENTS_PUBSUB_CHANNEL], buffer_key=settings.EVENTS_BUFFER_KEY, last_event_id=request.META.get('HTTP_LAST_EVENT_ID') ) return http.HttpResponse(listener, mimetype='text/event-stream') </code></pre> <p>And the events.Listener class that I'm returning as an iterator looks like this:</p> <pre><code>class Listener(object): def __init__(self, rcon_or_url, channels, buffer_key=None, last_event_id=None): if isinstance(rcon_or_url, redis.StrictRedis): self.rcon = rcon_or_url elif isinstance(rcon_or_url, basestring): self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url)) self.channels = channels self.buffer_key = buffer_key self.last_event_id = last_event_id self.pubsub = self.rcon.pubsub() self.pubsub.subscribe(channels) def __iter__(self): # If we've been initted with a buffer key, then get all the events off # that and spew them out before blocking on the pubsub. if self.buffer_key: buffered_events = self.rcon.lrange(self.buffer_key, 0, -1) # check whether msg with last_event_id is still in buffer. If so, # trim buffered_events to have only newer messages. if self.last_event_id: # Note that we're looping through most recent messages first, # here counter = 0 for msg in buffered_events: if (json.loads(msg)['id'] == self.last_event_id): break counter += 1 buffered_events = buffered_events[:counter] for msg in reversed(list(buffered_events)): # Stream out oldest messages first yield to_sse({'data': msg}) try: for msg in self.pubsub.listen(): if msg['type'] == 'message': yield to_sse(msg) finally: logging.info('Closing pubsub') self.pubsub.close() self.rcon.connection_pool.disconnect() </code></pre> <p>I'm able to successfully stream events out to the browser with this setup. However, it seems that the disconnect calls in the listener's "finally" don't ever actually get called. I assume that they're still camped out waiting for messages to come from the pubsub. As clients disconnect and reconnect, I can see the number of connections to my Redis instance climbing and never going down. Once it gets to around 1000, Redis starts freaking out and consuming all the available CPU.</p> <p>I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that time.</p> <p>Things I've tried or thought about:</p> <ol> <li>A connection pool. But as the <a href="https://github.com/andymccurdy/redis-py" rel="nofollow">redis-py</a> README states, "It is not safe to pass PubSub or Pipeline objects between threads."</li> <li>A middleware to handle the connections, or maybe just disconnections. This won't work because a middleware's process_response() method gets called too early (before http headers are even sent to the client). I need something called when the client disconnects while I'm in the middle of streaming content to them.</li> <li>The <a href="https://docs.djangoproject.com/en/dev/ref/signals/#request-finished" rel="nofollow">request_finished</a> and <a href="https://docs.djangoproject.com/en/dev/ref/signals/#got-request-exception" rel="nofollow">got_request_exception</a> signals. The first, like process_response() in a middleware, seems to fire too soon. The second doesn't get called when a client disconnects mid-stream.</li> </ol> <p>Final wrinkle: In production I'm using Gevent so I can get away with keeping a lot of connections open at once. However, this connection leak issue occurs whether I'm using plain old 'manage.py runserver', or Gevent monkeypatched runserver, or Gunicorn's gevent workers.</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload