Note that there are some explanatory texts on larger screens.

plurals
  1. POIncreasing max_frame rate in amqp.js -- Hitting limits in buffer copy
    primarykey
    data
    text
    <p>I have a situation where I have about 50 listeners on 50 'direct' exchanges. The client and the server are in javascript (node.js) . It is using the node-amqp from postwait . Things work fairly well at low frequency of messages. Once the message frequency increases ~ 5000 messages per minute then there is a buffer copy error being shown in amqp.js From what I could trace the max_frame_size in amqp.js is fixed to <strong>131072</strong> . </p> <p>I just tried to double the value from 128k to 256k . But doing so causes the node.js to silently fail without starting up. There is no error message. I am assuming that I also have to change the corresponding value (max_frame) in the rabbit.config file.</p> <p>Do I have to do anything else to increase this value . Any other suggestions will also be appreciated. </p> <p>I have attached the minimal code to simulate the error . Run the commands below in 2 windows to simulate the error</p> <p><strong>node engine-so-client.js -c 200 -p 12000</strong></p> <p><strong>node server-so.js</strong> </p> <p>File <strong>server-so.js</strong></p> <pre><code>var util= require('util') var amqp = require('amqp'); var express = require ('express') function httpServer(exchange) { console.log("In httpServer start %s",exchange.name); var port; app = express.createServer(); app.get('/message/:routingKey/:message',function(req,res) { exchange.publish(req.params.routingKey,{'d' : req.params.message}); res.send('Published the message '+req.params.message+'\n'); }); app.get('/register/:socket/:routingKey',function(req,res) { var queue1 = conn.queue('', {autoDelete: false, durable: true, exclusive: true}, function() { console.log("Queue1 Callback"); queue1.subscribe( function(message) { console.log("subscribe Callback for "+req.params.routingKey + " " + message.d); }); console.log("Queue Callback Binding with "+req.params.routingKey); queue1.bind(exchange.name,req.params.routingKey); }); res.send('Started the socket at '+req.params.socket+'\n'); }); app.listen(3000); app.use(express.logger()); console.log('Started server on port %s', app.address().port); } function setup() { console.log("Setup"); var exchange = conn.exchange('cf2-demo', {'type': 'direct', durable: false}, function() { var queue = conn.queue('', {autoDelete: false, durable: true, exclusive: true}, function() { console.log("Queue Callback Binding with test key"); queue.bind(exchange.name,'testKey'); }); queue.on('queueBindOk', function() { httpServer(exchange); }); }); console.log("Completed setup %s", exchange.name); } var conn = amqp.createConnection({host:'localhost', login:'guest', password:'guest'}, {defaultExchangeName: "cf2-demo"}); conn.on('ready',setup); </code></pre> <p>File <strong>engine-so-client.js</strong></p> <pre><code>var program = require('commander.js'); var util = require('util'); var http = require('http'); program .version('0.0.1') .option('-h, --host &lt;host&gt;', 'Host running server', String,'localhost') .option('-p, --port &lt;port&gt;', 'Port to open to connect messages on',Number,12000) .option('-k, --key &lt;key&gt;,', 'Routing Key to be used',String,'key1') .option('-c, --count &lt;count&gt;','Iteration count',Number,50) .option('-m, --mesg &lt;mesg&gt;','Message prefix',String,'hello') .option('-t, --timeout', 'Timeout in ms between message posts') .parse(process.argv); function setup(host,port,key,mesg) { var client = http.createClient(3000, host); var request = client.request('GET','/register/'+port+"/"+key); request.end(); request.on('response', function(response) { response.on('data', function(chunk) { postMessage(host,port,key,mesg,1); }); }); } function postMessage(host,port,key,mesg,count) { var timeNow = new Date().getTime(); var mesgNow = mesg+"-"+count+"-"+port; console.log("Type: Sent Mesg, Message: %s, Time: %s",mesgNow,timeNow); var client1 = http.createClient(3000, host); var request1 = client1.request('GET','/message/'+key+"/"+mesgNow); request1.end(); count++; if (count &lt;100) { setTimeout( function() { postMessage(host,port,key,mesg,count); }, 1000 ); } } var port = program.port; var host = program.host; var key = program.key; var mesg = program.mesg; var count = program.count; var keys = ['key1','key2','key3','key4','key5']; var messages = ['hello','world','good','morning','bye']; var start=port; for (i=0; i&lt;count; i++) { var index = i%keys.length; var socket = start + i; setup(host,socket,keys[index],messages[index]); } </code></pre> <p>Error attached</p> <pre><code>buffer.js:494 throw new Error('sourceEnd out of bounds'); ^ Error: sourceEnd out of bounds at Buffer.copy (buffer.js:494:11) at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:170:10) at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14) at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16) at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14) at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14) at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16) at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14) at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14) at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16) </code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload