  1. POJava NIO Framework stops working under heavy load with no write
<p>I'm new to NIO, and this problem looks fairly odd to me.</p>
<p>It appears when the server is under a heavy load of connections and the clients keep sending an invalid packet, that is, one that doesn't match the POLICY_REQUEST packet.</p>
<p>When a client connects, its channel is registered for the socket read operation (OP_READ). Because the packet is invalid, send() is never called, so the SelectionKey is never switched to the write operation (OP_WRITE). Somehow the read operations stack up, and after 2000 or so connection requests the server stops accepting connections, no matter what. I've tried connecting with telnet and it always fails. After around 5 minutes, though, the server starts accepting connections again and is fully functional.</p>
<p>It's a very strange problem. Note that if I remove the packet-matching statement, the server acts much like an echo server: it runs endlessly without any trouble accepting connections and is stable.</p>
<p>I've attached the whole server source code below. 
Can someone with extensive knowledge of NIO please check it out and let me know if there is a way to fix it?</p>
<p>The only thing that catches my eye is the selector wakeup in send(). I thought it might fix everything, but after putting the lines below into read() it seems to do absolutely nothing and the problem remains.</p>
<pre><code>// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
</code></pre>
<p>Here is the source for the simple server.</p>
<pre><code>import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class PolicyServer implements Runnable {
    public static final String POLICY_REQUEST = "&lt;policy-file-request/&gt;";
    public static final String POLICY_XML =
            "&lt;?xml version=\"1.0\"?&gt;"
            + "&lt;cross-domain-policy&gt;"
            + "&lt;allow-access-from domain=\"*\" to-ports=\"*\" /&gt;"
            + "&lt;/cross-domain-policy&gt;"
            + (char)0;

    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(255);

    // This decodes raw bytes into ASCII data.
    private CharsetDecoder asciiDecoder;

    // A list of ChangeRequest instances
    private List&lt;ChangeRequest&gt; pendingChanges = new LinkedList&lt;ChangeRequest&gt;();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map&lt;SocketChannel, List&lt;ByteBuffer&gt;&gt; pendingData = new HashMap&lt;SocketChannel, List&lt;ByteBuffer&gt;&gt;();

    public PolicyServer(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.asciiDecoder = Charset.forName("US-ASCII").newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
    }

    public void send(SocketChannel socket, byte[] data) {
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List&lt;ByteBuffer&gt; queue = (List&lt;ByteBuffer&gt;) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList&lt;ByteBuffer&gt;();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
    }

    public void run() {
        while (true) {
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        ChangeRequest change = (ChangeRequest) changes.next();
                        changes.remove();
                        if (change == null) continue;
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            try {
                                if (key != null) key.interestOps(change.ops);
                            } catch (Exception ex) {
                                if (key != null) key.cancel();
                            }
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event on one of the registered channels
                this.selector.select();

                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        key.cancel();
                        continue;
                    }

                    // Check what event is available and deal with it
                    try {
                        if (key.isAcceptable()) {
                            this.accept(key);
                        } else if (key.isReadable()) {
                            this.read(key);
                        } else if (key.isWritable()) {
                            this.write(key);
                        }
                    } catch (IOException io) {
                        this.pendingData.remove(key.channel());
                        try {
                            ((SocketChannel) key.channel()).socket().close();
                        } catch (IOException e) {}
                        key.channel().close();
                        key.cancel();
                        key.attach(null);
                        key = null;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read.
        // The attachment is a new StringBuffer (for storing incomplete/multiple packets).
        socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead = socketChannel.read(this.readBuffer);
        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            throw new IOException("");
        }

        // Grab the StringBuffer we stored as the attachment
        StringBuffer sb = (StringBuffer) key.attachment();

        // Flip the readBuffer by setting the current position of the
        // packet stream to the beginning.
        // Append the data to the attachment StringBuffer
        this.readBuffer.flip();
        sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
        this.readBuffer.clear();

        // Get the policy request as a complete packet
        if (sb.indexOf("\0") != -1) {
            String packets = new String(sb.substring(0, sb.lastIndexOf("\0") + 1));
            sb.delete(0, sb.lastIndexOf("\0") + 1);
            if (packets.indexOf(POLICY_REQUEST) != -1) send(socketChannel, POLICY_XML.getBytes());
        } else if (sb.length() &gt; 8192) {
            sb.setLength(0);
            // Force disconnect.
            throw new IOException("");
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List&lt;ByteBuffer&gt; queue = (List&lt;ByteBuffer&gt;) this.pendingData.get(socketChannel);
            if (queue == null || queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                try {
                    if (key != null) key.interestOps(SelectionKey.OP_READ);
                } catch (Exception ex) {
                    if (key != null) key.cancel();
                }
            }

            // Write until there's no more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() &gt; 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }
        }
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            new Thread(new PolicyServer(null, 5556)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// ChangeRequest.java
import java.nio.channels.SocketChannel;

public class ChangeRequest {
    public static final int CHANGEOPS = 1;

    public SocketChannel socket;
    public int type;
    public int ops;

    public ChangeRequest(SocketChannel socket, int type, int ops) {
        this.socket = socket;
        this.type = type;
        this.ops = ops;
    }
}
</code></pre>
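<p>To look at the packet-matching step on its own, away from the selector loop, the framing logic in read() can be pulled out into a small helper. This is only a sketch of that step, not a fix for the problem: the class name PolicyFramer and the methods drainCompletePackets and isPolicyRequest are made up for illustration and are not part of the server above, and it uses StringBuilder where the server uses StringBuffer.</p>

```java
public class PolicyFramer {
    // Same request marker as PolicyServer.POLICY_REQUEST.
    public static final String POLICY_REQUEST = "<policy-file-request/>";

    /**
     * Hypothetical helper: removes every complete (NUL-terminated) packet
     * from the front of the buffer and returns them as one string.
     * Returns null when no NUL terminator has arrived yet. Any incomplete
     * trailing data is left in the buffer for the next read.
     */
    public static String drainCompletePackets(StringBuilder sb) {
        int end = sb.lastIndexOf("\0");
        if (end == -1) {
            return null;
        }
        String packets = sb.substring(0, end + 1);
        sb.delete(0, end + 1);
        return packets;
    }

    /** Hypothetical helper: true if the drained data contains a policy request. */
    public static boolean isPolicyRequest(String packets) {
        return packets != null && packets.contains(POLICY_REQUEST);
    }

    public static void main(String[] args) {
        // One complete request followed by the start of another packet.
        StringBuilder sb = new StringBuilder("<policy-file-request/>\0par");
        String packets = drainCompletePackets(sb);
        System.out.println(isPolicyRequest(packets)); // prints "true"
        System.out.println(sb);                       // prints "par"
    }
}
```

<p>With the server code as posted, a packet that drains but does not match results in no call to send(), so the channel simply stays registered for OP_READ with nothing queued. That is the state the connections in the load scenario end up in.</p>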
 
