《Java动手撸源码》手写实现Future设计模式
文章目录
前言
最近经常遇见Future设计模式,比如在看Tomcat源码的HostConfig等XXXConfig类的deployDescriptors方法时;下面给出HostConfig的deployDescriptors源码:
/**
* Deploy XML context descriptors.
*
* @param configBase The config base
* @param files The XML descriptors which should be deployed
*/
protected void deployDescriptors(File configBase, String[] files) {
if (files == null)
return;
ExecutorService es = host.getStartStopExecutor();
//Future设计模式
List<Future<?>> results = new ArrayList<>();
for (String file : files) {
File contextXml = new File(configBase, file);
if (file.toLowerCase(Locale.ENGLISH).endsWith(".xml")) {
ContextName cn = new ContextName(file, true);
if (isServiced(cn.getName()) || deploymentExists(cn.getName()))
continue;
results.add(
es.submit(new DeployDescriptor(this, cn, contextXml)));
}
}
for (Future<?> result : results) {
try {
result.get();
} catch (Exception e) {
log.error(sm.getString(
"hostConfig.deployDescriptor.threaded.error"), e);
}
}
}
那么到底什么是Future设计模式呢?
假设有个任务需要执行较长的时间,通常需要等待任务执行结束或者出错才能返回结果,在此期间调用者只能陷入阻塞苦苦等待,对此,Future设计模式提供了一种凭据式的解决方案。在我们日常生活中,关于凭据的使用非常的多见,比如你去某西服手工作坊想定做一身合身的西服,西服的制作过程比较漫长,少则一个礼拜,多则一个月,你不可能一直在原地等待,一般来说作坊会给你一个凭据,此凭据就是Future,在接下来的任意日子里你可以凭这个凭据去拿西服。
本作者维护了一个仓库,名称叫Thread,打算在这个仓库里面手写实现Java多线程的一些经典技术,欢迎大家的star,本博文的代码已经上传到了该仓库,在com.thread.future包下。
链接: 仓库地址。欢迎大家的star,您的star是我继续下去的动力。
一、UML设计
(1)Future提供了获取计算结果和判断任务是否完成的两个接口,其中获取计算结果将会导致调用阻塞。
(2)FutureService主要用于提交任务,提交的任务有三种,一种可以返回值,一种不需要返回值,还有一种可以执行回调函数
(3)Task接口主要是提供给调用者实现计算逻辑用的,可以接受一个参数并返回最后的执行结果。
(4)FutureTask是Task的一个实现,除了实现Future的get和done方法之外,还增加了protected类型的finish方法,该方法主要用于接口任务被完成的通知。
(5)FutureServiceImpl类的主要作用是当提交任务时创建一个新的线程来受理该任务,进而达到任务异步执行的效果。
(6)CallBack接口的实现很简单,类似于jdk8中的Consumer函数式接口(接受一个参数,执行,不返回值)
二、代码实现
最后实现的代码,生成的UML类图如下所示:
下面贴一下关键代码:
2.1 FutureService
代码如下(示例):
package com.thread.future;
public interface FutureService<IN, OUT> {
//提交不需要返回值得任务,Future.get方法返回null
Future<?> submit(Runnable runnable);
// 提交需要返回值得任务
Future<OUT> submit(Task<IN, OUT> task, IN in);
// 提交需要返回值得任务
Future<OUT> submit(Task<IN, OUT> taskM, IN in, CallBack<OUT> callBack);
//使用静态方法创建一个FutureService的实现
static <T, R> FutureService<T, R> newService() {
return new FutureServiceImpl<>();
}
}
2.3 FutureTask
代码如下(示例):
package com.thread.future;
public class FutureTask<T> implements Future<T> {
//计算结果
private T result;
//任务是否完成
private boolean isDone = false;
//定义对象锁
private final Object LOCK = new Object();
//该方法会造成阻塞,直到任务结束
@Override
public T get() throws InterruptedException {
synchronized (LOCK) {
while (!isDone) {
LOCK.wait();
}
return result;
}
}
//任务执行结束,将结果赋值给FutureTask
protected void finish(T result) {
synchronized (LOCK) {
if (this.isDone) return;
this.result = result;
this.isDone = true;
LOCK.notifyAll();
}
}
@Override
public boolean done() {
return isDone;
}
}
2.2 FutureServiceImpl
代码如下(示例):
package com.thread.future;
import java.util.concurrent.atomic.AtomicInteger;
public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {
private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
private final AtomicInteger nextCounter = new AtomicInteger(0);
private String getNextName() {
return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
}
//没有返回值的提交方法,执行一个run单元即可。
@Override
public Future<?> submit(Runnable runnable) {
//创建一个票据
final FutureTask<Void> futureTask = new FutureTask<>();
new Thread(() -> {
// 执行逻辑单元
runnable.run();
// 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
futureTask.finish(null);
}).start();
return futureTask;
}
//有返回值的提交方法,使用自定义的Task任务,执行get方法,并返回计算结果
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN in) {
//创建一个票据
final FutureTask<OUT> futureTask = new FutureTask<>();
new Thread(() -> {
// 执行Task逻辑单元,输入一个值,进行计算,然后输出值
OUT out = task.get(in);
// 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
futureTask.finish(out);
}).start();
return futureTask;
}
//含有回调函数的提交方法
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN in, CallBack<OUT> callBack) {
//创建一个票据
final FutureTask<OUT> futureTask = new FutureTask<>();
new Thread(() -> {
// 执行Task逻辑单元,输入一个值,进行计算,然后输出值
OUT out = task.get(in);
// 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
futureTask.finish(out);
// 执行结束之后,执行回调函数
if (callBack != null) {
callBack.call(out);
}
}).start();
return futureTask;
}
}
具体代码,去我的仓库看吧,我将在我的仓库里面不定时更新关于Java多线程的一些技术源码实现。比如线程池、ThreaLocal等。
三、代码测试
测试代码如下:
package com.thread.future;
public class FutureTest {
public static void main(String[] args) throws InterruptedException {
System.out.println("下面测试三种不同的回调方法");
System.out.println("首先第一种不需要返回值的:");
FutureService<Integer, Void> futureService1 = new FutureServiceImpl<>();
Future<?> future1 = futureService1.submit(() -> {
System.out.println("西服做完了。。。。。");
});
System.out.println("不需要返回值的返回:" + future1.get());
System.out.println("其次第二种需要返回值的:");
FutureService<Integer, String> futureService2 = new FutureServiceImpl<>();
Future<?> future2 = futureService2.submit(input -> {
System.out.println("西服根据您的身高" + input + "做完了,将该消息通知给用户");
return "西服做完了";
}, 175);
System.out.println("需要返回值的返回:" + future2.get());
System.out.println("其次第三种基于时间回调的:");
FutureService<Integer, String> futureService3 = new FutureServiceImpl<>();
Future<?> future3 = futureService2.submit(input -> {
System.out.println("西服根据您的身高" + input + "做完了");
return "西服做完了";
}, 175, input -> {
System.out.println("西服做完了,并根据您填写的地址,给您送到了家里面");
});
System.out.println("回调函数的get方法就失去意义了,因为想做的事情都在回调函数里面执行了");
}
}
输出结果如下:
下面测试三种不同的回调方法
首先第一种不需要返回值的:
西服做完了。。。。。
不需要返回值的返回:null
其次第二种需要返回值的:
西服根据您的身高175做完了,将该消息通知给用户
需要返回值的返回:西服做完了
其次第三种基于时间回调的:
西服根据您的身高175做完了
回调函数的get方法就失去意义了,因为想做的事情都在回调函数里面执行了
西服做完了,并根据您填写的地址,给您送到了家里面
总结
当某个任务执行需要较长时间的时候,当前线程盲目等待该任务结束,对应CPU资源是一种浪费,在等待的这段时间里面,完全可以去做其他的事情,所以就出现了Future设计模型。Future设计模型虽然在提交的时候不会堵塞,然后在之后获取结果的时候依然可能会造成阻塞,本文写了三种Future设计模型的应用,分别是带返回值的,不带返回值的以及回调函数的。