Monday, January 30, 2012

Java 7 new features - 6. NIO2 : D. Non Blocking TCP server/client example


<< Previous Table of Categories Next>>


To illustrate how non-blocking I/O works, the server sleeps for 1 second before replying. You can understand this better by running these two classes and checking the log messages.
Non Blocking TCP Server
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;

/**
 * Single-threaded, selector-driven TCP server used to demonstrate non-blocking I/O.
 *
 * <p>For every connection it reads one request, logs it, waits
 * {@link #RESPONSE_DELAY_MILLIS} milliseconds, answers with the text {@code World},
 * and then closes the connection.
 */
public class NonBlockingTcpServer
{
    /** TCP port the server listens on. */
    private static final int DEFAULT_PORT = 9001;

    /** Artificial delay before each reply, so the client has to wait for the response. */
    private static final long RESPONSE_DELAY_MILLIS = 1000L;

    /**
     * Opens the selector and the listening channel, then dispatches selector events forever.
     *
     * @param args unused
     * @throws Exception if the thread is interrupted while delaying a reply
     */
    public static void main(String[] args) throws Exception
    {
        ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(1024);
        Charset charset = Charset.defaultCharset();
        CharsetDecoder decoder = charset.newDecoder();
        // Encode with the same charset the decoder uses; never written from directly (see handleRequest).
        ByteBuffer outgoingBuffer = ByteBuffer.wrap("World".getBytes(charset));

        //Open Selector and ServerSocketChannel
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open())
        {
            //Check if both of them were opened
            if (!serverSocketChannel.isOpen() || !selector.isOpen())
            {
                System.out.println("The server socket channel or selector cannot be opened!");
                return;
            }

            //Configure non-blocking mode
            serverSocketChannel.configureBlocking(false);
            //Bind to the specific port number
            serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT));
            //Register the current channel with the given selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Waiting for incoming connections ...");
            while (true)
            {
                //wait for incoming events
                selector.select();
                //there is something to process on selected keys
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext())
                {
                    SelectionKey key = keys.next();
                    //prevent the same key from coming up again
                    keys.remove();
                    if (!key.isValid())
                    {
                        continue;
                    }
                    if (key.isAcceptable())
                    {
                        acceptConnection(key, selector);
                    }
                    else if (key.isReadable())
                    {
                        handleRequest(key, incomingBuffer, decoder, outgoingBuffer);
                    }
                }
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }

    /**
     * Accepts a pending connection and registers it with the selector for reading.
     * A failure while setting up the new connection closes only that connection.
     */
    private static void acceptConnection(SelectionKey key, Selector selector) throws IOException
    {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverChannel.accept();
        if (socketChannel == null)
        {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        try
        {
            socketChannel.configureBlocking(false);
            System.out.println("Accept incomming connection from: " + socketChannel.getRemoteAddress());
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
        catch (IOException ex)
        {
            System.err.println(ex);
            closeQuietly(socketChannel);
        }
    }

    /**
     * Reads one request from the ready channel, logs it, sends the response after the
     * demo delay, and closes the connection. I/O and decoding errors affect only this
     * connection, not the server loop.
     *
     * @throws InterruptedException if interrupted during the artificial delay
     */
    private static void handleRequest(SelectionKey key, ByteBuffer incomingBuffer,
                                      CharsetDecoder decoder, ByteBuffer outgoingBuffer)
        throws InterruptedException
    {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        try
        {
            incomingBuffer.clear();
            int numRead = socketChannel.read(incomingBuffer);
            if (numRead == -1)
            {
                // Peer closed the connection without sending a request: nothing to answer.
                return;
            }
            incomingBuffer.flip();
            String requestMsg = decoder.decode(incomingBuffer).toString();
            System.out.println("Request from " + socketChannel.getRemoteAddress() + " : " + requestMsg);
            Thread.sleep(RESPONSE_DELAY_MILLIS);

            // duplicate() gives an independent position/limit over the same bytes, so the
            // shared response buffer never needs resetting, even after a partial write.
            ByteBuffer response = outgoingBuffer.duplicate();
            while (response.hasRemaining())
            {
                // A non-blocking write may send only part of the buffer; repeat until drained.
                socketChannel.write(response);
            }
            socketChannel.shutdownOutput();
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
        finally
        {
            // Closing the channel releases the socket and implicitly cancels its selection key.
            closeQuietly(socketChannel);
        }
    }

    /** Closes the channel, ignoring any failure raised by the close itself. */
    private static void closeQuietly(SocketChannel channel)
    {
        try
        {
            channel.close();
        }
        catch (IOException ignored)
        {
            // The connection is being discarded anyway; there is nothing left to recover.
        }
    }
}

Non Blocking TCP Client
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Non-blocking TCP client used to demonstrate non-blocking I/O.
 *
 * <p>It connects to {@code 127.0.0.1:9001}, sends the text {@code Hello}, half-closes the
 * connection, and then polls the channel until the server closes its side, logging the
 * elapsed time of every read attempt.
 */
public class NonBlockingTcpClient
{
    /** TCP port of the server. */
    private static final int DEFAULT_PORT = 9001;

    /** Address of the server. */
    private static final String IP = "127.0.0.1";

    /** How long to wait for the connection to become ready before giving up. */
    private static final long SELECT_TIMEOUT_MILLIS = 1000L;

    /**
     * Connects to the server, performs one request/response exchange and exits.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        ByteBuffer receivingBuffer = ByteBuffer.allocateDirect(1024);
        Charset charset = Charset.defaultCharset();
        // Encode with the same charset the decoder uses.
        ByteBuffer sendingBuffer = ByteBuffer.wrap("Hello".getBytes(charset));
        CharsetDecoder decoder = charset.newDecoder();

        //open Selector and SocketChannel
        try (Selector selector = Selector.open();
             SocketChannel socketChannel = SocketChannel.open())
        {
            //check that both of them were opened
            if (!socketChannel.isOpen() || !selector.isOpen())
            {
                System.out.println("The socket channel or selector cannot be opened!");
                return;
            }

            //configure non-blocking mode
            socketChannel.configureBlocking(false);
            //register the current channel with the given selector
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            if (socketChannel.connect(new java.net.InetSocketAddress(IP, DEFAULT_PORT)))
            {
                // A non-blocking connect may complete immediately (e.g. on a local
                // connection); do not rely on an OP_CONNECT event in that case.
                exchange(socketChannel, sendingBuffer, receivingBuffer, decoder);
                return;
            }

            //waiting for the connection
            while (selector.select(SELECT_TIMEOUT_MILLIS) > 0)
            {
                Iterator<SelectionKey> its = selector.selectedKeys().iterator();
                //process each key
                while (its.hasNext())
                {
                    SelectionKey key = its.next();
                    //remove the current key
                    its.remove();
                    // The channel is closed when this try block ends, which also cancels
                    // the key, so the next select() returns 0 and the outer loop terminates.
                    try (SocketChannel keySocketChannel = (SocketChannel) key.channel())
                    {
                        if (!key.isConnectable())
                        {
                            System.out.println("The connection cannot be established!");
                        }
                        else if (keySocketChannel.isConnectionPending()
                                 && !keySocketChannel.finishConnect())
                        {
                            //the connection establishment has not been finished
                            System.out.println("The connection cannot be established!");
                        }
                        else
                        {
                            exchange(keySocketChannel, sendingBuffer, receivingBuffer, decoder);
                        }
                    }
                    catch (IOException ex)
                    {
                        System.err.println(ex);
                    }
                }
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }

    /**
     * Sends the request over the connected channel, half-closes it, then reads until the
     * server closes its side and prints the accumulated response.
     *
     * @throws IOException if writing, reading or decoding fails
     */
    private static void exchange(SocketChannel channel, ByteBuffer sendingBuffer,
                                 ByteBuffer receivingBuffer, CharsetDecoder decoder)
        throws IOException
    {
        while (sendingBuffer.hasRemaining())
        {
            // A non-blocking write may send only part of the buffer; repeat until drained.
            channel.write(sendingBuffer);
        }
        channel.shutdownOutput();

        StringBuilder responseMsg = new StringBuilder();
        long startTime = System.currentTimeMillis();
        // A non-blocking read returns 0 right away when nothing has arrived yet, so this
        // loop polls and logs on every attempt until end-of-stream (-1). Kept as polling
        // because the log output is the point of the demo; a real client would register
        // OP_READ and wait on the selector instead of spinning.
        while (channel.read(receivingBuffer) != -1)
        {
            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("elapsedTime=" + elapsedTime);
            receivingBuffer.flip();
            String msgReceived = decoder.decode(receivingBuffer).toString();
            System.out.println("Msg received in this loop : " + msgReceived);
            responseMsg.append(msgReceived);
            // decode(ByteBuffer) either consumes the whole buffer or throws, so there is
            // never a remainder to preserve with compact().
            receivingBuffer.clear();
        }
        System.out.println("Response from server : " + responseMsg);
    }
}

1 comment: