**UPDATE:** [As of Django 1.5](https://docs.djangoproject.com/en/dev/releases/1.5/#explicit-support-for-streaming-responses), you'll need to return a `StreamingHttpResponse` instance if you want to lazily stream things out as I'm doing in this question/answer.

**ORIGINAL ANSWER BELOW**

After a lot of banging on things and reading framework code, I've found what I think is the right answer to this question.

1. According to the [WSGI PEP](http://www.python.org/dev/peps/pep-0333/), if your application returns an iterator with a `close()` method, it should be called by the WSGI server once the response has finished. Django supports this too. That's a natural place to do the Redis connection cleanup that I need (see the sketch after this list).
2. There's [a bug](http://bugs.python.org/issue16220) in Python's wsgiref implementation, and by extension in Django's `runserver`, that causes `close()` to be skipped if the client disconnects from the server mid-stream. I've submitted a patch.
3. Even if the server honors `close()`, it won't be called until a write to the client actually fails. If your iterator is blocked waiting on the pubsub and not sending anything, `close()` won't be called. I've worked around this by sending a no-op message into the pubsub each time a client connects. That way when a browser does a normal reconnect, the now-defunct threads will try to write to their closed connections, throw an exception, and then get cleaned up when the server calls `close()`. The [SSE spec](http://dev.w3.org/html5/eventsource/#parsing-an-event-stream) says that any line beginning with a colon is a comment that should be ignored, so I'm just sending `":\n"` as my no-op message to flush out stale clients.
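To make the contract in point 1 concrete, here's a minimal standalone sketch (not from my project; the class name and messages are made up) of a WSGI response body whose `close()` a compliant server calls once the response finishes:

```python
# Minimal sketch of the WSGI close() contract (PEP 3333). Only the
# close() hook matters here; everything else is illustrative.
from wsgiref.simple_server import make_server


class StreamingBody(object):
    """Iterable response body; a compliant server calls close() when
    the response finishes, the natural place to release resources."""

    def __iter__(self):
        yield b"data: hello\n\n"
        yield b"data: world\n\n"

    def close(self):
        # In the real view, this is where the Redis pubsub/connection
        # teardown would happen.
        print("close() called -- cleaning up")


def app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/event-stream')])
    return StreamingBody()


if __name__ == '__main__':
    # Per the wsgiref bug linked in point 2, close() may be skipped if
    # the client disconnects mid-stream.
    make_server('localhost', 8000, app).serve_forever()
```

Fetching `http://localhost:8000/` from that server should print the cleanup message once the response completes.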
Here's the new code. First the Django view:

```python
from django.conf import settings


def event_stream(request):
    """
    Stream worker events out to browser.
    """
    # `events` is the project module containing the SSEResponse class below.
    return events.SSEResponse(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
```

And the `Listener` class that does the work, along with a helper function to format the SSEs and an `HttpResponse` subclass that lets the view be a little cleaner:

```python
import json

import redis
from django.conf import settings
from django.http import HttpResponse

# `Sender` and `utils.parse_redis_url` are project-specific helpers (a
# pubsub publisher and a redis:// URL parser); they aren't shown here.


class Listener(object):
    def __init__(self,
                 rcon_or_url=settings.EVENTS_PUBSUB_URL,
                 channels=None,
                 buffer_key=settings.EVENTS_BUFFER_KEY,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        if channels is None:
            channels = [settings.EVENTS_PUBSUB_CHANNEL]
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

        # Send a superfluous message down the pubsub to flush out stale
        # connections.
        for channel in self.channels:
            # Use buffer_key=None since these pings never need to be
            # remembered and replayed.
            sender = Sender(self.rcon, channel, None)
            sender.publish('_flush', tags=['hidden'])

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)

            # Check whether msg with last_event_id is still in buffer.  If so,
            # trim buffered_events to have only newer messages.
            if self.last_event_id:
                # Note that we're looping through most recent messages first,
                # here.
                counter = 0
                for msg in buffered_events:
                    if json.loads(msg)['id'] == self.last_event_id:
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]

            for msg in reversed(list(buffered_events)):
                # Stream out oldest messages first.
                yield to_sse({'data': msg})

        for msg in self.pubsub.listen():
            if msg['type'] == 'message':
                yield to_sse(msg)

    def close(self):
        self.pubsub.close()
        self.rcon.connection_pool.disconnect()


class SSEResponse(HttpResponse):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None, *args, **kwargs):
        self.listener = Listener(rcon_or_url, channels, buffer_key,
                                 last_event_id)
        super(SSEResponse, self).__init__(self.listener,
                                          mimetype='text/event-stream',
                                          *args, **kwargs)

    def close(self):
        """
        This will be called by the WSGI server at the end of the request, even
        if the client disconnects midstream.  Unless you're using Django's
        runserver, in which case you should expect to see Redis connections
        build up until http://bugs.python.org/issue16220 is fixed.
        """
        self.listener.close()


def to_sse(msg):
    """
    Given a Redis pubsub message that was published by a Sender (ie, has a
    JSON body with time, message, title, tags, and id), return a
    properly-formatted SSE string.
    """
    data = json.loads(msg['data'])

    # According to the SSE spec, lines beginning with a colon should be
    # ignored.  We can use that as a way to force zombie listeners to try
    # pushing something down the socket and clean up their redis connections
    # when they get an error.
    # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation
    if data['message'] == '_flush':
        return ":\n"  # Administering colonic!

    if 'id' in data:
        out = "id: " + data['id'] + '\n'
    else:
        out = ''
    if 'name' in data:
        out += 'name: ' + data['name'] + '\n'

    payload = json.dumps({
        'time': data['time'],
        'message': data['message'],
        'tags': data['tags'],
        'title': data['title'],
    })
    out += 'data: ' + payload + '\n\n'
    return out
```
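For completeness, given the UPDATE at the top: under Django 1.5+ the `HttpResponse` subclass should give way to a `StreamingHttpResponse`. Here's a rough, untested sketch of how the view above might be adapted, assuming the same `Listener` class:

```python
# Hypothetical Django 1.5+ adaptation, using StreamingHttpResponse per
# the release notes linked in the UPDATE above.
from django.conf import settings
from django.http import StreamingHttpResponse


def event_stream(request):
    """
    Stream worker events out to browser (Django 1.5+ variant).
    """
    listener = Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID'),
    )
    # StreamingHttpResponse tracks closable content, so giving Listener
    # a close() method (as above) should be enough for cleanup at the
    # end of the request.
    return StreamingHttpResponse(listener,
                                 content_type='text/event-stream')
```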
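And if you want to exercise the stream without a browser, here's a hypothetical little client using the `requests` library (the URL and retry interval are made up); it imitates `EventSource` reconnection by echoing the last seen `id:` back in the `Last-Event-ID` header, which is what feeds the buffer-trimming logic in `__iter__`:

```python
# Hypothetical test client: mimics an EventSource reconnect by sending
# the Last-Event-ID header, which the view reads from
# request.META['HTTP_LAST_EVENT_ID'].
import time

import requests

URL = 'http://localhost:8000/events/'  # made-up endpoint


def consume(last_event_id=None):
    headers = {}
    if last_event_id:
        headers['Last-Event-ID'] = last_event_id
    resp = requests.get(URL, stream=True, headers=headers)
    for line in resp.iter_lines(decode_unicode=True):
        if line.startswith(':'):
            continue  # SSE comment, e.g. the ":\n" no-op flush message
        if line.startswith('id:'):
            last_event_id = line.split(':', 1)[1].strip()
        print(line)
    return last_event_id


if __name__ == '__main__':
    last_id = None
    while True:  # naive reconnect loop
        last_id = consume(last_id)
        time.sleep(3)
```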
 
