  Broadcasting message to all clients using Pika + sockjs-tornado
    <p>I'm new to realtime apps based on WebSockets, and I'm stuck at one point. My app has the following components:</p>
    <ol>
    <li>A simple Python script which is fired when some generic event occurs. It receives data and sends it to the queue (RabbitMQ) using pika.</li>
    <li>A Tornado app (with sockjs-tornado) that receives messages from the queue (pika asynchronous client), processes their content, saves the new app state to the database and broadcasts data to clients (SockJS clients). Communication with clients goes in one direction only: they just connect to the server and receive data.</li>
    </ol>
    <p>The problem is that I can't figure out how to pass data received from the queue to all clients. I've set up a pub/sub exchange, so when a user connects to the server, a new connection to RabbitMQ is established for every user, but that's not what I want. Below is what I've got so far.</p>
    <p><strong>common/pika_client.py</strong>: </p>
<pre><code>import logging

import pika
from pika.adapters.tornado_connection import TornadoConnection


class PikaClient(object):

    def __init__(self, exchange, host='localhost', port=5672, vhost=''):
        # Default values
        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None
        self.host = host
        self.port = port
        self.vhost = vhost
        self.exchange = exchange

        # preparing logger
        self.log = logging.getLogger(__name__)
        self.set_log_level()

    def set_log_level(self, log_level=logging.WARNING):
        self.log.setLevel(log_level)

    def connect(self):
        self.log.info("CONNECTING")
        if self.connecting:
            self.log.info('%s: Already connecting to RabbitMQ' % self.__class__.__name__)
            return

        self.log.info('%s: Connecting to RabbitMQ on localhost:5672' % self.__class__.__name__)
        self.connecting = True

        param = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.vhost
        )

        self.connection = TornadoConnection(param, on_open_callback=self.on_connected)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        self.log.info('%s: Connected to RabbitMQ on localhost:5672' % self.__class__.__name__)
        self.connected = True
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        self.log.info('%s: Channel Open, Declaring Exchange %s' % (self.__class__.__name__, self.exchange))
        self.channel = channel
        self.channel.exchange_declare(
            exchange=self.exchange,
            type="fanout",
            callback=self.on_exchange_declared
        )

    def on_exchange_declared(self, frame):
        self.log.info('%s: Exchange Declared, Declaring Queue' % (self.__class__.__name__))
        self.channel.queue_declare(exclusive=True, callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.log.info('%s: Queue Declared, Binding Queue %s' % (self.__class__.__name__, frame.method.queue))
        self.queue_name = frame.method.queue
        self.channel.queue_bind(
            exchange=self.exchange,
            queue=frame.method.queue,
            callback=self.on_queue_bound
        )

    def on_queue_bound(self, frame):
        self.log.info('%s: Queue Bound. To receive messages implement '
                      'on_queue_bound method' % self.__class__.__name__)

    def on_closed(self, connection):
        self.log.info('%s: Connection Closed' % self.__class__.__name__)
        self.connected = False
        self.connection = None
        self.connecting = False
        self.channel = None
        # connect() stores the new connection itself and returns None,
        # so its result must not be assigned to self.connection.
        self.connect()

    def add_message_handler(self, handler):
        self.message_handler = handler
</code></pre>
    <p><strong>tracker.py</strong></p>
<pre><code>from sockjs.tornado import SockJSConnection

import settings
from common.pika_client import PikaClient


class QueueReceiver(PikaClient):
    """Receives messages from RabbitMQ """

    def on_queue_bound(self, frame):
        self.log.info('Consuming on queue %s' % self.queue_name)
        self.channel.basic_consume(consumer_callback=self.message_handler,
                                   queue=self.queue_name
                                   )


class TrackerConnection(SockJSConnection):

    def on_open(self, info):
        # One QueueReceiver (and one RabbitMQ connection) per connected client
        self.queue = QueueReceiver('clt')
        self.queue.add_message_handler(self.on_queue_message)
        self.queue.set_log_level(settings.LOG_LEVEL)
        self.queue.connect()

    def on_queue_message(self, channel, method, header, body):
        self.send(body)
        self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)
</code></pre>
    <p>It works, but as I mentioned, I'd like to have only one connection to the queue, receive messages, do some processing and broadcast the results to clients using the <em>broadcast()</em> method. Thanks in advance for any help.</p>
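    <p>To make the goal concrete, here is a minimal sketch of the fan-out shape being asked for: a single queue consumer whose message handler forwards each message to every registered client. It deliberately uses no pika or sockjs-tornado calls so that it runs on its own. <code>BroadcastHub</code>, <code>FakeClient</code> and <code>FakeChannel</code> are hypothetical names invented for this illustration, and the handler signature is simply copied from <code>on_queue_message</code> above.</p>

```python
from types import SimpleNamespace


class BroadcastHub(object):
    """Hypothetical helper: one queue consumer feeding many clients."""

    def __init__(self):
        self.clients = set()

    def register(self, client):
        # Intended to be called when a client connects (e.g. from on_open).
        self.clients.add(client)

    def unregister(self, client):
        # Intended to be called when a client disconnects (e.g. from on_close).
        self.clients.discard(client)

    def on_queue_message(self, channel, method, header, body):
        # Iterate over a copy so a client unregistering mid-loop is harmless.
        for client in list(self.clients):
            client.send(body)
        # Acknowledge once per message, not once per client.
        channel.basic_ack(delivery_tag=method.delivery_tag)


class FakeClient(object):
    """Stand-in for a connected SockJS client (assumption for the demo)."""

    def __init__(self):
        self.received = []

    def send(self, message):
        self.received.append(message)


class FakeChannel(object):
    """Stand-in for a pika channel that only records acknowledgements."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


if __name__ == "__main__":
    hub = BroadcastHub()
    first, second = FakeClient(), FakeClient()
    hub.register(first)
    hub.register(second)

    channel = FakeChannel()
    hub.on_queue_message(channel, SimpleNamespace(delivery_tag=1), None, "state-1")
    print(first.received, second.received, channel.acked)
```

    <p>In the real app, the idea would be to create one <code>QueueReceiver</code> at startup, pass a handler like this to <code>add_message_handler</code>, and have each <code>TrackerConnection</code> only register and unregister itself. The explicit loop could presumably be replaced by the <em>broadcast()</em> method mentioned above; that wiring is an assumption and has not been tested against sockjs-tornado.</p>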