并发容器与框架——Fork/Join框架

1. Fork/Join框架概念

    Fork/Join框架是Java提供的一个用于并行执行任务的框架,它会将一个大任务分成多个小任务,并且将每个小任务的最终结果汇总得到大任务结果的框架。比如对1+2+3+····+100求和,可以分成十个子任务分别对10个数求和,最后再汇总这十个子任务的结果。

2.工作窃取算法

    工作窃取算法是指某个线程从其他任务队列里窃取任务来执行。

    假如我们可以将一个总任务分割成多个互不相干的子任务,为了减少线程的竞争,我们会将这些子任务放在不同的队列中,并为每个队列都建造一个线程执行该队列的任务,线程和队列一一对应。但是有些线程可能会很早的执行完自己队列中的所有任务,而其他线程还会处理自己拥有的队列中的任务,此时已处理完任务的线程与其等待其他线程执行任务,不如帮助其他线程一起执行剩余任务。这时他们会从其他线程的队列里窃取一个线程执行任务,所以为了避免因为工作窃取引起的两个线程之间的竞争,通常任务队列会使用双端队列。任务队列线程从头部取任务,窃取线程从尾部取任务。

    优点是充分利用线程进行并行运算,并减少了竞争,缺点就是还是存在竞争情况,比如队列中任务数为1时,还会因为创建多个线程和队列造成更多的资源消耗。

3. Fork/Join框架设计实现类

    通过前面的介绍我们可以了解到Fork/Join框架主要实现两个步骤:分割任务以及执行任务并汇总结果。

  1. 分割任务:我们需要一个Fork类来分割任务,并且要将大任务分割的足够小。
  2. 执行任务并汇总结果:分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。

    Fork/Join框架设计了两个类来完成以上两个步骤的功能。

  1. ForkJoinTask:这是一个抽象类,主要使用该类来创建一个ForkJoin任务,它提供在任务中执行fork()和join()操作的机制。但Fork/Join框架提供了ForkJoinTask的两个抽象子类, RecursiveAction(用于没有返回结果的任务); RecursiveTask(用于有返回结果的任务)。通过继承这两个子类来创建一个ForkJoin任务。
  2. ②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

4.简单使用Fork/Join框架

    用Fork/Join框架来计算一个1+2+3+····+n的结果。注意,我们必须先想好如何分割任务,分割计算1+2+3+····+n时,必须至少分割为两个数字一组才能计算,但如果n过大两个数字一组会造成极大资源消耗,所以应考虑好任务分割的程度。此处仅以1+2+3+4为例。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{
	private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
	private int start;
	private int end;
	public TestFork_JoinFrame(int s,int e) {
		start=s;
		end=e;
	}
	public static void main(String[] args) {
		ForkJoinPool pool=new ForkJoinPool();
		TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
		
		//执行任务
		Future<Integer> f=pool.submit(task);
		try {
			System.out.println("最终结果:"+f.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	protected Integer compute() {
		int sum=0;
		boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
		if(canCompute){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			//分隔成两个子任务
			int middle=(start+end)/2;
			TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
			TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
			//执行子任务
			leftTask.fork();
			rightTask.fork();
			//等待子任务完成并获取结果汇总
			sum=leftTask.join()+rightTask.join();
		}
		return sum;
	}
}

5.异常处理

    ForkJoinTask在执行过程中可能会抛出异常,但是与普通线程任务一样,我们无法在主线程中对其进行捕获,所以以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如 果任务没有完成或者没有抛出异常则返回null。

public class TestFork_JoinFrame extends RecursiveTask<Integer>{
	private static final int THRESHOLD=2;//设置阀值,也就是任务最小分割程度为2个数字相加
	private int start;
	private int end;
	public TestFork_JoinFrame(int s,int e) {
		start=s;
		end=e;
	}
	public static void main(String[] args) {
		ForkJoinPool pool=new ForkJoinPool();
		TestFork_JoinFrame task=new TestFork_JoinFrame(1, 4);
		
		//执行任务
		Future<Integer> f=pool.submit(task);
		try {
			if(task.isCompletedAbnormally()){
				System.out.println(task.getException());
			}else{
				System.out.println("最终结果:"+f.get());
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	protected Integer compute() {
		int sum=0;
		boolean canCompute=(end-start)<=THRESHOLD;//如果已经是最小分割值,则进行运算,否则继续分割
		if(canCompute){
			for(int i=start;i<=end;i++){
				sum+=i;
			}
		}else{
			//分隔成两个子任务
			int middle=(start+end)/2;
			TestFork_JoinFrame leftTask=new TestFork_JoinFrame(start, middle);
			TestFork_JoinFrame rightTask=new TestFork_JoinFrame(middle+1, end);
			//执行子任务
			leftTask.fork();
			rightTask.fork();
			//等待子任务完成并获取结果汇总
			sum=leftTask.join()+rightTask.join();
		}
		return sum;
	}
}
	}

6.框架实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

   1.ForkJoinTask的fork()方法原理:调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。源码如下

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

      pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的 signalWork()方法唤醒或创建一个工作线程来执行任务。源码码如下。

 final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

    2.ForkJoinTask的join方法实现原理:Join方法的主要作用是阻塞当前线程并等待获取结果。源码如下

public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
 private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

        首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务状态是被取消,则直接抛出CancellationException。如果任务状态是抛出异常,则直接抛出对应的异常。

        doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

上一篇:深入学习Lock锁(3)——重入锁ReentrantLock


下一篇:零售ERP系统方案选型―IT只是一个工具