`
m635674608
  • 浏览: 4929108 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Java7 ForkJoin 源码分析

    博客分类:
  • java
 
阅读更多

ForkJoin框架是在JDK7中新追加的ExecutorService实现 与其它的ExecutorService实现的最大区别在于ForkJoin不是基于 java.util.concurrent.ThreadPoolExecutor实现的线程重用 而是一个基于”work stealing”设计实现的低并发冲突、高性能的并发框架

涉及到的类简介

java.util.concurrent.ForkJoinPool

该类实现了java.util.concurrent.ExecutorService接口 负责维护全部的工作线程 并接收调用者分配的task 本身持有一个全局的task队列

java.util.concurrent.ForkJoinWorkerThread

具体执行task的工作线程 在创建之后注册到具体的pool

java.util.concurrent.ForkJoinTask

工作线程中执行的task 实现了java.util.concurrent.Future接口 可以在实现中fork子task 并join到自己fork的task运行结束 所以要求实现该类获取/设置中间结果的方法


关键属性简介

ForkJoinTask<?>[] submissionQueue;// java.util.concurrent.ForkJoinPool// Pool的task队列 初始容量为8192 由于submissionQueue是环形队列 而作者使用了特殊的求余算法 导致的容量必须为2的幂次方 // 通过`java.util.concurrent.ForkJoinPool#growSubmissionQueue()`方法拓展 调用者线程submit的task都会提交到这个队列// 然后唤醒worker去该队列steal task 如果在worker线程调用submit提交直接提交调用提交方法的worker线程的task队列中ForkJoinTask<?>[] queue;// java.util.concurrent.ForkJoinWorkerThread   // Worker的task队列 初始容量为8192 基本上与submissionQueue的维护方式一样 不过只能通过在worker线程调用fork()才能将// task添加到这个队列中volatileint queueBase;// java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// 队列尾部索引 task窃取时会更改此值 由于几个线程可能同时访问 所以修饰符是volatileint queueTop;// java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// task push与pop时会更改的索引 根据上面对队列属性的描述会发觉所有入队操作都是由"一个线程"来完成的(调用者线程往pool中// push task 而worker线程自己给自己fork task) 所以其是非线程安全的 另 submissionLock 保证了向pool中提交task的安全性// 但是这个保护只是防止调用者作死而存在的(比如并发往pool中提交task) 如果保证调用者单线程入数据 则不需要这个锁volatileint status;// java.util.concurrent.ForkJoinTask// task执行的状态 对应本类的四个状态常量 已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)

入口

由于方法的分支很多 所以前面讲一些必然会走的方法 在后边从调用者角度和worker的角度来分析代码
请先参考有关forkjoin的例子

在例子中 我通过ForkJoinPool pool = new ForkJoinPool();方法创建了pool实例 随后通过pool.submit(new ForkJoinFastSort(nums, 0, 8));方法向pool中提交了第一个task

java.util.concurrent.ForkJoinPool#forkOrSubmit(ForkJoinTask task)

