在如下笔记中提到,无论是join还是FutureTask都会阻塞主线程,无法实现真正的异步处理
https://www.cnblogs.com/qq931399960/p/15555152.html
Guava可提供了一种异步回调方案,不会阻塞主线程,Guava中添加了几个相关接口
FutureCallback: 主要对异步任务结束后的一些处理,在异步任务执行结束后被调用,包括onSuccess和onFailure两个方法,前者在任务结束后调用,无论异步任务是否发生了异常,结束后都会调用onSuccess,接口名字中带有Future,但和jdk中Future没有关系。
ListeningExecutorService:guava自己的连接池,对jdk中ExecutorService的封装
ListenableFuture:只有一个addListener接口,作用为将FutureCallback和异步任务绑定,使异步任务完成之后可以触发FutureCallback,但一般不会直接使用addListener方法,可以通过ListeningExecutorService。submit方法获取该接口对应的实现类对象,通过Futures的addCallback方法,将ListenableFuture和FutureCallback以及异步任务进行绑定。
使用Guava实现https://www.cnblogs.com/qq931399960/p/15555152.html中业务处理
private final static Logger logger = LoggerFactory.getLogger(OrderMealPlatformGuava.class); static Boolean courierArrive = null; // 默认null表示当前线程还未执行结束。true/false为最终执行结果 static Boolean merchantReadyMeal = null; public static void main(String[] args) { ThreadPoolExecutor pool = null; try { pool = new ThreadPoolExecutor(2, 2, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4)); ListeningExecutorService gpool = MoreExecutors.listeningDecorator(pool); ListenableFuture<Boolean> merchantFuture = gpool.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try { logger.info("起锅烧油"); logger.info("炒菜"); // 5s炒菜时间 // int i =1/0; // 厨师请假 Thread.sleep(5000); logger.info("盛饭"); logger.info("打包"); } catch (Exception e) { logger.error("", e); return false; } return true; } }); ListenableFuture<Boolean> courierFuture = gpool.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try { logger.info("抢单"); logger.info("规划路线"); // 3s赶路时间 // int i=1/0; // 车子被偷 Thread.sleep(3000); logger.info("赶路"); logger.info("到店"); } catch (Exception e) { logger.error("", e); return false; } return true; } }); /* * 当异步任务失败,该方法中onSuccess和onFailure都会被执行,前者指的是异步任务结束了,并不是指异步任务执行成功, * 异步任务的执行成功与否,需要根据onSuccess的参数result来确定 */ Futures.addCallback(merchantFuture, new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { // 异步任务执行完成无论结果如何都会进入到该方法 // 走到这里,结束可能为true也可能为false,不能直接认为merchantReadyMeal=true // result为异步任务的执行结果 merchantReadyMeal = result; if (courierArrive != null) { // 快递员的流程已经结束 sendMeal(merchantReadyMeal, courierArrive); } } @Override public void onFailure(Throwable t) { // t为异步任务抛出的异常 logger.error("", t); } }, /* 使用公共的pool,也可以使用的新的pool */pool); Futures.addCallback(courierFuture, new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { courierArrive = result; // 处理成功时被调用 if (merchantReadyMeal != null) { // 商家的流程已经结束 sendMeal(merchantReadyMeal, courierArrive); } } @Override public void onFailure(Throwable t) { logger.error("", t); } }, pool); logger.info("继续发布订单消息"); } catch (Exception e) { logger.error("", e); } finally { while (true) { // getActiveCount返回的为正在执行任务的线程的数据,如果线程都未执行任务,则会返回0 if (pool != null && pool.getActiveCount() == 0) { // 对于本例中,需要所有线程执行完,才能结束线程池,并且也不会再有新任务添加,因此可以这么关闭 pool.shutdown(); break; } LockSupport.parkNanos(1000 * 1000 * 1000 * 1); } // 如果按照如下关闭线程池,则异步线程,则可能会出现异常 // shutdownThreadPoolGracefully(pool); } } private static void sendMeal(boolean merchantResult, boolean courierResult) { if (merchantResult && courierResult) { logger.info("快递员开始送餐 。。。"); } else if (merchantResult && !courierResult) { logger.error("外卖员车子被偷了,不能够送餐"); } else if (!merchantResult && courierResult) { logger.error("商家厨师家里临时有事,请假了,做不了饭"); } else { logger.error("外卖员车子被偷,商家厨师请假了 。。。 "); } }
执行结果
18:20:46.719 [pool-1-thread-1] INFO com.demo.order.OrderMealPlatformGuava - 起锅烧油 18:20:46.723 [main] INFO com.demo.order.OrderMealPlatformGuava - 继续发布订单消息 18:20:46.723 [pool-1-thread-1] INFO com.demo.order.OrderMealPlatformGuava - 炒菜 18:20:46.719 [pool-1-thread-2] INFO com.demo.order.OrderMealPlatformGuava - 抢单 18:20:46.723 [pool-1-thread-2] INFO com.demo.order.OrderMealPlatformGuava - 规划路线 18:20:49.723 [pool-1-thread-2] INFO com.demo.order.OrderMealPlatformGuava - 赶路 18:20:49.723 [pool-1-thread-2] INFO com.demo.order.OrderMealPlatformGuava - 到店 18:20:51.723 [pool-1-thread-1] INFO com.demo.order.OrderMealPlatformGuava - 盛饭 18:20:51.723 [pool-1-thread-1] INFO com.demo.order.OrderMealPlatformGuava - 打包 18:20:51.723 [pool-1-thread-1] INFO com.demo.order.OrderMealPlatformGuava - 快递员开始送餐 。。。
从运行结果看,主线程在发布了订单消息之后,就继续再发布其他的订单,不会因为前面的订单没有完成而阻塞