Java 8: Multi-Core Parallel Computing Feature

The code below is largely self-explanatory; the comments above each statement describe what it does and why.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * Demonstrates sequential streams, parallel streams on the common
 * {@link ForkJoinPool}, ordered output from a parallel stream, and running a
 * parallel stream inside a custom {@link ForkJoinPool}.
 *
 * <p>Every element is "transformed" by doubling it after an artificial delay,
 * and both the transformation and the printing log the executing thread so the
 * scheduling can be observed on the console.
 */
public class MultiCoreParallelComputation {

    /** Artificial delay, in milliseconds, applied to every transformation. */
    private static final int TRANSFORM_DELAY_MS = 1200;

    /**
     * Runs the four demonstrations one after another: sequential, parallel
     * (unordered output), parallel (ordered output) and parallel on a custom pool.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // For compute-intensive work, creating more threads than there are logical
        // processors rarely helps. The logical processor count is often twice the
        // physical core count on SMT/hyper-threaded CPUs, but that is not guaranteed.
        System.out.println("Available logical processors in this system: "
                + Runtime.getRuntime().availableProcessors());

        // Printing the common pool shows its parallelism, i.e. its target number of
        // worker threads. The thread that starts a parallel stream also takes part in
        // the work, so roughly "workers + caller" elements are processed at a time,
        // batch after batch, until all elements are done.
        System.out.println("FJP Parallelism default possibility info: "
                + ForkJoinPool.commonPool());

        // Input data. Swap in the longer list to see more batches.
        List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // List<Integer> lst = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
        //         15, 16, 17, 18, 19, 20);

        // Sequential stream: every element is doubled and printed by the calling
        // (main) thread, one after another, so the output is in list order.
        lst.stream()
                .map(MultiCoreParallelComputation::transformElements)
                .forEach(MultiCoreParallelComputation::printElements);

        // Parallel stream: elements are processed concurrently by the common pool's
        // workers (plus the calling thread), so it finishes sooner, but forEach
        // gives no ordering guarantee and the output order varies between runs.
        lst.parallelStream()
                .map(MultiCoreParallelComputation::transformElements)
                .forEach(MultiCoreParallelComputation::printElements);

        // Parallel stream with ordered output: the transformation still runs in
        // parallel, but forEachOrdered prints in encounter order. That order is
        // defined here because the source is a List.
        lst.parallelStream()
                .map(MultiCoreParallelComputation::transformElements)
                .forEachOrdered(MultiCoreParallelComputation::printElements);

        // Same pipeline, but the terminal operation is started from inside a custom
        // pool with a chosen number of worker threads. Streams are lazy, so the map
        // stage runs only once the terminal operation starts.
        processElements(
                lst.parallelStream().map(MultiCoreParallelComputation::transformElements), 8);
    }

    /**
     * Prints every element of {@code strm} in encounter order, starting the
     * terminal operation from a task submitted to a dedicated {@link ForkJoinPool}.
     *
     * <p>The method blocks until the whole pipeline has finished rather than
     * waiting for a fixed timeout, so no output is lost when the pipeline is slow.
     * The pool is always shut down before returning. An exception thrown by the
     * pipeline is rethrown to the caller.
     *
     * @param strm        the stream to consume; it must not have been consumed already
     * @param noOfThreads parallelism of the dedicated pool; must be positive
     * @throws IllegalArgumentException if {@code noOfThreads} is not positive
     */
    private static void processElements(Stream<Integer> strm, int noOfThreads) {
        ForkJoinPool fjp = new ForkJoinPool(noOfThreads);
        try {
            // NOTE(review): a parallel stream started from inside a ForkJoinPool task
            // runs on that pool instead of the common pool. This is long-standing
            // JDK behaviour but is not spelled out in the Stream specification.
            fjp.submit(() -> strm.forEachOrdered(MultiCoreParallelComputation::printElements))
                    .join();
        } finally {
            fjp.shutdown();
        }
    }

    /**
     * Doubles {@code ele} after an artificial delay and logs which thread did it.
     *
     * @param ele the value to transform; must not be {@code null}
     * @return {@code ele * 2}
     */
    private static int transformElements(Integer ele) {
        sleep(TRANSFORM_DELAY_MS);
        int res = ele * 2;
        System.out.println("Transforming: " + ele + " by : " + Thread.currentThread()
                + " to : " + res);
        return res;
    }

    /**
     * Pauses the current thread for {@code ms} milliseconds.
     *
     * @param ms the delay in milliseconds; must not be negative
     * @return {@code true} if the full delay elapsed, {@code false} if the thread
     *         was interrupted (the interrupt flag is set again in that case)
     */
    private static boolean sleep(int ms) {
        try {
            Thread.sleep(ms);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Sleep interrupted on " + Thread.currentThread());
            return false;
        }
    }

    /**
     * Prints {@code z} together with the thread doing the printing.
     *
     * @param z the value to print
     */
    private static void printElements(int z) {
        System.out.println("Printing by " + Thread.currentThread() + " : " + z);
    }
}

Related posts