Wednesday, February 22, 2012

Java 7 new features - 9. Fork/Join : CDR Loading example


<< Previous Table of Categories


In the telecommunication environment, CDR(Call Detail Record) files are widely used. It is very important to load CDR files efficiently. Fortunately, the fork/join framework helps developers take advantage of multiple processors which are used in almost every server.
 
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class CDRLoadingForkJoin
    extends RecursiveAction
{
    static final int FILE_COUNT_THRESHOLD = 1;
    File[] cdrFiles = null;

    public CDRLoadingForkJoin(File[] cdrFiles)
    {
        this.cdrFiles = cdrFiles;
    }

    @Override
    protected void compute()
    {
        try
        {
            // Check the number of files
            if (cdrFiles.length <= FILE_COUNT_THRESHOLD)
            {
                loadCDRFiles(cdrFiles);
            }
            else
            {
                // Split the array of CDR files into two equal parts
                int center = cdrFiles.length / 2;
                File[] part1 = splitArray(cdrFiles, 0, center);
                File[] part2 = splitArray(cdrFiles, center, cdrFiles.length);
                invokeAll(new CDRLoadingForkJoin(part1), new CDRLoadingForkJoin(part2));
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    protected File[] splitArray(File[] array, int start, int end)
    {
        int length = end - start;
        File[] part = new File[length];
        for (int i = start; i < end; i++)
        {
            part[i - start] = array[i];
        }
        return part;
    }

    protected void loadCDRFiles(File[] filesToLoad)
    {
        for (File file: filesToLoad)
        {
            if (file.getName().endsWith(".cdr"))
            {
                try
                {
                    BufferedReader reader = new BufferedReader(new FileReader(file));
                    String line = null;
                    while ((line = reader.readLine()) != null)
                    {
                        //load to database ..
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        }
    }

    // Start the CDR file loading process with the Java SE 7 Fork/Join framework
    public static void main(String[] args)
    {
        String cdrDirPath = "D:\\CDR\\";
        File cdrDir = new File(cdrDirPath);
        if(!cdrDir.isDirectory())
        {
            System.err.println(cdrDirPath + " is not a valid directory");
        }
        File[] files = cdrDir.listFiles();
        CDRLoadingForkJoin process = new CDRLoadingForkJoin(files);
        int processors = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(processors);
        pool.invoke(process);
    }
}

Tuesday, February 7, 2012

Java 7 new features - 8. NIO2 : F. Review - Traditional TCP server/client example


<< Previous Table of Categories Next >>


After we learnt how the NIO2 works, let's review the traditional socket programming
Traditional TCP Server
 
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class TraditionalTcpSocketServer
{
    public static void main(String[] argv)
        throws Exception
    {
        String clientSentence;
        ServerSocket welcomeSocket = new ServerSocket(9001);
        while (true)
        {
            Socket connectionSocket = welcomeSocket.accept();
            BufferedReader inFromClient = new BufferedReader(new InputStreamReader(connectionSocket.getInputStream()));
            DataOutputStream outToClient = new DataOutputStream(connectionSocket.getOutputStream());
            clientSentence = inFromClient.readLine();
            System.out.println("FROM CLIENT: " + clientSentence);
            outToClient.writeBytes("World" + System.getProperty("line.separator"));
        }
    }
}
Traditional TCP Client
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.Socket;

public class TraditionalTcpSocketClient
{
    public static void main(String[] argv)
        throws Exception
    {
        String sentence = "Hello";
        String modifiedSentence;
        Socket clientSocket = new Socket("localhost", 9001);
        DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
        BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        outToServer.writeBytes(sentence + System.getProperty("line.separator"));
        modifiedSentence = inFromServer.readLine();
        System.out.println("FROM SERVER: " + modifiedSentence);
        clientSocket.close();
    }
}

Monday, February 6, 2012

Java 7 new features - 7. NIO2 : E. Asynchronous TCP server/client example


<< Previous Table of Categories Next>>


To illustrate how Asynchronous works, there are some delay in both the server and the client. You can easily play this on your PC to understand how the Asynchronous channels work.
Asynchronous TCP Server
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class AsynchronousTcpServer
{
    public static void main(String[] args)
    {

        final int SERVER_PORT = 9001;
        final String SERVER_IP = "127.0.0.1";

        //create asynchronous server-socket channel bound to the default group
        try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open())
        {
            if (asynchronousServerSocketChannel.isOpen())
            {
                //bind to local address
                asynchronousServerSocketChannel.bind(new InetSocketAddress(SERVER_IP, SERVER_PORT));

                //display a waiting message 
                System.out.println("Waiting for connections ...");
                while (true)
                {
                    Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture =
                        asynchronousServerSocketChannel.accept();
                    try (AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture.get())
                    {
                        System.out.println("Incoming connection from: " + asynchronousSocketChannel.getRemoteAddress());
                        ByteBuffer incomingBuffer = ByteBuffer.allocateDirect(1024);
                        //receiving data
                        asynchronousSocketChannel.read(incomingBuffer, incomingBuffer,  new CompletionHandler<Integer, ByteBuffer>()
                        {
                            public void completed(Integer result, ByteBuffer buffer)
                            {
                                buffer.flip();
                                String msgReceived = Charset.defaultCharset().decode(buffer).toString();
                                System.out.println("Msg received from the client : " + msgReceived);
                            }
                            public void failed(Throwable exc, ByteBuffer buffer)
                            {
                                throw new UnsupportedOperationException("read failed!");
                            }                                             
                        });
                        try
                        {
                            Thread.sleep(5000);
                        }
                        catch(Exception e){}
                        
                        //replying data
                        ByteBuffer outgoingBuffer = ByteBuffer.wrap("World".getBytes());
                        asynchronousSocketChannel.write(outgoingBuffer).get();
                    }
                    catch (IOException | InterruptedException | ExecutionException ex)
                    {
                        System.err.println(ex);
                    }
                }
            }
            else
            {
                System.out.println("The asynchronous server-socket channel cannot be opened!");
            }
        }
        catch (IOException ex)
        {
            System.err.println(ex);
        }
    }
}
Asynchronous TCP Client
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;

public class AsynchronousTcpClient
{
    static boolean completed = false;
    public static void main(String[] args)
    {
        
        final int SERVER_PORT = 9001;
        final String SERVER_IP = "127.0.0.1";
       
        ByteBuffer receivingBuffer = ByteBuffer.allocateDirect(1024);
        ByteBuffer sendingBuffer = ByteBuffer.wrap("Hello".getBytes());
               
        //create asynchronous socket channel
        try (final AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open())
        {
            if (asynchronousSocketChannel.isOpen())
            {
                //connect this channel's socket
                Void connect = asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_IP, SERVER_PORT)).get();
                if (connect == null)
                {
                    System.out.println("Local address: " + asynchronousSocketChannel.getLocalAddress());
                    
                    //sending data
                    asynchronousSocketChannel.write(sendingBuffer).get();
                    asynchronousSocketChannel.read(receivingBuffer, receivingBuffer,  new CompletionHandler<Integer, ByteBuffer>()
                    {
                        public void completed(Integer result, ByteBuffer buffer)
                        {
                            buffer.flip();
                            String msgReceived = Charset.defaultCharset().decode(buffer).toString();
                            System.out.println("Msg received from server : " + msgReceived);
                            completed = true;
                        }
                        public void failed(Throwable exc, ByteBuffer buffer)
                        {
                            completed = false;
                            throw new UnsupportedOperationException("read failed!");
                        }                                             
                  });
                  
                  while(!completed)
                  {
                      try
                      {
                          Thread.sleep(1000);
                      }
                      catch(Exception e){}
                      System.out.println("Waiting for response from the server");
                  } 
                  
                }
                else
                {
                    System.out.println("The connection cannot be established!");
                }
            }
            else
            {
                System.out.println("The asynchronous socket channel cannot be opened!");
            }
        }
        catch (IOException | InterruptedException | ExecutionException ex)
        {
            System.err.println(ex);
        }
      
    }
}