Monday, January 30, 2012

Java 7 new features - 6. NIO2 : D. Non Blocking TCP server/client example


<< Previous Table of Categories Next>>


To illustrate how non-blocking I/O works, the server sleeps for 1 second before replying. You can understand this better by running these two classes and checking the log messages.
Non Blocking TCP Server
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;

/**
 * Single-threaded, selector-driven TCP server that replies "World" to each request.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every accepted
 * connection. Each connection is served once: the first chunk of data read is
 * logged, the reply is written after a one-second pause, and the connection is
 * then closed.
 */
public class NonBlockingTcpServer
{
    /**
     * Binds to port 9001 and serves connections until the process is stopped.
     *
     * @param args ignored
     * @throws Exception if the thread is interrupted during the reply delay;
     *                   I/O failures of the listening channel are reported on
     *                   {@code System.err} instead of being thrown
     */
    public static void main(String[] args) throws Exception
    {
        final int DEFAULT_PORT = 9001;
        ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(1024);
        Charset charset = Charset.defaultCharset();
        CharsetDecoder decoder = charset.newDecoder();
        // Never written through directly: each reply uses a duplicate, so the
        // position and limit of this template stay at 0 and 5.
        ByteBuffer outgoingBuffer = ByteBuffer.wrap("World".getBytes());

        //Open Selector and ServerSocketChannel
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open())
        {
            //Check that both of them were opened
            if ((serverSocketChannel.isOpen()) && (selector.isOpen()))
            {
                //Configure non-blocking mode
                serverSocketChannel.configureBlocking(false);
                //Bind to the specific port number
                serverSocketChannel.bind(new InetSocketAddress(DEFAULT_PORT));
                //Register the listening channel with the selector
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

                System.out.println("Waiting for incoming connections ...");
                while (true)
                {
                    //wait for incoming events
                    selector.select();
                    //there is something to process on the selected keys
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext())
                    {
                        SelectionKey key = keys.next();
                        //prevent the same key from coming up again
                        keys.remove();
                        if (!key.isValid())
                        {
                            continue;
                        }
                        if (key.isAcceptable())
                        {
                            acceptConnection(selector, key);
                        }
                        else if (key.isReadable())
                        {
                            serveRequest(key, incomingBuffer, decoder, outgoingBuffer);
                        }
                    }
                }
            }
            else
            {
                System.out.println("The server socket channel or selector cannot be opened!");
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }

