Future实现分析

Future实现分析

还是先来小demo

 @Test
    public void testFuture() throws ExecutionException, InterruptedException {
        FutureTask<String> test = new FutureTask<>(() -> {
            logMessage("要睡了");

            TimeUnit.SECONDS.sleep(5);
            logMessage("睡醒了");
            return "1";
        });
        Thread thread = new Thread(test);
        thread.start();

        logMessage(test.get());

    }

    public static void logMessage(Object o) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o);
    }

FutureTask解析

Future实现分析

还是从他的继承关系来看,Future接口是什么?Runnable我知道,但是Future是什么?

/*
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 * Future表示异步执行。提供了几个方法,用于,检查计算是否完成、等待计算完成以及获取计算结果。
 * 主要记住,他就是用来用来执行异步任务的。并且提供了一些有用的api
 */
public interface Future<V> {

    /*
     1. 尝试取消任务的执行,如果这个任务还没有执行,直接取消,这个任务就不会再运行了
     2. 如果任务已经完成,或者已经被取消了,或者说因为一些其他的原因导致不能取消,返回false
     3. 如果任务已经开始运行,mayInterruptIfRunning参数表示是否要中断执行这个任务的线程来停止任务
     
     在这个方法调用之后,后面调用isDone的方法就会返回true,如果这方法返回true,后序调用isCancelled方法也会返回true
    */
    boolean cancel(boolean mayInterruptIfRunning);

    /*
     判断这个任务是否被取消掉
    */
    boolean isCancelled();

    /**
     如果这个任务完成,或者在完成的时候被正常的终止,或者异常,或者取消,这些情况下,都会返回true
     */
    boolean isDone();

    /**
    等待结果,一直等
     */
    V get() throws InterruptedException, ExecutionException;

