public interface CallBack { /* 为什么要写这个回调接口呢? *因为可能不止主调A需要用到被调的处理过程,如果很多地方需要用到被调程序 * 那么传入被调的方法就不可能只传主调A类,所以要定义一个接口, * 传入被调的处理方法的参数就是这个接口对象 * */ public void solve(String result); }主调程序:
public class CallbackRequest implements Callback{ private CallbackResponse callbackResponse; public CallbackRequest(CallbackResponse callbackResponse) { this.callbackResponse = callbackResponse; } //主调需要解决一个问题,所以他把问题交给被调处理,被调单独创建一个线程,不影响主调程序的运行 public void request(final String question){ System.out.println("主调程序问了一个问题"); new Thread(()->{ //B想要帮A处理东西,就必须知道谁让自己处理的,所以要传入a,也要知道a想处理什么,所以要传入question callbackResponse.handler(this, question); }).start(); //A把要处理的事情交给b之后,就可以自己去玩耍了,或者去处理其他事情 afterAsk(); } private void afterAsk(){ System.out.println("主调程序继续处理其他事情"); } @Override public void solve(String result) { System.out.println("被调程序接到答案后进行处理" + result); } }
被调程序:
public class CallbackResponse { public void handler(Callback callback, String request) { System.out.println(callback.getClass()+"问的问题是:"+ request); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } String result="\n答案是2"; callback.solve(result); } }测试:
public class CallbackTest { public static void main(String[] args) { CallbackResponse callbackResponse = new CallbackResponse(); CallbackRequest callbackRequest = new CallbackRequest(callbackResponse); callbackRequest.request("1+1"); } }输出: 主调程序问了一个问题 主调程序继续处理其他事情 class javapratice.CallbackRequest问的问题是:1+1 被调程序接到答案后进行处理 答案是2
3、异步回调 异步回调的实现依赖于多线程或者多进程。软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用、回调和异步调用。同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,它是一种单向调用;回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口;异步调用是一种类似消息或事件的机制,不过它的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口)。回调和异步调用的关系非常紧密,通常我们使用回调来实现异步消息的注册,通过异步调用来实现消息的通知。 3.1、多线程中的“回调” (JDK8之前) Java多线程中可以通过callable和future或futuretask结合来获取线程执行后的返回值。实现方法是通过get方法来调用callable的call方法获取返回值。其实这种方法本质上不是回调,回调要求的是任务完成以后被调用者主动回调调用者的接口,而这里是调用者主动使用get方法阻塞获取返回值。一般情况下,我们会结合Callable和Future一起使用,通过ExecutorService的submit方法执行Callable,并返回Future。
//多线程中的“回调” public class CallBackMultiThread { //这里简单地使用future和callable实现了线程执行完后 public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Future<String> future = executor.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("call"); TimeUnit.SECONDS.sleep(1); return "str"; } }); //手动阻塞调用get通过call方法获得返回值。 System.out.println(future.get()); //需要手动关闭,不然线程池的线程会继续执行。 executor.shutdown(); //使用futuretask同时作为线程执行单元和数据请求单元。 FutureTask<Integer> futureTask = new FutureTask(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("dasds"); return new Random().nextInt(); } }); new Thread(futureTask).start(); //阻塞获取返回值 System.out.println(futureTask.get()); } }
注:比起future.get(),其实更推荐使用get (long timeout, TimeUnit unit)方法,设置了超时时间可以防止程序无限制的等待future的结果。
3.2、Java8中新增的CompletableFuture CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。 Future vs CompletableFuture Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。 但是Future的主要缺点如下:- 不支持手动完成:这个意思指的是,我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
- 不支持进一步的非阻塞调用:这个指的是我们通过Future的get方法会一直阻塞到任务完成,但是我还想在获取任务之后,执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能。
- 不支持链式调用:这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
- 不支持多个Future合并:比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
- 不支持异常处理:Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture<String>(); Runnable runnable=new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+" 执行....."); completableFuture.complete("success");//在子线程中完成主线程completableFuture的完成 } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread t1=new Thread(runnable); t1.start();//启动子线程 String result=completableFuture.get();//主线程阻塞,等待完成 System.out.println(Thread.currentThread().getName()+" 1x: "+result); } } 输出结果: Thread-0 执行..... main 1x: success
2、运行一个简单的没有返回值的异步任务
public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<Void> future=CompletableFuture.runAsync(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"正在执行一个没有返回值的异步任务。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }); future.get(); System.out.println(Thread.currentThread().getName()+" 结束。"); } } 输出如下: ForkJoinPool.commonPool-worker-1正在执行一个没有返回值的异步任务。 main 结束。
从上面代码我们可以看到CompletableFuture默认运行使用的是ForkJoin的的线程池。当然,你也可以用lambda表达式使得代码更精简。
3,运行一个有返回值的异步任务public class TestCompletableFuture { public static void main(String[] args) throws Exception{ CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get() { try { System.out.println(Thread.currentThread().getName()+"正在执行一个有返回值的异步任务。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "OK"; } }); String result=future.get(); System.out.println(Thread.currentThread().getName()+" 结果:"+result); } } 输出结果: ForkJoinPool.commonPool-worker-1正在执行一个有返回值的异步任务。 main 结果:OK
当然,上面默认的都是ForkJoinPool我们也可以换成Executor相关的Pool,其api都有支持如下:
static CompletableFuture<Void> runAsync(Runnable runnable) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2)高级使用CompletableFuture 上面提到的几种使用方法是使用异步编程最简单的步骤,CompletableFuture.get()的方法会阻塞直到任务完成,这其实还是同步的概念,这对于一个异步系统是不够的,因为真正的异步是需要支持回调函数,这样以来,我们就可以直接在某个任务干完之后,接着执行回调里面的函数,从而做到真正的异步概念。在CompletableFuture里面,通过thenApply(), thenAccept(),thenRun()方法,来运行一个回调函数。 1、thenApply() 这个方法,其实用过函数式编程的人非常容易理解,类似于scala和spark的map算子,通过这个方法可以进行多次链式转化并返回最终的加工结果。 看下面一个例子:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println("线程" + Thread.currentThread().getName() + " supplyAsync"); return "123"; } }); CompletableFuture<Integer> result1 = task.thenApply(number->{ System.out.println("线程" + Thread.currentThread().getName() + " thenApply1 "); return Integer.parseInt(number); }); CompletableFuture<Integer> result2 = result1.thenApply(number->{ System.out.println("线程" + Thread.currentThread().getName() + " thenApply2 "); return number*2; }); System.out.println("线程" + Thread.currentThread().getName()+" => "+result2.get()); } public static void main(String[] args) throws Exception{ asyncCallback(); } } 输出结果: 线程ForkJoinPool.commonPool-worker-1 supplyAsync 线程main thenApply1 线程main thenApply2 线程main => 246
2、thenAccept()
这个方法,可以接受Futrue的一个返回值,但是本身不在返回任何值,适合用于多个callback函数的最后一步操作使用。例子如下:public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println(Thread.currentThread().getName()+" supplyAsync"); return "123"; } }); CompletableFuture<Integer> chain1 = task.thenApply(number->{ System.out.println(Thread.currentThread().getName()+" thenApply1"); return Integer.parseInt(number); }); CompletableFuture<Integer> chain2 = chain1.thenApply(number->{ System.out.println(Thread.currentThread().getName()+" thenApply2"); return number*2; }); CompletableFuture<Void> result=chain2.thenAccept(product->{ System.out.println(Thread.currentThread().getName()+" thenAccept="+product); }); result.get(); System.out.println(Thread.currentThread().getName()+" end"); } public static void main(String[] args) throws Exception { asyncCallback(); } } 结果如下: ForkJoinPool.commonPool-worker-1 supplyAsync main thenApply1 main thenApply2 main thenAccept=246 main end
3、thenRun() 这个方法与上一个方法类似,一般也用于回调函数最后的执行,但这个方法不接受回调函数的返回值,纯粹就代表执行任务的最后一个步骤:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync: 一阶段任务"); return null; }).thenRun(()->{ System.out.println(Thread.currentThread().getName()+"thenRun: 收尾任务"); }).get(); } public static void main(String[] args) throws Exception { asyncCallback(); } } 结果: ForkJoinPool.commonPool-worker-1supplyAsync: 一阶段任务 mainthenRun: 收尾任务
这里注意,截止到目前,前面的例子代码只会涉及两个线程,一个是主线程一个是ForkJoinPool池的线程,但其实上面的每一步都是支持异步运行的,其api如下:
// thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
我们看下改造后的一个例子:
public class TestCompletableFuture { public static void asyncCallback() throws ExecutionException, InterruptedException { CompletableFuture<String> ref1= CompletableFuture.supplyAsync(()->{ try { System.out.println(Thread.currentThread().getName() + " supplyAsync开始执行任务1.... "); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " supplyAsync: 任务1"); return null; }); CompletableFuture<String> ref2= CompletableFuture.supplyAsync(()->{ try { } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " thenApplyAsync: 任务2"); return null; }); CompletableFuture<String> ref3=ref2.thenApplyAsync(value->{ System.out.println(Thread.currentThread().getName() +" thenApplyAsync: 任务2的子任务"); return " finish"; }); Thread.sleep(4000); System.out.println(Thread.currentThread().getName() + ref3.get()); } public static void main(String[] args) throws Exception { asyncCallback(); } } 输出结果如下: ForkJoinPool.commonPool-worker-1 supplyAsync开始执行任务1.... ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任务2 ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任务2的子任务 ForkJoinPool.commonPool-worker-1 supplyAsync: 任务1 main finish
我们可以看到,ForkJoin池的线程1,执行了前面的三个任务,但是第二个任务的子任务,因为我们了使用也异步提交所以它用的线程是ForkJoin池的线程2,最终由于main线程处执行了get是最后结束的。
还有一点需要注意: ForkJoinPool所有的工作线程都是守护模式的,也就是说如果主线程退出,那么整个处理任务都会结束,而不管你当前的任务是否执行完。如果需要主线程等待结束,可采用ExecutorsThreadPool,如下:ExecutorService pool = Executors.newFixedThreadPool(5); final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { ... }, pool);
4、thenCompose():合并两个有依赖关系的CompletableFutures的执行结果
CompletableFutures在执行两个依赖的任务合并时,会返回一个嵌套的结果列表,为了避免这种情况我们可以使用thenCompose来返回,直接获取最顶层的结果数据即可:public class TestCompletableFuture { public static void asyncCompose() throws ExecutionException, InterruptedException { CompletableFuture<String> future1=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "1"; } }); CompletableFuture<String>nestedResult = future1.thenCompose(value-> CompletableFuture.supplyAsync(()->{ return value+"2"; })); System.out.println(nestedResult.get()); } public static void main(String[] args) throws Exception { asyncCompose(); } } 输出结果:12
5、thenCombine:组合两个没有依赖关系的CompletableFutures任务
public class TestCompletableFuture { public static void asyncCombine() throws ExecutionException, InterruptedException { CompletableFuture<Double> d1= CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return 1d; } }); CompletableFuture<Double> d2= CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return 2d; } }); CompletableFuture<Double> result= d1.thenCombine(d2,(number1,number2)->{ return number1+number2; }); System.out.println(result.get()); } public static void main(String[] args) throws Exception { asyncCombine(); } } 输出结果:3d
6、合并多个任务的结果allOf与anyOf
上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。allOf适用于,你有一系列独立的future任务,你想等其所有的任务执行完后做一些事情。举个例子,比如我想下载100个网页,传统的串行,性能肯定不行,这里我们采用异步模式,同时对100个网页进行下载,当所有的任务下载完成之后,我们想判断每个网页是否包含某个关键词。 下面我们通过随机数来模拟上面的这个场景如下:public class TestCompletableFuture { public static void mutilTaskTest() throws ExecutionException, InterruptedException { //添加n个任务 CompletableFuture<Double> array[]=new CompletableFuture[3]; for ( int i = 0; i < 3; i++) { array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return Math.random(); } }); } //获取结果的方式一 // CompletableFuture.allOf(array).get(); // for(CompletableFuture<Double> cf:array){ // if(cf.get()>0.6){ // System.out.println(cf.get()); // } // } //获取结果的方式二,过滤大于指定数字,在收集输出 List<Double> rs= Stream.of(array).map(CompletableFuture::join).filter(number->number>0.6).collect(Collectors.toList()); System.out.println(rs); } public static void main(String[] args) throws Exception { mutilTaskTest(); } } 结果如下(结果可能不一致): [0.85538057702618, 0.7692532053269862, 0.6441387373310598]
注意其中的join方法和get方法类似,仅仅在于在Future不能正常完成的时候抛出一个unchecked的exception,这可以确保它用在Stream的map方法中,直接使用get是没法在map里面运行的。
anyOf方法,也比较简单,意思就是只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束。public class TestCompletableFuture { public static void mutilTaskTest() throws ExecutionException, InterruptedException { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 4 seconds"; } }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 2 seconds"; } }); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "wait 10 seconds"; } }); CompletableFuture<Object> result = CompletableFuture.anyOf(f1, f2, f3); System.out.println(result.get()); } public static void main(String[] args) throws Exception { mutilTaskTest(); } } 输出结果: wait 2 seconds
注意由于Anyof返回的是其中任意一个Future所以这里没有明确的返回类型,统一使用Object接受,留给使用端处理。
7、exceptionally异常处理 异常处理是异步计算的一个重要环节,下面看看如何在CompletableFuture中使用:public class TestCompletableFuture { public static void exceptionProcess() throws ExecutionException, InterruptedException { int age=-1; CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get(){ if(age<0){ throw new IllegalArgumentException("性别必须大于0"); } if(age<18){ return "未成年人"; } return "成年人"; } }).exceptionally(ex->{ System.out.println(ex.getMessage()); return "发生 异常"+ex.getMessage(); }); System.out.println(task.get()); } public static void main(String[] args) throws Exception { exceptionProcess(); } } 结果如下: java.lang.IllegalArgumentException: 性别必须大于0 发生 异常java.lang.IllegalArgumentException: 性别必须大于0
此外还有另外一种异常捕捉方法handle,无论发生异常都会执行,示例如下:
public class TestCompletableFuture { public static void exceptionProcess() throws ExecutionException, InterruptedException { int age = -10; CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){ @Override public String get(){ if(age<0){ throw new IllegalArgumentException("性别必须大于0"); } if(age<18){ return "未成年人"; } return "成年人"; } }).handle((res,ex)->{ System.out.println("执行handle"); if(ex!=null){ System.out.println("发生异常"); return "发生 异常"+ex.getMessage(); } return res; }); System.out.println(task.get()); } public static void main(String[] args) throws Exception { exceptionProcess(); } } 输出结果: 执行handle 发生异常 发生 异常java.lang.IllegalArgumentException: 性别必须大于0
注意上面的方法如果正常执行,也会执行handle方法。
3.3、JDK9 CompletableFuture 类增强的主要内容 (1)支持对异步方法的超时调用- orTimeout()
- completeOnTimeout()
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)