<< Previous | Table of Categories |
In the telecommunication environment, CDR(Call Detail Record) files are widely used. It is very important to load CDR files efficiently. Fortunately, the fork/join framework helps developers take advantage of multiple processors which are used in almost every server.
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class CDRLoadingForkJoin extends RecursiveAction { static final int FILE_COUNT_THRESHOLD = 1; File[] cdrFiles = null; public CDRLoadingForkJoin(File[] cdrFiles) { this.cdrFiles = cdrFiles; } @Override protected void compute() { try { // Check the number of files if (cdrFiles.length <= FILE_COUNT_THRESHOLD) { loadCDRFiles(cdrFiles); } else { // Split the array of CDR files into two equal parts int center = cdrFiles.length / 2; File[] part1 = splitArray(cdrFiles, 0, center); File[] part2 = splitArray(cdrFiles, center, cdrFiles.length); invokeAll(new CDRLoadingForkJoin(part1), new CDRLoadingForkJoin(part2)); } } catch (Exception e) { e.printStackTrace(); } } protected File[] splitArray(File[] array, int start, int end) { int length = end - start; File[] part = new File[length]; for (int i = start; i < end; i++) { part[i - start] = array[i]; } return part; } protected void loadCDRFiles(File[] filesToLoad) { for (File file: filesToLoad) { if (file.getName().endsWith(".cdr")) { try { BufferedReader reader = new BufferedReader(new FileReader(file)); String line = null; while ((line = reader.readLine()) != null) { //load to database .. } } catch (Exception e) { e.printStackTrace(); } } } } // Start the CDR file loading process with the Java SE 7 Fork/Join framework public static void main(String[] args) { String cdrDirPath = "D:\\CDR\\"; File cdrDir = new File(cdrDirPath); if(!cdrDir.isDirectory()) { System.err.println(cdrDirPath + " is not a valid directory"); } File[] files = cdrDir.listFiles(); CDRLoadingForkJoin process = new CDRLoadingForkJoin(files); int processors = Runtime.getRuntime().availableProcessors(); ForkJoinPool pool = new ForkJoinPool(processors); pool.invoke(process); } }