Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘ ┌─┬─┬─┬─┬─┬─┐ └─┴─┴─┴─┴─┴─┘
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
我们来看如何使用Fork/Join对大数据进行并行求和:
import java.util.Random; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { // 创建2000个随机数组成的数组: long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("Expected sum: " + expectedSum); // fork/join: ForkJoinTask<Long> task = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } static Random random = new Random(0); static long random() { return random.nextInt(10000); } } class SumTask extends RecursiveTask<Long> { static final int THRESHOLD = 500; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start <= THRESHOLD) { // 如果任务足够小,直接计算: long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; // 故意放慢计算速度: try { Thread.sleep(1); } catch (InterruptedException e) { } } return sum; } // 任务太大,一分为二: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。
因此,核心代码SumTask
继承自RecursiveTask
,在compute()
方法中,关键是如何“分裂”出子任务并且提交子任务: