| << Previous | Table of Categories |
In the telecommunication environment, CDR (Call Detail Record) files are widely used, so it is very important to load them efficiently. Fortunately, the fork/join framework helps developers take advantage of the multiple processors found in almost every server.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
 * Fork/join task that reads every {@code .cdr} file in an array of files.
 *
 * <p>An array holding more than {@link #FILE_COUNT_THRESHOLD} files is split
 * into two halves that are processed as independent sub-tasks; a smaller
 * array is read directly by the current worker thread.
 */
public class CDRLoadingForkJoin
    extends RecursiveAction
{
    /** Largest number of files a single task reads itself without splitting. */
    static final int FILE_COUNT_THRESHOLD = 1;

    /** Directory scanned by {@link #main(String[])} when no argument is given. */
    private static final String DEFAULT_CDR_DIR = "D:\\CDR\\";

    /** Files this task is responsible for; never {@code null} after construction. */
    File[] cdrFiles;

    /**
     * Creates a task for the given files.
     *
     * @param cdrFiles files to load; {@code null} is treated as an empty array
     *                 so that the task simply has nothing to do
     */
    public CDRLoadingForkJoin(File[] cdrFiles)
    {
        this.cdrFiles = (cdrFiles != null) ? cdrFiles : new File[0];
    }

    /**
     * Loads the files directly when there are at most
     * {@link #FILE_COUNT_THRESHOLD} of them, otherwise splits the array in
     * half and runs both halves as sub-tasks.
     *
     * <p>Runtime failures are reported on standard error and not rethrown, so
     * one failing sub-task does not abort the whole load.
     */
    @Override
    protected void compute()
    {
        try
        {
            if (cdrFiles.length <= FILE_COUNT_THRESHOLD)
            {
                loadCDRFiles(cdrFiles);
                return;
            }

            // Split the array of CDR files into two (nearly) equal parts
            int center = cdrFiles.length / 2;
            File[] part1 = splitArray(cdrFiles, 0, center);
            File[] part2 = splitArray(cdrFiles, center, cdrFiles.length);
            invokeAll(new CDRLoadingForkJoin(part1), new CDRLoadingForkJoin(part2));
        }
        catch (RuntimeException e)
        {
            // Task boundary: keep the load best-effort, but make the failure visible.
            e.printStackTrace();
        }
    }

    /**
     * Copies the elements {@code array[start] .. array[end - 1]} into a new array.
     *
     * @param array source array
     * @param start index of the first element to copy (inclusive)
     * @param end   index after the last element to copy (exclusive)
     * @return a new array of length {@code end - start}
     */
    protected File[] splitArray(File[] array, int start, int end)
    {
        int length = end - start;
        File[] part = new File[length];
        System.arraycopy(array, start, part, 0, length);
        return part;
    }

    /**
     * Reads every regular file whose name ends with {@code .cdr} line by line.
     *
     * <p>Each reader is closed as soon as its file has been consumed. An I/O
     * failure on one file is reported on standard error and the remaining
     * files are still processed.
     *
     * @param filesToLoad candidate files; other entries are skipped
     */
    protected void loadCDRFiles(File[] filesToLoad)
    {
        for (File file : filesToLoad)
        {
            // Skip directories and other entries that merely carry the .cdr suffix.
            if (!file.getName().endsWith(".cdr") || !file.isFile())
            {
                continue;
            }

            // try-with-resources guarantees the file handle is released,
            // even when reading fails half-way through.
            try (BufferedReader reader = new BufferedReader(new FileReader(file)))
            {
                String line;
                while ((line = reader.readLine()) != null)
                {
                    //load to database ..
                }
            }
            catch (IOException e)
            {
                System.err.println("Failed to load CDR file " + file.getPath());
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the CDR file loading process with the Java SE 7 Fork/Join framework.
     *
     * @param args optional; {@code args[0]} is the directory holding the CDR
     *             files. When absent, {@code D:\CDR\} is used.
     */
    public static void main(String[] args)
    {
        String cdrDirPath = (args != null && args.length > 0) ? args[0] : DEFAULT_CDR_DIR;
        File cdrDir = new File(cdrDirPath);
        if (!cdrDir.isDirectory())
        {
            System.err.println(cdrDirPath + " is not a valid directory");
            return;
        }

        // listFiles() returns null when the directory cannot be read.
        File[] files = cdrDir.listFiles();
        if (files == null)
        {
            System.err.println("Unable to list the files in " + cdrDirPath);
            return;
        }

        int processors = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(processors);
        try
        {
            pool.invoke(new CDRLoadingForkJoin(files));
        }
        finally
        {
            pool.shutdown();
        }
    }
}