Fork/Join并行计算框架
简介
Fork/Join框架:在必要时将一个大任务进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个
小任务的运行结果进行join合并,一般都是在大数据搜索中使用
Fork/Join采用“工作窃取” 模式(work-stealing) :
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中。在程序执行过程中如果有闲置线程,闲置线程会把其他线程的线程队列末尾的任务偷偷拿到自己的线程中执行,注意线程队列是双关,即队列里最前面的任务会被直接,最后的任务会被闲置线程拿走去执行。这种方式减少了线程的等待时间,提高了性能
具体使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class Test1 { @Test public void test01() { ForkJoinPool pool = new ForkJoinPool(); Long sum = pool.invoke(new ForkJoinCalculateTask(0,10000000L)); System.out.println(sum); } }
class ForkJoinCalculateTask extends RecursiveTask<Long>{
private static final long serialVersionUID = 1975909145028788652L; private long startNum; private long endNum; private static final long THRESHOLD = 10000;
public ForkJoinCalculateTask(long startNum, long endNum) { this.startNum = startNum; this.endNum = endNum; }
@Override protected Long compute() { long len = endNum-startNum; if(len <= THRESHOLD){ long sum = 0; for (long i = startNum; i <= endNum; i++) { sum += i; } return sum; }else{ long mid = (startNum+endNum)/2; ForkJoinCalculateTask left = new ForkJoinCalculateTask(startNum, mid); left.fork(); ForkJoinCalculateTask right = new ForkJoinCalculateTask(mid+1, endNum); right.fork(); return left.join() + right.join(); } } }
|