    /**
     * Accepts one pending connection and registers it with the selector for reading.
     *
     * @param selector selector the new connection is registered with
     * @param key      key of the listening channel that reported OP_ACCEPT
     * @throws IOException if accepting on the listening channel itself fails
     */
    private static void acceptConnection(Selector selector, SelectionKey key) throws IOException
    {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverChannel.accept();
        if (socketChannel == null)
        {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        try
        {
            socketChannel.configureBlocking(false);
            System.out.println("Accept incomming connection from: " + socketChannel.getRemoteAddress());
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
        catch (IOException ex)
        {
            // A failure while setting up one connection must not leak its
            // socket or stop the server.
            ex.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /**
     * Reads one request from a readable connection, replies, and closes the connection.
     *
     * @param key            key of the connection that reported OP_READ
     * @param incomingBuffer scratch buffer for the request bytes; cleared on entry
     * @param decoder        decoder used to turn the request bytes into text
     * @param outgoingBuffer reply template; only a duplicate of it is written
     * @throws InterruptedException if the thread is interrupted during the reply delay
     */
    private static void serveRequest(SelectionKey key, ByteBuffer incomingBuffer,
                                     CharsetDecoder decoder, ByteBuffer outgoingBuffer)
        throws InterruptedException
    {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        incomingBuffer.clear();
        int numRead;
        try
        {
            numRead = socketChannel.read(incomingBuffer);
        }
        catch (IOException ex)
        {
            ex.printStackTrace();
            closeQuietly(socketChannel);
            return;
        }
        if (numRead == 0)
        {
            // Nothing arrived yet; keep the connection registered and wait.
            return;
        }
        if (numRead < 0)
        {
            // The peer closed the connection without sending a request.
            closeQuietly(socketChannel);
            return;
        }
        try
        {
            incomingBuffer.flip();
            String requestMsg = decoder.decode(incomingBuffer).toString();
            System.out.println("Request from " + socketChannel.getRemoteAddress() + " : " + requestMsg);
            // Artificial delay so the client side can observe its non-blocking reads.
            Thread.sleep(1000);
            ByteBuffer response = outgoingBuffer.duplicate();
            // A non-blocking write may accept only part of the buffer.
            while (response.hasRemaining())
            {
                socketChannel.write(response);
            }
            socketChannel.shutdownOutput();
        }
        catch (IOException ex)
        {
            // Covers undecodable request bytes and write failures for this
            // connection only; the server keeps running.
            ex.printStackTrace();
        }
        finally
        {
            // Closing the channel also cancels its selection key.
            closeQuietly(socketChannel);
        }
    }

    /**
     * Closes a connection, reporting rather than propagating a close failure.
     *
     * @param channel connection to close
     */
    private static void closeQuietly(SocketChannel channel)
    {
        try
        {
            channel.close();
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }
}

Non Blocking TCP Client
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Non-blocking TCP client that sends "Hello" to 127.0.0.1:9001 and prints the reply.
 *
 * <p>The connection is established through a {@link Selector}. Once connected,
 * the client sends its request, half-closes the connection and polls for the
 * reply until the server closes its side.
 */
public class NonBlockingTcpClient
{
    /**
     * Connects to the server, performs one request/response exchange and exits.
     * I/O failures are reported on {@code System.err}.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        final int DEFAULT_PORT = 9001;
        final String IP = "127.0.0.1";
        ByteBuffer receivingBuffer = ByteBuffer.allocateDirect(1024);
        ByteBuffer sendingBuffer = ByteBuffer.wrap("Hello".getBytes());
        Charset charset = Charset.defaultCharset();
        CharsetDecoder decoder = charset.newDecoder();
        //open Selector and SocketChannel
        try (Selector selector = Selector.open();
             SocketChannel socketChannel = SocketChannel.open())
        {
            //check that both of them were opened
            if ((socketChannel.isOpen()) && (selector.isOpen()))
            {
                //configure non-blocking mode
                socketChannel.configureBlocking(false);
                //register the current channel with the given selector
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
                if (socketChannel.connect(new java.net.InetSocketAddress(IP, DEFAULT_PORT)))
                {
                    // A local connection can be established immediately; in that
                    // case no OP_CONNECT event follows, so run the exchange now.
                    exchange(socketChannel, sendingBuffer, receivingBuffer, decoder);
                }
                else
                {
                    //waiting for the connection
                    while (selector.select(1000) > 0)
                    {
                        //get keys
                        Set<SelectionKey> keys = selector.selectedKeys();
                        Iterator<SelectionKey> its = keys.iterator();
                        //process each key
                        while (its.hasNext())
                        {
                            SelectionKey key = its.next();
                            //remove the current key
                            its.remove();
                            //get the socket channel for this key; closing it at the end
                            //of this block cancels the key and lets the select loop end
                            try (SocketChannel keySocketChannel = (SocketChannel) key.channel())
                            {
                                if (key.isConnectable())
                                {
                                    //make sure the connection establishment has finished
                                    if (keySocketChannel.isConnectionPending()
                                        && !keySocketChannel.finishConnect())
                                    {
                                        throw new IOException("Connection to " + IP + ":" + DEFAULT_PORT
                                                              + " did not complete");
                                    }
                                    exchange(keySocketChannel, sendingBuffer, receivingBuffer, decoder);
                                }
                                else
                                {
                                    System.out.println("The connection cannot be established!");
                                }
                            }
                            catch (IOException ex)
                            {
                                System.err.println(ex);
                            }
                        }
                    }
                }
            }
            else
            {
                System.out.println("The socket channel or selector cannot be opened!");
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }

    /**
     * Sends the request over a connected channel and prints the reply.
     *
     * @param channel         connected, non-blocking channel
     * @param sendingBuffer   request bytes, written from the current position to the limit
     * @param receivingBuffer scratch buffer for the reply bytes
     * @param decoder         decoder used to turn reply bytes into text
     * @throws IOException if writing, reading or decoding fails
     */
    private static void exchange(SocketChannel channel, ByteBuffer sendingBuffer,
                                 ByteBuffer receivingBuffer, CharsetDecoder decoder)
        throws IOException
    {
        // A non-blocking write may accept only part of the buffer; finish it
        // before half-closing, otherwise the request would be truncated.
        while (sendingBuffer.hasRemaining())
        {
            channel.write(sendingBuffer);
        }
        channel.shutdownOutput();

        StringBuilder responseMsg = new StringBuilder();
        long startTime = System.currentTimeMillis();
        // NOTE: read() is polled in a tight loop and returns 0 while no data is
        // available, so this prints many lines with an empty message. That looks
        // intentional for the demo; production code would register OP_READ.
        while (channel.read(receivingBuffer) != -1)
        {
            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("elapsedTime=" + elapsedTime);
            receivingBuffer.flip();
            String msgReceived = decoder.decode(receivingBuffer).toString();
            System.out.println("Msg received in this loop : " + msgReceived);
            responseMsg.append(msgReceived);
            // Keeps any undecoded bytes; equivalent to clear() when none remain.
            receivingBuffer.compact();
        }
        System.out.println("Response from server : " + responseMsg);
    }
}

Monday, January 16, 2012

Java 7 new features - 5. NIO2 : C. Blocking TCP server/client example


<< Previous Table of Categories Next>>


Java 7 introduces a new interface, NetworkChannel, that provides methods common to all network channel classes, along with a new SocketOption interface and a StandardSocketOptions class. Please check the following example. To demonstrate how ByteBuffer works, I set the buffer size to just 2 bytes.

Blocking TCP Server
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Blocking TCP server that replies "World" to each client on 127.0.0.1:9001.
 *
 * <p>Connections are served one at a time. The request is read through a
 * deliberately tiny 2-byte buffer until the client half-closes its side, then
 * the reply is written and the connection is closed.
 */
public class BlockingTcpServer
{
    /**
     * Binds the server socket and serves clients until the process is stopped.
     * I/O failures are reported rather than thrown.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        final int SERVER_PORT = 9001;
        final String SERVER_IP = "127.0.0.1";

        ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(2);
        // Never written through directly: each reply uses a duplicate, so the
        // position and limit of this template are never disturbed.
        ByteBuffer outgoingBuffer = ByteBuffer.wrap("World".getBytes());
        Charset charset = Charset.defaultCharset();
        CharsetDecoder decoder = charset.newDecoder();

        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open())
        {
            if (serverSocketChannel.isOpen())
            {
                serverSocketChannel.configureBlocking(true);

                //set options
                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
                serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

                //bind the server socket channel to local address
                serverSocketChannel.bind(new InetSocketAddress(SERVER_IP, SERVER_PORT));
                while (true)
                {
                    try (SocketChannel socketChannel = serverSocketChannel.accept())
                    {
                        // Start every connection from a clean state: a previous
                        // connection may have failed with bytes still buffered,
                        // and the request text must not carry over to the next client.
                        incomingBuffer.clear();
                        StringBuilder requestMsg = new StringBuilder();

                        // NOTE: each chunk is decoded on its own, which is fine for
                        // the single-byte text exchanged here; a character split
                        // across two chunks would fail to decode.
                        while (socketChannel.read(incomingBuffer) != -1)
                        {
                            incomingBuffer.flip();
                            String msgReceived = decoder.decode(incomingBuffer).toString();
                            System.out.println("Msg received in this loop : " + msgReceived);
                            requestMsg.append(msgReceived);
                            // Keeps any undecoded bytes; equivalent to clear() when none remain.
                            incomingBuffer.compact();
                        }
                        System.out.println("Request from " + socketChannel.getRemoteAddress()
                                                           + " : " + requestMsg);
                        socketChannel.write(outgoingBuffer.duplicate());
                    }
                    catch (IOException ex)
                    {
                        // A failure on one connection must not stop the accept loop.
                        ex.printStackTrace();
                    }
                }
            }
            else
            {
                System.out.println("The server socket channel cannot be opened!");
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }
}
Blocking TCP Client
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Blocking TCP client: sends "Hello" to 127.0.0.1:9001 and prints the reply,
 * reading it through a 2-byte buffer.
 */
public class BlockingTcpClient
{
    /**
     * Performs one request/response exchange with the server.
     * I/O failures raised inside the exchange are printed to {@code System.err}.
     *
     * @param args ignored
     * @throws IOException declared by the signature; the body catches the
     *                     I/O failures of the exchange itself
     */
    public static void main(String[] args)
        throws IOException
    {
        final int SERVER_PORT = 9001;
        final String SERVER_IP = "127.0.0.1";
        final Charset platformCharset = Charset.defaultCharset();
        final CharsetDecoder chunkDecoder = platformCharset.newDecoder();
        final ByteBuffer outbound = ByteBuffer.wrap("Hello".getBytes());
        final ByteBuffer inbound = ByteBuffer.allocateDirect(2);

        try (SocketChannel channel = SocketChannel.open())
        {
            // Guard: nothing to do without an open channel.
            if (!channel.isOpen())
            {
                System.out.println("The socket channel cannot be opened!");
                return;
            }

            // Blocking mode plus socket tuning.
            channel.configureBlocking(true);
            channel.setOption(StandardSocketOptions.SO_SNDBUF, 1024);
            channel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
            channel.setOption(StandardSocketOptions.SO_LINGER, 10);

            // Connect to the server.
            InetSocketAddress serverAddress = new InetSocketAddress(SERVER_IP, SERVER_PORT);
            channel.connect(serverAddress);
            if (!channel.isConnected())
            {
                System.out.println("The connection cannot be established!");
                return;
            }

            // Send the request, then half-close so the server sees end-of-stream.
            channel.write(outbound);
            channel.shutdownOutput();

            // Collect the reply chunk by chunk until the server closes its side.
            StringBuilder reply = new StringBuilder();
            for (int count = channel.read(inbound); count != -1; count = channel.read(inbound))
            {
                inbound.flip();
                String chunk = chunkDecoder.decode(inbound).toString();
                System.out.println("Msg received in this loop : " + chunk);
                reply.append(chunk);
                // compact() keeps unread bytes and is the same as clear() when none remain.
                inbound.compact();
            }

            System.out.println("Response from server : " + reply);
        }
        catch (IOException failure)
        {
            System.err.println(failure);
        }
    }
}