    /**
      等待结果,增加超时时间
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future表示异步获取结果,规定了一些方法,还得看具体的实现是怎么做的。所以,这里直接看FutureTask

看看属性

   // 表示当前这个异步计算的状态, 
   private volatile int state;

 // 下面都是一些不同状态的码值。
    private static final int NEW          = 0; //新建
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常
    private static final int EXCEPTIONAL  = 3;// 异常
    private static final int CANCELLED    = 4;// 取消
    private static final int INTERRUPTING = 5;// 中断中
    private static final int INTERRUPTED  = 6;// 中断

   // 等会要运行的callable
    private Callable<V> callable;

 //结果存放的地方,并且不是volatile修饰的。是为了保护读和写的状态
     // 在有结果的时候,这就是结果,如果异常的话,这就是异常。所以在Future里面异常的话,得通过get方法获取到
 // 这让我想到了线程池,submit方法也会返回一个future,线程池异常异常了会怎么办?
    private Object outcome; 
   // 运行的线程
    private volatile Thread runner;
  // 等待的节点队列, 
    private volatile WaitNode waiters;

这些属性下面的代码和文章里面会不间断的引用。

从构造方法开始

  1. public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    

    这种构造方法肯定是见过的最多的,只是一个赋值操作,将传递进来的callable赋值,并且将state变为NEW。

  2.   public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    这我可没用过,之前也没在意过,这里确实很特殊。有一个resutl,并且将这个两个参数包装成Executors.callable(runnable, result);,那就看看这个方法。

      static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    运行原来的task,返回的是result,那是不是意味着,传递一个Runnable,对result一顿操作,最后又将这个result返回来。比如下面的这个样子

      @Test
        public void testFuture() throws ExecutionException, InterruptedException {
            HashMap<String, String> map = new HashMap<>();
            FutureTask<Map> test = new FutureTask<>(() -> {
                logMessage("要睡了");
                map.put("a","a");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logMessage("睡醒了");
            },map);
            
            Thread thread = new Thread(test);
            thread.start();
            logMessage(test.get());
        }
    

Future实现分析

传递一个空map进去,然后一顿操作,返回返回。么的一点点的问题。

要知道,他还是继承了Runnable接口,并且传递给了Thread类,那么Runnable肯定要看run方法,并且Future肯定实在run方法里面做手脚的。

run方法分析

 public void run() {
       //如果当前的状态不是NEw或者替换cas将 runner变为当前线程失败,直接返回,不需要运行。
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
           // 拿到callable
            Callable<V> c = callable;
            // 准备开始运行了
            if (c != null && state == NEW) {
               // 接受结果
                V result;
                boolean ran;
                try {
                  // 运行方法,
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                   //如果出现了任何的异常,将result变为null,将ran变为false。
                    result = null;
                    ran = false;
                   //
                    setException(ex);
                }
              // 如果除了异常,就不会走到这个方法,
               // 正常的话肯定是这个方法,这里肯定是设置结果给outcome。
                if (ran)
                    set(result);
            }
        } finally {
            // 这里用的就不是cas操作了,直接变为null,防止再次调用。
            runner = null;
            // 在runner变为null,之后,重新读取state的值。
            // 这里为啥要再次读取,state是volatile修饰的,难道说这里是为了防止发生指令重排序。
            int s = state;
            // 如果说状态是 INTERRUPTING之前的状态,(new,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING)只要不是中断。
            if (s >= INTERRUPTING)
              
              // 在看看这个方法是干了什么事情?
                handlePossibleCancellationInterrupt(s);
        }
    }
  

setException (运行出现异常,设置值,并且更改state)

  // 出现异常了之后,就调用这个方法,这个操作也很清晰了,
   protected void setException(Throwable t) {
      // 使用cas操作将当前的future的状态变为完成中
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
           // 将异常信息赋值给outcome
            outcome = t;
            //将state变为exception
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
           // 再看看这个干了什么事情
            finishCompletion();
        }
    }
 

handlePossibleCancellationInterrupt(如果state为INTERRUPTING,让出cpu的使用权。)

 //这个方法的作用没有看懂。
 // 看看源码上面的注释写的把
 // 这个方法是为了确保调用cancel(true)时候产生的中断,只有在task run或者task runAndReset的时候才能投递。
 private void handlePossibleCancellationInterrupt(int s) {
        // 如果说当前future的状态是INTERRUPTING,就说明,当前的future正在中断中,那么直接就等等。
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
               // 当前线程放弃执行,让出cpu的使用权。
                Thread.yield(); 

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    } 

set(任务执行成功。设置值,并且更改状态)

  // 还是同样的操作,state先变为COMPLETING,最后变为NORMAL,设置值,之后调用finishCompletion方法,唤醒等待。
  protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

finishCompletion(唤醒等待的节点,并且移除等待队列)

  // 看看这个方法里面做了什么事情。
 // 看了这个操作是拿到了等待的所有线程,并且唤醒移除他们,并且调用done方法。
  private void finishCompletion() {
        // assert state > COMPLETING;
    
     // 拿到等待队列,从头开始,循环遍历,,unpark,,移除是从头开始的,
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                  
                    // 移除,help gc,
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        //这个方法是留给拓展的
        done();

        callable = null;        // to reduce footprint
    }

总结:

Future里面本质还是一个Runnable,在run方法里面,会将当前线程设置为执行future的线程。调用callable的call方法,在出现异常的时候会将state先变为完成中,然后将异常设置为结果,之后将state变为异常。并且唤醒等待的线程(从头开始循环,唤醒),从等待队列里面移除掉(从头开始移除)。如果没有异常,state先是完成中,设置值,最后变为normal(正常)。

上面执行之后,在最后,将 runner 变为null,并且判断如果状态是中断中(INTERRUPTING)当前线程等待,等等。

问题:

  1. 在finally方法里面,为什么要再次判断获取state。

    这个我着实没有看懂,是不是为了防止指令重排序。

  2. 在finally方法里面,为什么要在这里面将 runner 变为null

    因为 finally 在方法的最后执行,所以,不管成功还是失败,这里都会将 runner 变为null。能保证任务执行一次之后,就将当前执行的线程赋值为null

  3. handlePossibleCancellationInterrupt 方法是在什么样具体的场景下面才会出现。

get 方法分析

 // get方法,如果state<=  COMPLETING,说明还没有运行结束。就要等待,如果不是,就report
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

awaitDone分析

 // 这个方法就是将当前线程添加到等待队列里面去,注意,WaitNode是用 volatile修饰的。我猜肯定cas的操作。
// 需要注意的是,这个方法返回的state,真正获取值的操作是通过s的不同的值,在report方法里面获取的。
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
  
      // 死循环
        for (;;) {
             // 当前的线程是否中断了。interrupted的方法是属于Thread类的,并且这个方法会清楚掉thread intercept states的状态。
           // 如果当前线程中断过,
            if (Thread.interrupted()) {
               // 移除掉,并且抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }
            
           // future已经完成了。直接返回s
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 就再等等,放弃cpu的执行权。
                Thread.yield();
          
            //如果q是null,这就是第一次循环吗,到这里说明他 stats <= COMPLETING,说明任务还没有执行或者才new出来,
           // 构建等待节点,连接队列,入队,
           // 第一次循环来,q肯定是null,所以构建等待队列
           // 第一次循环就结束了,
           // 第二次循环来的时候就不会q就!=null了
            else if (q == null)
               //如果说第一次循环构建了q,然后发送中断了,将他传递到removeWaiter方法里面,会造成无效的链表的查找。
                q = new WaitNode();
          
           // 第二次循环来了就开始入队了。
            else if (!queued)
               //这里采用的头插入法。
               // 如果插入成功,queued就是true,并且第三次循环来的时候就不会走到这里了。
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
           // 在入队之后,如果有time,就unpark住当前线程,将当前线程阻塞在当前对象上面,
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
               //堵塞在当前对象上面
                LockSupport.parkNanos(this, nanos);
            }
            else
               // 一直堵塞,等待unpark。
                LockSupport.park(this);
        }
    }

report方法分析

 //通过传递进来的状态判断,如果当前的状态是 NORMAL,
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
   // 如果取消,中断中,中断,抛异常
        if (s >= CANCELLED)
            throw new CancellationException();
  
  
      // 如果不是,就抛出ExecutionException,在ExecutionException里面包装原始异常
        throw new ExecutionException((Throwable)x);
    }

removeWaiter分析

  // 这个前提是当前当前的节点已经构建好了, 如果node为null那就确实没有意义
/**
 这个前提是,当前线程已经中断了。
1. 一开始先把node的thread变为null。在awaitDone方法拿到WaitNode的节点的引用了,先把他变为null,这个WaitNode的thread肯定是volatile修饰的,所以,这里能直接变为null。
2. 一开始变为null,在后面的操作里面,才会循环遍历找出来,然后从等待队列里面移除。
3. 这里是单链表的删除操作,肯定有两个指针(前置节点和当前节点)
	- 分为两种情况
		1. 要删除的节点是头结点,如果要删除是头结点,前两个if都不会进去,直接到第四个,将q变为s,也就是将头结点变为头结点的下一个。
    2. 如果不是头结点
         那第一个if一直可以进去,一直找到了,就会进去第二个if,先直接删除,pred.next = s;(要删除的是p)。
         		在看看后面的判断,如果前置节点变为null,这意味着,此时此刻也有别的线程要删除pred节点,那当前的这个就直接放弃
        
*/
private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

总结

  1. get方法,调用的时候,会通过 state 判断状态,如果 state完成,就会走到 report 方法,还是继续通过 state判断,如果是正常的结果,就直接返回,如果不是正常的,就抛出异常。

    没有完成,就会到awaitDone方法里面,在这个方法里面会构建等待节点,通过cas操作添加到等待队列里面,然后park住。在这个过程中,还会判断当前线程是否发生中断,如果发送中断,还会将当前线程从头结点中移除。

  2. 也就是说,在get里面一个线程到阻塞会循环最少会循环两次,一个是构建节点,一次是cas替换,之后才是堵塞,如果cas替换失败了,还会继续,所以,最少有两次尝试。

cancel方法分析


    public boolean cancel(boolean mayInterruptIfRunning) {
       // 不满足条件
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    
           // 
            if (mayInterruptIfRunning) {
                try {
                   //调用运行future线程的中断方法
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                   // 状态变为INTERRUPTED
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
          // 上面已经分析过了
            finishCompletion();
        }
        return true;
    }

关于Future的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。

上一篇:Java——线程池(二)


下一篇:Salesforce入门教程(中文)-002