private<T>void forkOrSubmit(ForkJoinTask<T> task){ForkJoinWorkerThread w;Thread t =Thread.currentThread();if(shutdown)thrownewRejectedExecutionException();// 已经被shutdown后则不再接收新的task 与传统线程池不同的是task最大数量是由ForkJoin自行管理的 外部不可更改// 环形队列把这点限制死了 不过也不错if((t instanceofForkJoinWorkerThread)&&(w =(ForkJoinWorkerThread)t).pool ==this)
        w.pushTask(task);// 如果提交task的线程是worker线程并且属于当前的pool 则直接将task添加到这个worker中// 这个方法由于面向具体的线程 所以不需要锁else
        addSubmission(task);// 否则将task添加到pool队列中}

java.util.concurrent.ForkJoinPool#invoke(ForkJoinTask task)

public<T> T invoke(ForkJoinTask<T> task){Thread t =Thread.currentThread();if(task ==null)thrownewNullPointerException();if(shutdown)thrownewRejectedExecutionException();if((t instanceofForkJoinWorkerThread)&&((ForkJoinWorkerThread)t).pool ==this)return task.invoke();// 如果提交task的线程对象是当前pool中的worker 则直接让当前worker自己处理taskelse{
        addSubmission(task);// 所有非worker提交的task全部由pool保存return task.join();//等待task执行完毕}}// 从我的代码例子来看 他肯定会走后面的分支 因为提交task的是调用者线程 而不是worker线程

调用者线程流程

java.util.concurrent.ForkJoinPool#addSubmission(ForkJoinTask<?> t)

privatevoid addSubmission(ForkJoinTask<?> t){finalReentrantLocklock=this.submissionLock;// 这里这个锁是防止调用者乱搞 task的生成大多都在worker中发生 所以调用这个方法的只有调用者lock.lock();try{ForkJoinTask<?>[] q;int s, m;if((q = submissionQueue)!=null){long u =(((s = queueTop)&(m = q.length-1))<< ASHIFT)+ABASE;// 获取top对应的数组位置(内存地址)
            UNSAFE.putOrderedObject(q, u, t);// 将新的task添加到队列中
            queueTop = s +1;if(s - queueBase == m)
                growSubmissionQueue();// 在添加之后检查队列长度是否到极限 如果是则扩容}}finally{lock.unlock();}
    signalWork();// 唤醒工作线程// 方法结尾处会唤醒(或创建) worker线程 (创建用addWorker()) 而worker线程被创建之后 就会不断的调用scan方法去// 窃取其他worker或pool中的task 直到全部task结束 这里需要特别说明的地方有2点// 1. (s = queueTop) & (m = q.length-1) 返回的是队列下标(queueTop或queueBase)在环形队列数组中的真实位置// 2. (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; 通过直接访问指定内存地址来替换和获取元素// Java会为了具体的类生成对应该类的数组类型 它本身也是个类 而ABASE就是这个类的对象头的偏移量// ASHIFT也是一个偏移量 代表着数组元素类型的占用长度 原始类型就是原始类型本身占用字节的长度// 而引用类型就是地址长度 假设ForkJoinTask的引用地址长度为4 想在找到元素后获取整个元素就需要这个偏移量// 整个过程中queueBase和queueTop没有发生任何变化 因为是根据queueBase和mask(即q.length-1)求具体地址的 // 扩容的时候mask会随着长度变动 所以不需要任何更改// 以上描述不再复述}

java.util.concurrent.ForkJoinPool#addWorker()

privatevoid addWorker(){Throwable ex =null;ForkJoinWorkerThread t =null;try{
        t = factory.newThread(this);// 创建worker线程 下面的代码不解释了 只是根据发生异常的情况来决定是否告知调用者}catch(Throwable e){
        ex = e;}if(t ==null){// null or exceptional factory returnlong c;// adjust countsdo{}while(!UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c = ctl,(((c - AC_UNIT)& AC_MASK)|((c - TC_UNIT)& TC_MASK)|(c &~(AC_MASK|TC_MASK)))));// Propagate exception if originating from an external callerif(!tryTerminate(false)&& ex !=null&&!(Thread.currentThread()instanceofForkJoinWorkerThread))
            UNSAFE.throwException(ex);}else
        t.start();// 启动线程}

java.util.concurrent.ForkJoinPool#growSubmissionQueue()

/**
     * Creates or doubles submissionQueue array.
     * Basically identical to ForkJoinWorkerThread version.
     */privatevoid growSubmissionQueue(){ForkJoinTask[] oldQ = submissionQueue;int size = oldQ !=null? oldQ.length <<1: INITIAL_QUEUE_CAPACITY;if(size > MAXIMUM_QUEUE_CAPACITY)thrownewRejectedExecutionException("Queue capacity exceeded");// 再说一次 ForkJoin的task总数是自己维护的 不能通过外界更改if(size < INITIAL_QUEUE_CAPACITY)
        size = INITIAL_QUEUE_CAPACITY;ForkJoinTask<?>[] q = submissionQueue =newForkJoinTask<?>[size];int mask = size -1;int top = queueTop;int oldMask;if(oldQ !=null&&(oldMask = oldQ.length -1)>=0){// 如果旧队列中有数据 则将其数据填充到新的队列数组中 for(int b = queueBase; b != top;++b){long u =((b & oldMask)<< ASHIFT)+ ABASE;// 根据旧的mask获取元素在旧队列中的位置Object x = UNSAFE.getObjectVolatile(oldQ, u);// 获取元素// 获取后判断旧的队列中是否存在此元素 如果不存在则取消将其加入新的队列 因为在两个步骤之间 // task可能已经被执行过了 (其他线程或许会持有旧队列数组的引用) 反之 如果存在 则加入到新的队列中if(x !=null&& UNSAFE.compareAndSwapObject(oldQ, u, x,null))
                UNSAFE.putObjectVolatile
                    (q,((b & mask)<< ASHIFT)+ ABASE, x);}}}

worker线程流程

java.util.concurrent.ForkJoinWorkerThread#run()

publicvoid run(){Throwable exception =null;try{
        onStart();// 首先初始化当前worker线程
        pool.work(this);// 调用pool将自己注册到pool中并表示自己可以开始工作}catch(Throwable ex){
        exception = ex;}finally{
        onTermination(exception);// 这个方法主要是将当前线程置为终结状态 在work方法种可以看到线程在获取task的时候是根据这个状态轮询的 // 一旦设置为false 就不再接收其他的task 另外也记录了这个线程发生的异常}}

java.util.concurrent.ForkJoinWorkerThread#onStart()

protectedvoid onStart(){
    queue =newForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];// 初始化队列int r = pool.workerSeedGenerator.nextInt();// 原谅我实在是不知道这个种子生成器的作用 呵呵呵呵..
    seed =(r ==0)?1: r;//  must be nonzero}

java.util.concurrent.ForkJoinPool#work(ForkJoinWorkerThread w)

finalvoid work(ForkJoinWorkerThread w){boolean swept =false;// true on empty scanslong c;// 每个worker线程基本上在调用到这个方法后 就会一直循环 重复着steal task、执行线程本身队列task的过程 // 除非在执行task过程中发生异常 或者pool被shutdown 或者按需调整工作线程总数 导致该线程被回收while(!w.terminate &&(int)(c = ctl)>=0){// 这个terminate就是上面onTermination(exeception)方法设置的状态位int a;// active countif(!swept &&(a =(int)(c >> AC_SHIFT))<=0)
            swept = scan(w, a);// 扫描elseif(tryAwaitWork(w, c))
            swept =false;}}

java.util.concurrent.ForkJoinPool#scan(ForkJoinWorkerThread w, int a)

privateboolean scan(ForkJoinWorkerThread w,int a){// 关于scan的代码在下看不懂的实在太多 所以只说几处主要的 // 参数传递进来的thread对象是调用该方法的worker 这个方法如果返回false// 则会让外面的循环继续调用scan 返回true则会调用tryAwaitWork() 也就是等待task 迟早还会调用这个方法int g = scanGuard;// mask 0 avoids useless scans if only one activeint m =(parallelism ==1- a && blockedCount ==0)?0: g & SMASK;ForkJoinWorkerThread[] ws = workers;if(ws ==null|| ws.length <= m)// staleness checkreturnfalse;// 安全检查 for(int r = w.seed, k = r, j =-(m + m); j <= m + m;++j){ForkJoinTask<?> t;ForkJoinTask<?>[] q;int b, i;ForkJoinWorkerThread v = ws[k & m];// 以下部分是task窃取的一部分 新启动的worker线程会通过work方法调用到scan方法并且传递自己的对象(也就是w)// 到方法中 在下面的代码中他会尝试从现存的worker的队列中窃取一个task ForkJoin框架是在运行的期间不断的分裂// 在ForkJoinTask的实现类里调用fork()就可能会创建新的worker 并走到这个分支 所以 在worker数不足的情况下// 窃取的几率很高 但是当worker数稳定后 每个worker会给自己分配task 而不是再这样窃取其他线程的taskif(v !=null&&(b = v.queueBase)!= v.queueTop &&(q = v.queue)!=null&&(i =(q.length -1)& b)>=0){long u =(i << ASHIFT)+ ABASE;if((t = q[i])!=null&& v.queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t,null)){int d =(v.queueBase = b +1)- v.queueTop;// 看 他窃取了 他窃取了!
                v.stealHint = w.poolIndex;if(d !=0)
                    signalWork();// propagate if nonempty
                w.execTask(t);}

            r ^= r <<13; r ^= r >>>17; w.seed = r ^(r <<5);returnfalse;// store next seed}elseif(j <0){// xorshift
            r ^= r <<13; r ^= r >>>17; k = r ^= r <<5;}else++k;}if(scanGuard != g)// staleness checkreturnfalse;else{// try to take submission// 第一个task并不是直接被分配到worker的线程里(因为创建task的并不是worker本身) 而是直接进入pool的队列中 // 然后调用者线程会主动创建一个新的worker 在上面的逻辑(说实话我看不懂上边的逻辑) 中无法从其他worker中// 窃取到task的时候 或者是其他worker分配的task已经执行完毕后 再从pool的队列中获取taskForkJoinTask<?> t;ForkJoinTask<?>[] q;int b, i;if((b = queueBase)!= queueTop &&(q = submissionQueue)!=null&&(i =(q.length -1)& b)>=0){long u =(i << ASHIFT)+ ABASE;if((t = q[i])!=null&& queueBase == b &&
                UNSAFE.compareAndSwapObject(q, u, t,null)){
                queueBase = b +1;
                w.execTask(t);}returnfalse;}// 下面那行注释并不是我加的..returntrue;// all queues empty}// 无论走了哪个分支 最终都会调用 w.execTask(t); (没task的情况下除外)}

