介绍
随着多核芯片逐渐成为主流,大多数软件开发人员不可避免地需要了解并行编程的知识。而同时,主流程序语言正在将越来越多的并行特性合并到标准库或者语言本身之中。我们可以看到,JDK 在这方面同样走在潮流的前方。在 JDK 标准版 5 中,由 Doug Lea 提供的并行框架成为了标准库的一部分(JSR-166)。随后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了标准库中(JSR-166x)。直到今天,尽管 Java SE 7 还没有正式发布,一些并行相关的新特性已经出现在 JSR-166y 中:
- Fork/Join 模式;
- TransferQueue,它继承自 BlockingQueue 并能在队列满时阻塞“生产者”;
- ArrayTasks/ListTasks,用于并行执行某些数组/列表相关任务的类;
- IntTasks/LongTasks/DoubleTasks,用于并行处理数字类型数组的工具类,提供了排序、查找、求和、求最小值、求最大值等功能;
其中,对 Fork/Join 模式的支持可能是对开发并行软件来说最通用的新特性。在 JSR-166y 中,Doug Lea 实现 ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 时就大量的用到了 Fork/Join 模式。读者还需要注意一点,因为 JDK 7 还没有正式发布,因此本文涉及到的功能和发布版本有可能不一样。
Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。图 1 给出了一个 Fork/Join 模式的示意图,位于图上部的 Task 依赖于位于其下的 Task 的执行,只有当所有的子任务都完成之后,调用者才能获得 Task 0 的返回结果。
图 1. Fork/Join 模式示意图
可以说,Fork/Join 模式能够解决很多种类的并行问题。通过使用 Doug Lea 提供的 Fork/Join 框架,软件开发人员只需要关注任务的划分和中间结果的组合就能充分利用并行平台的优良性能。其他和并行相关的诸多难于处理的问题,例如负载平衡、同步等,都可以由框架采用统一的方式解决。这样,我们就能够轻松地获得并行的好处而避免了并行编程的困难且容易出错的缺点。
使用 Fork/Join 模式
在开始尝试 Fork/Join 模式之前,我们需要从 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下载 JSR-166y 的源代码,并且我们还需要安装最新版本的 JDK 6(下载网址请参阅 参考资源)。Fork/Join 模式的使用方式非常直观。首先,我们需要编写一个 ForkJoinTask 来完成子任务的分割、中间结果的合并等工作。随后,我们将这个 ForkJoinTask 交给 ForkJoinPool 来完成应用的执行。
通常我们并不直接继承 ForkJoinTask,它包含了太多的抽象方法。针对特定的问题,我们可以选择 ForkJoinTask 的不同子类来完成任务。RecursiveAction 是 ForkJoinTask 的一个子类,它代表了一类最简单的 ForkJoinTask:不需要返回值,当子任务都执行完毕之后,不需要进行中间结果的组合。如果我们从 RecursiveAction 开始继承,那么我们只需要重载 protected void compute()
方法。下面,我们来看看怎么为快速排序算法建立一个 ForkJoinTask 的子类:
清单 1. ForkJoinTask 的子类
classSortTaskextendsRecursiveAction...{
finallong[]array;
finalintlo;
finalinthi;
privateintTHRESHOLD=30;
publicSortTask(long[]array)...{
this.array=array;
this.lo=0;
this.hi=array.length-1;
}
publicSortTask(long[]array,intlo,inthi)...{
this.array=array;
this.lo=lo;
this.hi=hi;
}
protectedvoidcompute()...{
if(hi-lo<THRESHOLD)
sequentiallySort(array,lo,hi);
else...{
intpivot=partition(array,lo,hi);
coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
pivot+1,hi));
}
}
privateintpartition(long[]array,intlo,inthi)...{
longx=array[hi];
inti=lo-1;
for(intj=lo;j<hi;j++)...{
if(array[j]<=x)...{
i++;
swap(array,i,j);
}
}
swap(array,i+1,hi);
returni+1;
}
privatevoidswap(long[]array,inti,intj)...{
if(i!=j)...{
longtemp=array[i];
array[i]=array[j];
array[j]=temp;
}
}
privatevoidsequentiallySort(long[]array,intlo,inthi)...{
Arrays.sort(array,lo,hi+1);
}
}
在 清单 1 中,SortTask 首先通过 partition()
方法将数组分成两个部分。随后,两个子任务将被生成并分别排序数组的两个部分。当子任务足够小时,再将其分割为更小的任务反而引起性能的降低。因此,这里我们使用一个 THRESHOLD
,限定在子任务规模较小时,使用直接排序,而不是再将其分割成为更小的任务。其中,我们用到了 RecursiveAction 提供的方法 coInvoke()
。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。coInvoke()
的参数还可以是任务的数组。
现在剩下的工作就是将 SortTask 提交到 ForkJoinPool 了。ForkJoinPool()
默认建立具有与 CPU 可使用线程数相等线程个数的线程池。我们在一个 JUnit 的 test
方法中将 SortTask 提交给一个新建的 ForkJoinPool:
清单 2. 新建的 ForkJoinPool
@Test
publicvoidtestSort()throwsException...{
ForkJoinTasksort=newSortTask(array);
ForkJoinPoolfjpool=newForkJoinPool();
fjpool.submit(sort);
fjpool.shutdown();
fjpool.awaitTermination(30,TimeUnit.SECONDS);
assertTrue(checkSorted(array));
}
在上面的代码中,我们用到了 ForkJoinPool 提供的如下函数:
-
submit()
:将 ForkJoinTask 类的对象提交给 ForkJoinPool,ForkJoinPool 将立刻开始执行 ForkJoinTask。
-
shutdown()
:执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow()
方法。
-
awaitTermination()
:阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束。
并行快速排序的完整代码如下所示:
清单 3. 并行快速排序的完整代码
packagetests;
importstaticorg.junit.Assert.*;
importjava.util.Arrays;
importjava.util.Random;
importjava.util.concurrent.TimeUnit;
importjsr166y.forkjoin.ForkJoinPool;
importjsr166y.forkjoin.ForkJoinTask;
importjsr166y.forkjoin.RecursiveAction;
importorg.junit.Before;
importorg.junit.Test;
classSortTaskextendsRecursiveAction...{
finallong[]array;
finalintlo;
finalinthi;
privateintTHRESHOLD=0;//Fordemoonly
publicSortTask(long[]array)...{
this.array=array;
this.lo=0;
this.hi=array.length-1;
}
publicSortTask(long[]array,intlo,inthi)...{
this.array=array;
this.lo=lo;
this.hi=hi;
}
protectedvoidcompute()...{
if(hi-lo<THRESHOLD)
sequentiallySort(array,lo,hi);
else...{
intpivot=partition(array,lo,hi);
System.out.println(" pivot="+pivot+",low="+lo+",high="+hi);
System.out.println("array"+Arrays.toString(array));
coInvoke(newSortTask(array,lo,pivot-1),newSortTask(array,
pivot+1,hi));
}
}
privateintpartition(long[]array,intlo,inthi)...{
longx=array[hi];
inti=lo-1;
for(intj=lo;j<hi;j++)...{
if(array[j]<=x)...{
i++;
swap(array,i,j);
}
}
swap(array,i+1,hi);
returni+1;
}
privatevoidswap(long[]array,inti,intj)...{
if(i!=j)...{
longtemp=array[i];
array[i]=array[j];
array[j]=temp;
}
}
privatevoidsequentiallySort(long[]array,intlo,inthi)...{
Arrays.sort(array,lo,hi+1);
}
}
publicclassTestForkJoinSimple...{
privatestaticfinalintNARRAY=16;//Fordemoonly
long[]array=newlong[NARRAY];
Randomrand=newRandom();
@Before
publicvoidsetUp()...{
for(inti=0;i<array.length;i++)...{
array[i]=rand.nextLong()%100;//Fordemoonly
}
System.out.println("InitialArray:"+Arrays.toString(array));
}
@Test
publicvoidtestSort()throwsException...{
ForkJoinTasksort=newSortTask(array);
ForkJoinPoolfjpool=newForkJoinPool();
fjpool.submit(sort);
fjpool.shutdown();
fjpool.awaitTermination(30,TimeUnit.SECONDS);
assertTrue(checkSorted(array));
}
booleancheckSorted(long[]a)...{
for(inti=0;i<a.length-1;i++)...{
if(a[i]>(a[i+1]))...{
returnfalse;
}
}
returntrue;
}
}
运行以上代码,我们可以得到以下结果:
InitialArray:[46,-12,74,-67,76,-13,-91,-96]
pivot=0,low=0,high=7
array[-96,-12,74,-67,76,-13,-91,46]
pivot=5,low=1,high=7
array[-96,-12,-67,-13,-91,46,76,74]
pivot=1,low=1,high=4
array[-96,-91,-67,-13,-12,46,74,76]
pivot=4,low=2,high=4
array[-96,-91,-67,-13,-12,46,74,76]
pivot=3,low=2,high=3
array[-96,-91,-67,-13,-12,46,74,76]
pivot=2,low=2,high=2
array[-96,-91,-67,-13,-12,46,74,76]
pivot=6,low=6,high=7
array[-96,-91,-67,-13,-12,46,74,76]
pivot=7,low=7,high=7
array[-96,-91,-67,-13,-12,46,74,76]
Fork/Join 模式高级特性
使用 RecursiveTask
除了 RecursiveAction,Fork/Join 框架还提供了其他 ForkJoinTask 子类:带有返回值的 RecursiveTask,使用 finish()
方法显式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 为每个任务设置不同中止条件的 CyclicAction。
从 RecursiveTask 继承的子类同样需要重载 protected void compute()
方法。与 RecursiveAction 稍有不同的是,它可使用泛型指定一个返回值的类型。下面,我们来看看如何使用 RecursiveTask 的子类。
清单 4. RecursiveTask 的子类
classFibonacciextendsRecursiveTask<Integer>...{
finalintn;
Fibonacci(intn)...{
this.n=n;
}
privateintcompute(intsmall)...{
finalint[]results=...{1,1,2,3,5,8,13,21,34,55,89};
returnresults[small];
}
publicIntegercompute()...{
if(n<=10)...{
returncompute(n);
}
Fibonaccif1=newFibonacci(n-1);
Fibonaccif2=newFibonacci(n-2);
f1.fork();
f2.fork();
returnf1.join()+f2.join();
}
}
在 清单 4 中, Fibonacci 的返回值为 Integer 类型。其 compute()
函数首先建立两个子任务,启动子任务执行,阻塞以等待子任务的结果返回,相加后得到最终结果。同样,当子任务足够小时,通过查表得到其结果,以减小因过多地分割任务引起的性能降低。其中,我们用到了 RecursiveTask 提供的方法 fork()
和 join()
。它们分别表示:子任务的异步执行和阻塞等待结果完成。
现在剩下的工作就是将 Fibonacci 提交到 ForkJoinPool 了,我们在一个 JUnit 的 test
方法中作了如下处理:
清单 5. 将 Fibonacci 提交到 ForkJoinPool
@Test
publicvoidtestFibonacci()throwsInterruptedException,ExecutionException...{
ForkJoinTask<Integer>fjt=newFibonacci(45);
ForkJoinPoolfjpool=newForkJoinPool();
Future<Integer>result=fjpool.submit(fjt);
//dosomething
System.out.println(result.get());
}
使用 CyclicAction 来处理循环任务
CyclicAction 的用法稍微复杂一些。如果一个复杂任务需要几个线程协作完成,并且线程之间需要在某个点等待所有其他线程到达,那么我们就能方便的用 CyclicAction 和 TaskBarrier 来完成。图 2 描述了使用 CyclicAction 和 TaskBarrier 的一个典型场景。
图 2. 使用 CyclicAction 和 TaskBarrier 执行多线程任务
继承自 CyclicAction 的子类需要 TaskBarrier 为每个任务设置不同的中止条件。从 CyclicAction 继承的子类需要重载 protected void compute()
方法,定义在 barrier
的每个步骤需要执行的动作。compute()
方法将被反复执行直到 barrier
的 isTerminated()
方法返回 True
。TaskBarrier 的行为类似于 CyclicBarrier。下面,我们来看看如何使用 CyclicAction 的子类。
清单 6. 使用 CyclicAction 的子类
classConcurrentPrintextendsRecursiveAction...{
protectedvoidcompute()...{
TaskBarrierb=newTaskBarrier()...{
protectedbooleanterminate(intcycle,intregisteredParties)...{
System.out.println("Cycleis"+cycle+";"
+registeredParties+"parties");
returncycle>=10;
}
};
intn=3;
CyclicAction[]actions=newCyclicAction[n];
for(inti=0;i<n;++i)...{
finalintindex=i;
actions[i]=newCyclicAction(b)...{
protectedvoidcompute()...{
System.out.println("I'mworking"+getCycle()+""
+index);
try...{
Thread.sleep(500);
}catch(InterruptedExceptione)...{
e.printStackTrace();
}
}
};
}
for(inti=0;i<n;++i)
actions[i].fork();
for(inti=0;i<n;++i)
actions[i].join();
}
}
在 清单 6 中,CyclicAction[]
数组建立了三个任务,打印各自的工作次数和序号。而在 b.terminate()
方法中,我们设置的中止条件表示重复 10 次计算后中止。现在剩下的工作就是将 ConcurrentPrint 提交到 ForkJoinPool 了。我们可以在 ForkJoinPool 的构造函数中指定需要的线程数目,例如 ForkJoinPool(4)
就表明线程池包含 4 个线程。我们在一个 JUnit 的 test
方法中运行 ConcurrentPrint 的这个循环任务:
清单 7. 运行 ConcurrentPrint 循环任务
@Test
publicvoidtestBarrier()throwsInterruptedException,ExecutionException...{
ForkJoinTaskfjt=newConcurrentPrint();
ForkJoinPoolfjpool=newForkJoinPool(4);
fjpool.submit(fjt);
fjpool.shutdown();
}
RecursiveTask 和 CyclicAction 两个例子的完整代码如下所示:
清单 8. RecursiveTask 和 CyclicAction 两个例子的完整代码
packagetests;
importjava.util.concurrent.ExecutionException;
importjava.util.concurrent.Future;
importjsr166y.forkjoin.CyclicAction;
importjsr166y.forkjoin.ForkJoinPool;
importjsr166y.forkjoin.ForkJoinTask;
importjsr166y.forkjoin.RecursiveAction;
importjsr166y.forkjoin.RecursiveTask;
importjsr166y.forkjoin.TaskBarrier;
importorg.junit.Test;
classFibonacciextendsRecursiveTask<Integer>...{
finalintn;
Fibonacci(intn)...{
this.n=n;
}
privateintcompute(intsmall)...{
finalint[]results=...{1,1,2,3,5,8,13,21,34,55,89};
returnresults[small];
}
publicIntegercompute()...{
if(n<=10)...{
returncompute(n);
}
Fibonaccif1=newFibonacci(n-1);
Fibonaccif2=newFibonacci(n-2);
System.out.println("forknewthreadfor"+(n-1));
f1.fork();
System.out.println("forknewthreadfor"+(n-2));
f2.fork();
returnf1.join()+f2.join();
}
}
classConcurrentPrintextendsRecursiveAction...{
protectedvoidcompute()...{
TaskBarrierb=newTaskBarrier()...{
protectedbooleanterminate(intcycle,intregisteredParties)...{
System.out.println("Cycleis"+cycle+";"
+registeredParties+"parties");
returncycle>=10;
}
};
intn=3;
CyclicAction[]actions=newCyclicAction[n];
for(inti=0;i<n;++i)...{
finalintindex=i;
actions[i]=newCyclicAction(b)...{
protectedvoidcompute()...{
System.out.println("I'mworking"+getCycle()+""
+index);
try...{
Thread.sleep(500);
}catch(InterruptedExceptione)...{
e.printStackTrace();
}
}
};
}
for(inti=0;i<n;++i)
actions[i].fork();
for(inti=0;i<n;++i)
actions[i].join();
}
}
publicclassTestForkJoin...{
@Test
publicvoidtestBarrier()throwsInterruptedException,ExecutionException...{
System.out.println(" testingTaskBarrier...");
ForkJoinTaskfjt=newConcurrentPrint();
ForkJoinPoolfjpool=newForkJoinPool(4);
fjpool.submit(fjt);
fjpool.shutdown();
}
@Test
publicvoidtestFibonacci()throwsInterruptedException,ExecutionException...{
System.out.println(" testingFibonacci...");
finalintnum=14;//Fordemoonly
ForkJoinTask<Integer>fjt=newFibonacci(num);
ForkJoinPoolfjpool=newForkJoinPool();
Future<Integer>result=fjpool.submit(fjt);
//dosomething
System.out.println("Fibonacci("+num+")="+result.get());
}
}
运行以上代码,我们可以得到以下结果:
testingTaskBarrier...
I'mworking02
I'mworking00
I'mworking01
Cycleis0;3parties
I'mworking12
I'mworking10
I'mworking11
Cycleis1;3parties
I'mworking20
I'mworking21
I'mworking22
Cycleis2;3parties
I'mworking30
I'mworking32
I'mworking31
Cycleis3;3parties
I'mworking42
I'mworking40
I'mworking41
Cycleis4;3parties
I'mworking51
I'mworking50
I'mworking52
Cycleis5;3parties
I'mworking60
I'mworking62
I'mworking61
Cycleis6;3parties
I'mworking72
I'mworking70
I'mworking71
Cycleis7;3parties
I'mworking81
I'mworking80
I'mworking82
Cycleis8;3parties
I'mworking90
I'mworking92
testingFibonacci...
forknewthreadfor13
forknewthreadfor12
forknewthreadfor11
forknewthreadfor10
forknewthreadfor12
forknewthreadfor11
forknewthreadfor10
forknewthreadfor9
forknewthreadfor10
forknewthreadfor9
forknewthreadfor11
forknewthreadfor10
forknewthreadfor10
forknewthreadfor9
Fibonacci(14)=610
结论
从以上的例子中可以看到,通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。
在实际工作中利用 Fork/Join 模式,可以充分享受多核平台为应用带来的免费午餐。
参考资料
学习
获得产品和技术
-
访问 Doug Lea 的 JSR 166 站点获得最新的源代码。
- 从 Sun 公司 网站下载 Java SE 6。
分享到:
相关推荐
随着多核时代的来临,软件开发人员不得不开始关注并行编程领域。...本文将介绍使用 JDK 7 中 Fork/Join 模式的方法和其他相关改进。阅读本文之后,读者将能够独立地在软件开发中使用 Fork/Join 模式来改进程序的性能。
新特性主要涉及:对于JDK7中Fork/Join并行处理的升级;支持Lambda表达式;添加了Stream API;对于注解的拓展,加入了类型注解、重复注解;在G1回收器中支持字符串去重;内存空间中删除了永久代,引入了元空间。
1.1 JDK7新特性<一>概述 ....1.5 JDK7新特性<五> fork/join 框架 . . . . . 1.6 JDK7新特性<六> 监听文件系统的更改 1.7 JDK7新特性<七> 遍历文件树 . . . . . . . 1.8 JDK7新特性<八>异步io/AIO . . . . . . . .
在Java 7中,catch代码块得到了升级,用以在`单个catch块中处理多个异常`。如果你要捕获多个异常并且它们包含相似的代码,使用这一特性将会减少代码重复度。 ``` try { //xxx } catch (AException | BException e)...
JDK1.7新特性介绍 1. 对Java集合(Collections)的增强支持 2. 在Switch中可用String 在JDK7 的正式版本中,你可以在switch的表达式中用String类型 ...9. 增加了fork/join框架来增强对处理多核并行计算的支持
一、 Fork/Join框架的介绍 21 1、实现步骤: 22 2、工作窃取算法 22 3、分而治之 23 4、Fork/Join使用的标准范式 24 5、Fork/Join框架的异常处理 26 6、Fork/Join框架的实现原理 26 二、闭锁CountDownLatch 28 1、...
介绍了ForkJoin并发框架,供有java基础者学习,工作配合使用,附件带有PPT,介绍并发与并行区别,和ForkJoin代码范例,资源来自网络,分享分享!
7 特性,例如 和 。 它是内置的,只能与 . 随着 JDK 8 中 lambda 及其支持特性的引入,我们用 Java 构建软件的方式将发生变化。 如果您想了解几年后您的 Java 代码会是什么样子,您可以查看 Wordcounter。 与目前...
6、对多核处理器的支持:JDK 7提供了对多核处理器的支持,包括fork-join框架,以帮助开发人员更轻松地编写并行代码。 总的来说,JDK 7是一个重要的Java版本,具有许多改进的特性,提高了Java的性能,安全性和开发...
java7有一些比较重要的更新,如异常处理增加了被抑制的异常、捕获多异常、try-with-resource自动释放资源等,还有应用了G1垃圾回收器、switch可以使用String类型、泛型自动判断类型、fork/join框架把任务细分并使用...
I.背景实际项中,使并发的个case就是商品详情页的展了,个详情页的展,除了基本的商品数据之外,还有销量,地址,评价,推荐,店铺信息,装饰信息等,段伪代码来描述
同时,它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。 所以说,Java 8 中首次出现的 java.util.stream 是一个函数式语言+多核...
java8 源码 SpringTree springboot mybatis mysql zookeeper ...采用工作窃取模式(当前线程任务执行完成,可窃取其他线程的执行任务),将大任务分解成多个小任务,最后将结果join 7:分布式锁 red
The directory <Java home>/sample/forkjoin/ contains samples that demonstrate the fork/join framework. The ThreadLocalRandom class eliminates contention among threads using pseudo-random numbers; see ...
1. 介绍JDK7比JDK6新增的Java语言特性.2. 介绍JDK7的库增强功能. 3. 提供使用场景,例子参考说明.4. 熟悉JDK7新特性和库对Android开发(基于Java语言)更加得心应手。 5. 熟悉JDK7新特性能方便进行Java底层开发,比如...
高性能并发编程:JDK 11引入了一批新的并发处理库和工具,如VarHandles和ForkJoin框架的改进,使得多线程编程更加高效和简洁。 安全性提升:JDK 11在安全性方面进行了大量改进,包括对密钥管理、证书验证、安全协议...
第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器...
第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器...
第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器...