
Huge amount of packet drops and latency faced in netty
<p>I am using Netty 3.5.11 with JDK 1.7 on Ubuntu to receive a large volume of stock-rate updates at very high frequency. The message format is JSON. The data is subscribed from a topic on a Redis server; there is a Subscriber for each symbol. The channel object is passed to multiple Subscribers, and on receiving data each Subscriber writes it to the client.</p>

<p>The volume received is around 25,000 records in 2 minutes. Each record is on average around 500 bytes long.</p>

<p>During test runs, around 7,500 to 8,000 records were dropped because the channel was not writable. How do I avoid this?</p>

<p>I also noticed that the latency increases steadily, leading to updates being received after a long delay. This happened when I used a BufferedWriteHandler to avoid the packet drops.</p>

<p>Here are the options I set on the bootstrap:</p>

<pre><code>executionHandler = new ExecutionHandler(
        new OrderedMemoryAwareThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors() * 2,
                1000000, 10000000, 100, TimeUnit.MILLISECONDS));

serverBootStrap.setPipelineFactory(new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(
                new PortUnificationServerHandler(getConfiguration(), executionHandler));
    }
});

serverBootStrap.setOption("child.tcpNoDelay", true);
serverBootStrap.setOption("tcpNoDelay", true);
serverBootStrap.setOption("child.keepAlive", true);
serverBootStrap.setOption("child.reuseAddress", true);

// Setting buffer sizes can improve I/O.
serverBootStrap.setOption("child.sendBufferSize", 16777216);
serverBootStrap.setOption("receiveBufferSize", 16777216);

// Better to have a receive buffer size predictor.
serverBootStrap.setOption("receiveBufferSizePredictorFactory",
        new AdaptiveReceiveBufferSizePredictorFactory(1024, 1024 * 16, 16777216));

// If the server is sending 1000 messages per second, optimum write buffer
// water marks will prevent unnecessary throttling; check the
// NioSocketChannelConfig docs.
serverBootStrap.setOption("backlog", 1000);
serverBootStrap.setOption("sendBufferSize", 16777216);
serverBootStrap.setOption("writeBufferLowWaterMark", 1024 * 1024 * 25);
serverBootStrap.setOption("writeBufferHighWaterMark", 1024 * 1024 * 50);
</code></pre>

<p>The pipeline and handler class:</p>

<pre><code>public class PortUnificationServerHandler extends FrameDecoder {

    private AppConfiguration appConfiguration;
    private final ExecutionHandler executionHandler;

    public PortUnificationServerHandler(AppConfiguration pAppConfiguration,
                                        ExecutionHandler pExecutionHandler) {
        appConfiguration = pAppConfiguration;
        this.executionHandler = pExecutionHandler;
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            ChannelBuffer buffer) throws Exception {
        String lRequest = buffer.toString(CharsetUtil.UTF_8);
        if (ConnectionServiceHelper.isValidJSON(lRequest)) {
            ObjectMapper lObjectMapper = new ObjectMapper();
            StringReader lStringReader = new StringReader(lRequest);
            JsonNode lNode = lObjectMapper.readTree(lStringReader);
            if (lNode.get(Constants.REQUEST_TYPE).asText().trim()
                     .equalsIgnoreCase(Constants.LOGIN_REQUEST)) {
                JsonNode lDataNode1 = lNode.get(Constants.REQUEST_DATA);
                LoginRequest lLogin = lObjectMapper.treeToValue(lDataNode1, LoginRequest.class);
                if (lLogin.getCompress() != null) {
                    if (lLogin.getCompress().trim()
                              .equalsIgnoreCase(Constants.COMPRESS_FLAG_TRUE)) {
                        enableJSON(ctx);
                        enableGzip(ctx);
                        ctx.getPipeline().remove(this);
                    } else {
                        enableJSON(ctx);
                        ctx.getPipeline().remove(this);
                    }
                } else {
                    enableJSON(ctx);
                    ctx.getPipeline().remove(this);
                }
            }
        }
        // Forward the current read buffer as is to the new handlers.
        return buffer.readBytes(buffer.readableBytes());
    }

    private void enableJSON(ChannelHandlerContext ctx) {
        ChannelPipeline pipeline = ctx.getPipeline();

        boolean lHandlerExists = pipeline.getContext("bufferedwriter") != null;
        if (!lHandlerExists) {
            pipeline.addFirst("bufferedwriter", new MyBufferedWriteHandler());
        }

        lHandlerExists = pipeline.getContext("framer") != null;
        if (!lHandlerExists) {
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(65535,
                    new ChannelBuffer[] { ChannelBuffers.wrappedBuffer(new byte[] { '\n' }) }));
        }

        lHandlerExists = pipeline.getContext("decoder") != null;
        if (!lHandlerExists) {
            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        }

        lHandlerExists = pipeline.getContext("encoder") != null;
        if (!lHandlerExists) {
            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        }

        lHandlerExists = pipeline.getContext("executor") != null;
        if (!lHandlerExists) {
            pipeline.addLast("executor", executionHandler);
        }

        lHandlerExists = pipeline.getContext("handler") != null;
        if (!lHandlerExists) {
            pipeline.addLast("handler", new ConnectionServiceUpStreamHandler(appConfiguration));
        }

        lHandlerExists = pipeline.getContext("unite") != null;
        if (!lHandlerExists) {
            pipeline.addLast("unite",
                    new PortUnificationServerHandler(appConfiguration, executionHandler));
        }
    }

    private void enableGzip(ChannelHandlerContext ctx) {
        ChannelPipeline pipeline = ctx.getPipeline();
        //pipeline.remove("decoder");
        //pipeline.addLast("decoder", new MyStringDecoder(CharsetUtil.UTF_8, true));
        //pipeline.addLast("compress", new CompressionHandler(80, "gzipdeflater"));

        boolean lHandlerExists = pipeline.getContext("encoder") != null;
        if (lHandlerExists) {
            pipeline.remove("encoder");
        }

        lHandlerExists = pipeline.getContext("gzipdeflater") != null;
        if (!lHandlerExists) {
            pipeline.addBefore("executor", "gzipdeflater", new ZlibEncoder(ZlibWrapper.GZIP));
        }

        lHandlerExists = pipeline.getContext("lengthprepender") != null;
        if (!lHandlerExists) {
            pipeline.addAfter("gzipdeflater", "lengthprepender", new LengthFieldPrepender(4));
        }
    }
}
</code></pre>

<p>The BufferedWriteHandler:</p>

<pre><code>public class MyBufferedWriteHandler extends BufferedWriteHandler {

    private final AtomicLong bufferSize = new AtomicLong();
    final Logger logger = LoggerFactory.getLogger(getClass());

    public MyBufferedWriteHandler() {
        // Enable consolidation by default.
        super(true);
    }

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer data = (ChannelBuffer) e.getMessage();
        if (e.getChannel().isWritable()) {
            long newBufferSize = bufferSize.get();
            // Flush anything that was buffered while the channel was unwritable.
            if (newBufferSize &gt; 0) {
                flush();
                bufferSize.set(0);
            }
            ctx.sendDownstream(e);
        } else {
            logger.warn("Buffering data for : " + e.getChannel().getRemoteAddress());
            super.writeRequested(ctx, e);
            bufferSize.addAndGet(data.readableBytes());
        }
    }

    @Override
    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        if (e.getChannel().isWritable()) {
            flush();
        }
    }
}
</code></pre>

<p>The function used in the Subscriber class to write data:</p>

<pre><code>public void writeToClient(Channel pClientChannel, String pMessage) throws IOException {
    String lMessage = pMessage;
    if (pClientChannel.isWritable()) {
        lMessage += Constants.RESPONSE_DELIMITER;
        pClientChannel.write(lMessage);
    } else {
        logger.warn(DroppedCounter++ + " dropped : " + pMessage);
    }
}
</code></pre>

<p>I have implemented some of the suggestions I read on Stack Overflow and other sites, but I have not been successful in resolving this issue.</p>

<p>Kindly suggest or advise what I am missing.</p>

<p>Thanks</p>
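<p>For context, a back-of-the-envelope calculation on the numbers quoted above (25,000 records in 2 minutes, ~500 bytes each; this sketch is editorial, not from the original post) shows the feed is only on the order of 100 KiB/s, so the drops point at a slow write path rather than raw network bandwidth:</p>

```java
// Back-of-the-envelope throughput for the figures quoted in the question.
public class FeedMath {
    public static void main(String[] args) {
        long records = 25_000;
        long avgRecordBytes = 500;
        long windowSeconds = 2 * 60;

        long recordsPerSecond = records / windowSeconds;
        long bytesPerSecond = records * avgRecordBytes / windowSeconds;

        // Roughly 208 records/s and about 100 KiB/s: trivial for the NIC,
        // so the bottleneck is almost certainly the consumer/write queue.
        System.out.println(recordsPerSecond + " records/s, "
                + bytesPerSecond / 1024 + " KiB/s");
    }
}
```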
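<p>The <code>writeBufferLowWaterMark</code>/<code>writeBufferHighWaterMark</code> pair set on the bootstrap acts as hysteresis: the channel reports <code>isWritable() == false</code> once the pending write queue exceeds the high mark, and becomes writable again only after it drains below the low mark. A minimal plain-Java model of that behaviour (a hypothetical illustration, not the Netty API):</p>

```java
// Hypothetical model of write-buffer water marks; the real logic lives
// inside Netty's NIO channel, this only illustrates the hysteresis.
public class WaterMarkModel {
    private final long low, high;
    private long pending;
    private boolean writable = true;

    public WaterMarkModel(long low, long high) {
        this.low = low;
        this.high = high;
    }

    public void enqueue(long bytes) {
        pending += bytes;
        if (pending > high) writable = false;   // above high mark: throttle
    }

    public void drain(long bytes) {
        pending = Math.max(0, pending - bytes);
        if (pending < low) writable = true;     // below low mark: resume
    }

    public boolean isWritable() { return writable; }
}
```

<p>With the 25 MiB / 50 MiB values above, a producer that keeps writing regardless can queue up to 50 MiB (many minutes of this ~100 KiB/s feed) before the writability signal flips, which is consistent with the steadily increasing latency described in the question.</p>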
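<p>One alternative to dropping in <code>writeToClient()</code> when the channel is unwritable is a bounded per-client queue that is drained once the channel becomes writable again (in Netty 3, from <code>channelInterestChanged</code>). A standalone sketch of the queue part, with hypothetical names and a hypothetical cap:</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-client outbound queue: buffer while the channel is
// unwritable, shed the OLDEST update once a hard cap is hit (for a rates
// feed, stale quotes are the least valuable), and count what was dropped
// so losses are visible in logs or metrics.
public class OutboundQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxMessages;
    private long dropped;

    public OutboundQueue(int maxMessages) {
        this.maxMessages = maxMessages;
    }

    public void offer(String message) {
        if (pending.size() >= maxMessages) {
            pending.pollFirst();   // shed the stalest update first
            dropped++;
        }
        pending.addLast(message);
    }

    /** Drain from channelInterestChanged once isWritable() flips back. */
    public String poll() { return pending.pollFirst(); }

    public long droppedCount() { return dropped; }
    public int size() { return pending.size(); }
}
```

<p>For stock rates, conflation (keeping only the latest quote per symbol while the channel is stalled) is an even stronger variant of the same idea, since intermediate quotes are superseded anyway.</p>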
 