java.util.concurrent.ForkJoinWorkerThread#execTask(ForkJoinTask<?> t)

finalvoid execTask(ForkJoinTask<?> t){
    currentSteal = t;// 当前窃取到的task 在join的时候有用for(;;){if(t !=null)
            t.doExec();// 执行具体的task 比如我的代码里 就是排序的过程 通常在编写forkjoin的时候 都会在运行期间分裂出其他taskif(queueTop == queueBase)// 判断自己的队列task是否已经全部完成 如果是则退出方法 // 这里需要特殊说明的是 只有当前线程会给自己的队列添加task 也就是说 当前线程如果不再fork task// queueTop就不会发生变化 所以这个方法的判断是安全的break;
        t = locallyFifo ? locallyDeqTask(): popTask();// 弹出task// 这里唯一需要注意的是 ForkJoin支持FIFO(在外面设置) 如果设置了FIFO 就会跟其它"steal task"的线程一起// 从queueBase开始获取task 顺带一提 locallyDeqTask有两个版本 一个是针对其他线程的steal task实现// 另外一个是当前线程的实现}++stealCount;
    currentSteal =null;}

写到这里 需要切回我的代码例子 在我的代码例子中 会在运行期间调用task对象的fork方法 而fork方法会获取当前worker线程的对象并调用该对象的pushTask方法

