并发编程-FutureTask&CompletableFuture
今天会聊到【Future/callable】并且分析他们的原理,同时也会聊到【CompletableFuture】的使用和原理,在这一章中,我们聊并发就到此结束,下面我可能会去关注一些中间件,因为这些在分布式系统中起到了很重要的作用。
Future/callable使用
它是在原来线程的基础上,提供了一个带有返回值的线程。
public class FutureCallableExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CalculationCallable calculationCallable=new CalculationCallable(1,2); FutureTask<Integer> futureTask=new FutureTask(calculationCallable); System.out.println("开始--->"+new Date()); //FutureTask本质上还是一个线程 new Thread(futureTask).start(); //这里是一个阻塞方法 ,我们要拿到返回结果,肯定要等到call方法执行完成才能进行返回,所以会阻塞 Integer result= futureTask.get(); System.out.println(result); System.out.println("结束--->"+new Date()); } static class CalculationCallable implements Callable<Integer>{ int x; int y; public CalculationCallable(int x, int y) { this.x = x; this.y = y; } @Override public Integer call() throws Exception { System.out.println("begin call--->"+new Date()); TimeUnit.SECONDS.sleep(2); return x+y; } } }
如何实现&源码解析
把重写Callable中的call方法传递到FutureTask中,然后把FutureTask放在线程中启动->FutureTas本身就是一个实现了Runnable的类,所以才能被启动,而传递这个对象进去,肯定是要调用他的call方法,那这个时候就要有一个共享变量记录他的状态,从而可以判断时候执行完毕or not
get 方法去获取结果的时候->通过上面的状态判读时候执行完成or not 如果没有执行完成则加入一个队列中进行等待,因为可能有多个线程来请求get,
整体流程:
- 当执行run的时候执行call方法,把执行结果放在一个全局变量中,在set结果的时候唤醒被阻塞的线程,上下文切换的时候线程阻塞到了get方法中,这个时候进入get方法的下一次自旋中,然后获得结果从而返回结果
- 当执行get方法的时候判断当前状态时候是执行完毕状态,是的话则直接返回数据,不是,则把当前节点加入到队列中进行阻塞
run()
public void run() { //这里就是进行状态的判断 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行传递进来的Callable中的call方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }进行线程的唤醒
//这里进行唤醒操作 private void finishCompletion() { // assert state > COMPLETING; //遍历整个链表 for (WaitNode q; (q = waiters) != null;) { //改成null,->不断的把节点释放掉 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //进行释放 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }get()
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //需要进行等待 s = awaitDone(false, 0L); return report(s); }进行线程的阻塞
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //自旋,因为可能有多个线程来调用get方法,cas的时候可能失败,所以自旋重试 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //这里是已经执行完成就直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //把当前节点设置到waiters(一个队列中) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞,在set方法完成结果的时候就会去唤醒 LockSupport.park(this); } }
CompletableFuture
我们发现FutureTask在获取结果的时候会阻塞 ,我们是否可以让一个线程执行完成结果后主动通知我们呢(异步回调),是的,它就是做这个事情的,异步回调你的方法,不进行阻塞的获得结果,完成你获得结果后想做的事情。这个类图可以看出CompletableFuture融合了两者的实现,既可以通过future等待执行结果,又可以使用completionStage去增强异步回调的功能。
构建一个CompletableFuture
- supplyAsync(Supplier<U> supplier):没有返回值
- runAsync(Runnable runnable,Executor executor) :异步执行一个任务,并且可以自定义线程池,默认用【ForkJoinPool.commonPool】
//不带返回值(但是在调用get方法的时候也会阻塞) CompletableFuture.runAsync(() -> System.out.println("异步执行一个任务:" + Thread.currentThread().getName())).get(); //带返回值(但是在调用get方法的时候也会阻塞) // System.out.println(CompletableFuture.supplyAsync(() -> "Glen is the handsomest person").get()); //使用链式调用 CompletableFuture.supplyAsync(() -> "Glen is the handsomest person").thenAccept(System.out::println);=CompletionStage的api
【纯消费类型】:只消费上个的执行结果,不返回新的数据 【一定包含了accept关键字】
这里只演示几个用法大致相同,除了包含either的方法,他是只要前面任何一个执行完成,就完成。
public class AcceptExample { public static void main(String[] args) { CompletableFuture.supplyAsync(()->"Then accept message").thenAcceptAsync(System.out::println); CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Then accept both"); CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> "message"); //这里指的是两个任务都执行结束,可以在函数中获取两个函数的结果进行操作 stringCompletableFuture.thenAcceptBoth(stringCompletableFuture1,(x1,x2)-> System.out.println(x1+"-->"+x2)); } }【有返回值的方法】: 即消费上个任务,也返回新的数据【包含apply】
public class ApplyExample { public static void main(String[] args) throws ExecutionException, InterruptedException { //还是获取上一个的执行结果,在下一个链中操作,然后返回 System.out.println(CompletableFuture.supplyAsync(() -> "apply").thenApply((result) -> result + "->Glen").get()); //这里你可以增加一个CompletableFuture,和上面相似 System.out.println(CompletableFuture.supplyAsync(() -> "combine") .thenCombine(CompletableFuture.supplyAsync(() -> "Glen"), (x1, x2) -> x1 + ":" + x2).get()); } }【不消费也不返回】: 上个任务的结果我不需要,我只是想等你结束后,然后我执行,我的结果业务也不需要,我也不用返回
public class RunExample { public static void main(String[] args) { //注意:这里不消费也不返回,并且接受不到之前的结果 CompletableFuture.supplyAsync(()->"Both").runAfterBoth(CompletableFuture.supplyAsync(()->"message"),()->{ System.out.println("there isn‘t previous result"); }); } }【组合类型】:组合多个类型任务,比如多个任务执行完成了,后面的任务才能执行 ,这个和上面的那些差不多
【针对异常处理】
public class ExceptionExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture completableFuture= CompletableFuture.supplyAsync(()->{ throw new RuntimeException("occur exception"); });/*.runAfterBoth(CompletableFuture.supplyAsync(()->"message"),()->{ System.out.println("I have done all"); });*/ //这个时候获取就会抛出异常 /*System.out.println(completableFuture.get());*/ completableFuture.whenComplete((rs,exception)->{ if (exception!=null){ System.out.println("前置任务正常异常"); }else { System.out.println("前置任务正常执行!!"+rs); } }); System.out.println(completableFuture.handleAsync((result, exception) -> exception != null ? "任务异常" : result).get()); } }
应用在实际中(模拟一个查询商品,但是现实中应该不会这样用,这里只是把上面的这些东西整合使用一下)
@Data @ToString public class Commodity { public Commodity(Integer id, String name, BigDecimal price) { this.id = id; this.name = name; this.price = price; } Integer id; String name; BigDecimal price; Integer repo; Integer buyerNum; List<String> remarks; }几个模拟获取数据的service
@Service public class CommodityService { List<Commodity> getRemarkById(){ return Arrays.asList( new Commodity(1,"电视",new BigDecimal(500)), new Commodity(2,"手机",new BigDecimal(1000)), new Commodity(3,"电脑",new BigDecimal(2000)), new Commodity(4,"台灯",new BigDecimal(50)), new Commodity(5,"水杯",new BigDecimal(56)) ); } } @Service public class RemarkService { List<String> getRemarkById(Integer goodId){ return Arrays.asList("好","非常好","还可以"); } } @Service public class RepoService { Integer getRepoById(Integer goodId){ return new Random().nextInt(1000); } }controller层,对上面的技术进行使用
/** * @author : lizi * @date : 2021-07-02 23:34 **/ @RestController public class GoodsController { @Autowired CommodityService commodityService; @Autowired RemarkService remarkService; @Autowired RepoService repoService; @GetMapping("/goods") public List<Commodity> goods() throws ExecutionException, InterruptedException { //获取物品列表 CompletableFuture<List<Commodity>> listCompletableFuture = CompletableFuture.supplyAsync(() -> commodityService.getRemarkById()); //这里面都是在异步执行 //thenApply 指的是获取上面的listCompletableFuture后再进行操作 CompletableFuture<List<Commodity>> listCompletableFuture1 = listCompletableFuture.thenApply(goods -> { goods.stream().map(goods1 -> CompletableFuture.supplyAsync(() -> { goods1.setRepo(repoService.getRepoById(goods1.getId())); return goods1; }).thenCompose(goods2 -> CompletableFuture.supplyAsync(() -> { goods2.setRemarks(remarkService.getRemarkById(goods2.getId())); return goods2; }))).toArray(); return goods; }); /* for (;;){ System.out.println("这里可以写我们的逻辑,但是上面的东西是交给一个线程去跑的,所以就可以直接执行向下执行,这样从某种意义上讲反应就快了"); }*/ //在这里get的时候进行阻塞 return (List<Commodity>) listCompletableFuture1.handleAsync((good, exception) -> exception != null ? "系统繁忙" : good).get(); } }
原理分析
他是基于cas的无锁并发栈(Completion是一个栈结构,存储的我们传递进去的任务,这里的任务指的是我们传递进CompletableFuture中方法的一系列操作)
我们看completion的的关系类图,他是使用【forkjoin】来作为底层的,具体如下:
比如你使用
【thenApply】:这种传递传递进来的任务会使用【UniCompletion】他下面的相关的类进行包装,
【CoCompletion】:针对两个任务的combination的实现,比如【applyToEither】的任务 so on
因为不同类型链式的结构执行的方式不一样,包装完成后,当【forkjoin】从上向下走的时候,他才能对不同的类型使用不同的执行策略,那是如何组装这些的呢?
举个例子(这就类似一个树的样子,我们每次点出一个新的方法,他就想当于树上的树枝,然后再次点出来的则数据树枝上的叶子,或者树枝)
/** * @author : lizi * @date : 2021-07-03 13:09 **/ public class PrincipleExample { public static void main(String[] args) { CompletableFuture<String> base_future = CompletableFuture.completedFuture("Base Future"); //这个时候构建的一定是一个UniCompletion对他进行包装(因为他是一个只有单个任务的 ) base_future.thenApply(r->"then apply:"+r); //这里的第一个thenAccept,就是入栈(Completion)到base_future中的第二个,然后他返回一个新的CompletableFuture //这里的第二个thenAccept则是入栈到新的CompletableFuture中。 base_future.thenAccept(r-> System.out.println(r)).thenAccept(none-> System.out.println("no result")); //这里的第一个thenApply则成为第三个入栈到到base_future中的,然后同理返回新的,thenAccept则进去新的completableFuture中, // 也就是现在在base_future中有三个子节点由UniCompletion进行包装,而其中的两个都有一个新的子节点,这样就类似于一个树的样子 base_future.thenApply(r->"message").thenAccept(r-> System.out.println("where should i go ")); } }
当任务执行的时候,则是逐步出栈(等于一个个树枝进行捋,如果树枝下面有还有树枝,则继续)