深入剖析JDK源码之ForkJoinTask的fork/join!

B站影视 内地电影 2025-05-13 20:19 2

摘要:但问题是,对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。

如果局部队列、全局中的任务全部是相互独立的,就很简单了。

但问题是,对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。

这种依赖关系是通过ForkJoinTask中的join来体现的。回到前面使用的案例,有这样的代码。

线程在执行当前ForkJoinTask的时候,产生了left、right 两个子Task。所谓fork,是指把这两个子Task放入队列里面,join则是要等待2个子Task完成。而子Task在执行过程中,会再次产生两个子Task。如此层层嵌套,类似于递归调用,直到最底层的Task计算完成,再一级级返回。

fork的代码很简单,就是把自己放入当前线程所在的局部队列中。

图7-7 层层嵌套的join示意图

线程1 在执行 ForkJoinTask1,在执行过程中调用了 forkJoinTask2.join,所以要等ForkJoinTask2完成,线程1才能返回;

线程2在执行ForkJoinTask2,但由于调用了forkJoinTask3.join,只有等ForkJoinTask3完成后,线程2才能返回;

线程3在执行ForkJoinTask3。

结果是:线程3首先执行完,然后线程2才能执行完,最后线程1再执行完。所有的任务其实组成一个有向无环图DAG。如果线程3调用了forkJoinTask1.join,那么会形成环,造成死锁。

那么,这种层次依赖、层次通知的 DAG,在 ForkJoinTask 内部是如何实现的呢?站在ForkJoinTask的角度来看,每1个ForkJoinTask,都可能有多个线程在等待它完成,有1个线程在执行它。所以每个ForkJoinTask就是一个同步对象,线程在调用join的时候,阻塞在这个同步对象上面,执行完成之后,再通过这个同步对象通知所有等待的线程。

如图7-8所示,利用synchronized关键字和Java原生的wait/notify机制,实现了线程的等待-唤醒机制。调用join的这些线程,内部其实是调用ForkJoinTask这个对象的wait;执行该任务的Worker线程,在任务执行完毕之后,顺便调用notifyAll。

图7-8 ForkJoinTask同步对象示意图

2.ForkJoinTask的状态解析

要实现fork/join的这种线程间的同步,对应的ForkJoinTask一定是有各种状态的,这个状态变量是实现fork/join的基础。

初始时,status=0。共有五种状态,可以分为两大类:

(1)未完成:0,SIGNAL=1。即status>=0。

0为初始未完成状态,1表示有线程阻塞在任务上,等待任务完成。

(2)已完成:NORMAL=-1,CANCELLED=-2,EXCEPTIONAL=-3。即status<0。

NORMAL:正常完成;

CANCELLED:任务被取消,完成。

EXCEPTIONAL:任务在执行过程中发生异常,退出也是完成。

所以,通过判断是status>=0,还是status<0,就可知道任务是否完成,进而决定调用join的线程是否需要被阻塞。

3.join的详细实现

下面看一下代码的详细实现。

getRawResult是ForkJoinTask中的一个模板方法,分别被RecusiveAction和RecursiveTask实现,前者没有返回值,所以返回null,后者返回一个类型为V的result变量。

reportResult只是对 getRawResult的一个包装,里面多了对 CANCELLED 和EXCEPTIONAL这两种异常完成状态的处理。

阻塞主要发生在上面的doJoin函数里面。在dojoin里调用t.join的线程会阻塞,然后等待任务t执行完成,再唤醒该阻塞线程,doJoin返回。

注意:当 doJoin返回的时候,就是该任务执行完成的时候,doJoin的返回值就是任务的完成状态,也就是上面的-1、-2、-3三种状态。

先看一下externalAwaitDone,即外部线程的阻塞过程,相对简单。

可以看到,首先通过synchronized关键字对ForkJoinTask加锁,之后做了两件事:

(1)把status从0改成1(SIGNAL);

(2)调用Java原生的wait函数,阻塞该线程。

内部Worker线程的阻塞,即上面的w.joinTask(this),相比外部线程的阻塞要做更多工作。它的现不在ForkJoinTask里面,而是在ForkJoinWorkerThread里面。

上面的函数有个关键点:for里面是死循环,并且只有一个返回点,即只有在joinMe.status<0,任务完成之后才可能返回。否则会不断自旋;若自旋之后还不行,就会调用pool.tryAwaitJoin(joinMe)阻塞。

tryAwaitJoin(joinMe)的代码如下。注意:这里是 tryAwaitJoin(..),而不是 awaitJoin..),也就是说,不能保证一定会阻塞成功。如果阻塞不成功,就会返回到for循环里,可能再次进入该函数。

在上面的函数中,有三个关键部分:阻塞之前、阻塞、阻塞之后。在阻塞之前要做一些准备工作,在阻塞之后要做一些清理工作。

具体是哪些工作呢?

上面的postBlock比较简单,只是把活跃线程数加1,阻塞线程数减1。

而tryPreBlock相比更复杂:

首先是把阻塞线程数加1,如果增加失败,整个tryPreBlock就返回false;如果增加成功,根据线程池的状态ctl变量,执行各种对应的操作:

(1)若有空闲线程,从Treiber Stack栈顶取出,唤醒。

(2)若无空闲线程,有活跃线程,则只把活跃线程数减1。这是因为当前的活跃线程马上就要被阻塞了。

(3)如果既无空闲线程,也无活跃线程,意味着所有线程都处于阻塞状态。此时必须开一个新线程,以应对后续的任务。

在tryPreBlock和postBlock之间,就是实际执行阻塞的地方。上文已讲,外部线程的阻塞是通过调用ForkJoinTask的externalAwaitDone 完 成 的 ; 内 部 线 程 的 阻 塞 调 用 了 tryAwaitDone(..),代码如下。两个函数的实现基本类似,都是做了两件事:首先把status从0改到1;其次,调用Java原生的wait函数,阻塞该线程。读者可以翻阅前面的部分对两个函数进行比较。

4.join的唤醒

调用t.join之后,线程会被阻塞。接下来看另外一个线程在任务t执行完毕后如何唤醒阻塞的线程。

任务的执行发生在doExec函数里面,任务执行完成后,调用一个setCompletion(..)通知所有等待的线程。这里也做了两件事:

(1)把status置为完成状态。也就是NORMAL、CANCELLED或者EXCEPTIONAL。

(2)如果s!=0,即 s=SIGNAL,说明有线程正在等待这个任务执行完成。调用Java原生的notifyAll通知所有线程。如果s=0,说明没有线程等待这个任务,不需要通知。

来源:程序员高级码农II一点号

相关推荐