java.util.concurrent.ForkJoinWorkerThread#pushTask(ForkJoinTask t)

finalvoid pushTask(ForkJoinTask t){ForkJoinTask[] q;int s, m;if((q = queue)!=null){// ignore if queue removed long u =(((s = queueTop)&(m = q.length -1))<< ASHIFT)+ ABASE; UNSAFE.putOrderedObject(q, u, t);// 将task放入当前线程的队列中 
        queueTop = s +1;// or use putOrderedIntif((s -= queueBase)<=2) 
            pool.signalWork();// 如果队列中的未处理task小于2 则唤醒新的
            worker 
        elseif(s == m) 
            growQueue();// 扩容 }}// 关于(s -= queueBase) <= 2 我并没有理解 所以只是猜测成根据这个属性来判断forkjoin是否处于运行初期

java.util.concurrent.ForkJoinWorkerThread#popTask()

privateForkJoinTask<?> popTask(){int m;ForkJoinTask<?>[] q = queue;if(q !=null&&(m = q.length -1)>=0){for(int s;(s = queueTop)!= queueBase;){// 轮循自己的队列 获取没有被窃取的task 是出栈的过程 // 通过steal task的索引和当前worker自己弹出、压入队列的索引来判断是否有剩余元素int i = m &--s;long u =(i << ASHIFT)+ ABASE;// raw offsetForkJoinTask<?> t = q[i];if(t ==null)// 获取task的时候如果task已经为空 则被其他线程窃取掉 此时break 将剩下的判断操作委托给外围的execTask处理break;if(UNSAFE.compareAndSwapObject(q, u, t,null)){// 置为null 防止steal task的线程重复执行task
                queueTop = s;// or putOrderedInt // 更新topreturn t;}}}returnnull;}

java.util.concurrent.ForkJoinWorkerThread#pollTask()

finalForkJoinTask<?> pollTask(){ForkJoinWorkerThread[] ws;ForkJoinTask<?> t = pollLocalTask();// 尝试从自己的队列中获取taskif(t !=null||(ws = pool.workers)==null)return t;int n = ws.length;// cheap version of FJP.scanint steps = n <<1;// 这里限制了尝试从其他workersteal task的次数 看起来貌似是worker容器长度的2倍int r = nextSeed();int i =0;while(i < steps){// 从其他的worker的task队列中steal taskForkJoinWorkerThread w = ws[(i+++ r)&(n -1)];// 先判断具体的worker队列中是否存在task 不存在则换到其他的worker if(w !=null&& w.queueBase != w.queueTop && w.queue !=null){// 如果窃取到task则返回 否则重置steps  if((t = w.deqTask())!=null)return t;
            i =0;}}returnnull;}

到此 流程已描述完毕

获取执行结果

在样例代码中 调用者线程最后调用了task对象的join()方法等待结果返回

java.util.concurrent.ForkJoinTask#join()

publicfinal V join(){if(doJoin()!= NORMAL)// doExec的代码我就不贴了.. 就是调用子类实现的exec方法然后根据运行状况 设置不同的状态位// (比如发生异常设置一个状态 比如task取消是另外一个状态..)return reportResult();elsereturn getRawResult();// 状态为NORMAL 的时候说明执行完成 直接返回结果即可}

java.util.concurrent.ForkJoinTask#doJoin()

privateint doJoin(){Thread t;ForkJoinWorkerThread w;int s;boolean completed;if((t =Thread.currentThread())instanceofForkJoinWorkerThread){// 如果调用join方法的线程为worker 则尝试让worker本身执行if((s = status)<0)// 判断task是否已经得到结果 得到结果则直接返回return s;if((w =(ForkJoinWorkerThread)t).unpushTask(this)){// unpushTask会检查queueTop位置上是不是当前task 如果是则直接执行try{
                completed =exec();// 这里的流程基本上跟doExec()一样}catch(Throwable rex){return setExceptionalCompletion(rex);}if(completed)return setCompletion(NORMAL);}return w.joinTask(this);// 调用当前worker线程的joinTask等待其安排task执行}elsereturn externalAwaitDone();// 否则object wait}

java.util.concurrent.ForkJoinWorkerThread#joinTask(ForkJoinTask<?> joinMe)

finalint joinTask(ForkJoinTask<?> joinMe){// 这个替换有点类似方法调用 假设worker在执行task的时候执行了join() 被join的task同样也调用了join() 所以 像个栈..ForkJoinTask<?> prevJoin = currentJoin;
    currentJoin = joinMe;for(int s, retries = MAX_HELP;;){if((s = joinMe.status)<0){
            currentJoin = prevJoin;// 所以 当前task的join一旦有了结果 则将currentJoin替换回之前的taskreturn s;}if(retries >0){if(queueTop != queueBase){if(!localHelpJoinTask(joinMe)) 
                    retries =0;// cannot help// 检查当前queueTop位置上的task是否是被join的task 如果不是 并且task已经完成 则直接将retries置为0 // 进入下一轮循环 如果queueTop位置就是当前task 则执行 返回true后会重新进入for循环并返回执行结果}elseif(retries == MAX_HELP >>>1){--retries;// check uncommon case if(tryDeqAndExec(joinMe)>=0)// 当前task队列中已经没有task 明显要被join的task已经被窃取 所以去其他worker的将task窃取回来并执行Thread.yield();// for politeness}else
                retries = helpJoinTask(joinMe)? MAX_HELP : retries -1;// 帮助其他的worker完成task 走到这个分支只能去steal task了// (steal task的代码已经示范过多次 这里就不示范了..)}else{
            retries = MAX_HELP;// restart if not done
            pool.tryAwaitJoin(joinMe);}}// 这里需要注意的是 如果当前worker的task没有执行完毕 绝对不会去其他workersteal task// 其次 每次循环的第一个判断是判断调用join()的task的状态是否为已完成 也就是说在这期间如果task已经被处理完了 // 则直接退出方法 不再尝试steal task 而这个task可能是由子task执行的 也可能是当前线程自己执行的 }



http://dev.cmcm.com/archives/87
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics