《Java动手撸源码》手写实现Future设计模式

《Java动手撸源码》手写实现Future设计模式


文章目录


前言

最近经常遇见Future设计模式,比如在看Tomcat源码的HostConfig等XXXConfig类的deployDescriptors方法时;下面给出HostConfig的deployDescriptors源码:

 /**
     * Deploy XML context descriptors.
     *
     * @param configBase The config base
     * @param files      The XML descriptors which should be deployed
     */
    protected void deployDescriptors(File configBase, String[] files) {

        if (files == null)
            return;

        ExecutorService es = host.getStartStopExecutor();
        //Future设计模式
        List<Future<?>> results = new ArrayList<>();

        for (String file : files) {
            File contextXml = new File(configBase, file);

            if (file.toLowerCase(Locale.ENGLISH).endsWith(".xml")) {
                ContextName cn = new ContextName(file, true);

                if (isServiced(cn.getName()) || deploymentExists(cn.getName()))
                    continue;

                results.add(
                        es.submit(new DeployDescriptor(this, cn, contextXml)));
            }
        }

        for (Future<?> result : results) {
            try {
                result.get();
            } catch (Exception e) {
                log.error(sm.getString(
                        "hostConfig.deployDescriptor.threaded.error"), e);
            }
        }
    }

那么到底什么是Future设计模式呢?
假设有个任务需要执行较长的时间,通常需要等待任务执行结束或者出错才能返回结果,在此期间调用者只能陷入阻塞苦苦等待,对此,Future设计模式提供了一种凭据式的解决方案。在我们日常生活中,关于凭据的使用非常的多见,比如你去某西服手工作坊想定做一身合身的西服,西服的制作过程比较漫长,少则一个礼拜,多则一个月,你不可能一直在原地等待,一般来说作坊会给你一个凭据,此凭据就是Future,在接下来的任意日子里你可以凭这个凭据去拿西服。

本作者维护了一个仓库,名称叫Thread,打算在这个仓库里面手写实现Java多线程的一些经典技术,欢迎大家的star,本博文的代码已经上传到了该仓库,在com.thread.future包下。
链接: 仓库地址。欢迎大家的star,您的star是我继续下去的动力。


一、UML设计

《Java动手撸源码》手写实现Future设计模式
(1)Future提供了获取计算结果判断任务是否完成的两个接口,其中获取计算结果将会导致调用阻塞。
(2)FutureService主要用于提交任务,提交的任务有三种,一种可以返回值,一种不需要返回值,还有一种可以执行回调函数
(3)Task接口主要是提供给调用者实现计算逻辑用的,可以接受一个参数并返回最后的执行结果。
(4)FutureTask是Task的一个实现,除了实现Future的get和done方法之外,还增加了protected类型的finish方法,该方法主要用于接口任务被完成的通知。
(5)FutureServiceImpl类的主要作用是当提交任务时创建一个新的线程来受理该任务,进而达到任务异步执行的效果。
(6)CallBack接口的实现很简单,类似于jdk8中的Consumer函数式接口(接受一个参数,执行,不返回值)

二、代码实现

最后实现的代码,生成的UML类图如下所示:
《Java动手撸源码》手写实现Future设计模式
下面贴一下关键代码:

2.1 FutureService

代码如下(示例):

package com.thread.future;

public interface FutureService<IN, OUT> {

    //提交不需要返回值得任务,Future.get方法返回null
    Future<?> submit(Runnable runnable);

    // 提交需要返回值得任务
    Future<OUT> submit(Task<IN, OUT> task, IN in);

    // 提交需要返回值得任务
    Future<OUT> submit(Task<IN, OUT> taskM, IN in, CallBack<OUT> callBack);

    //使用静态方法创建一个FutureService的实现
    static <T, R> FutureService<T, R> newService() {
        return new FutureServiceImpl<>();
    }

}

2.3 FutureTask

代码如下(示例):

package com.thread.future;

public class FutureTask<T> implements Future<T> {

    //计算结果
    private T result;

    //任务是否完成
    private boolean isDone = false;

    //定义对象锁
    private final Object LOCK = new Object();


    //该方法会造成阻塞,直到任务结束
    @Override
    public T get() throws InterruptedException {
        synchronized (LOCK) {
            while (!isDone) {
                LOCK.wait();
            }
            return result;
        }
    }

    //任务执行结束,将结果赋值给FutureTask
    protected void finish(T result) {
        synchronized (LOCK) {
            if (this.isDone) return;
            this.result = result;
            this.isDone = true;
            LOCK.notifyAll();
        }
    }

    @Override
    public boolean done() {
        return isDone;
    }
}

2.2 FutureServiceImpl

代码如下(示例):

package com.thread.future;

import java.util.concurrent.atomic.AtomicInteger;

public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {

    private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
    private final AtomicInteger nextCounter = new AtomicInteger(0);

    private String getNextName() {
        return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
    }

    //没有返回值的提交方法,执行一个run单元即可。
    @Override
    public Future<?> submit(Runnable runnable) {
        //创建一个票据
        final FutureTask<Void> futureTask = new FutureTask<>();
        new Thread(() -> {
            // 执行逻辑单元
            runnable.run();
            // 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
            futureTask.finish(null);
        }).start();
        return futureTask;
    }

    //有返回值的提交方法,使用自定义的Task任务,执行get方法,并返回计算结果
    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN in) {
        //创建一个票据
        final FutureTask<OUT> futureTask = new FutureTask<>();
        new Thread(() -> {
            // 执行Task逻辑单元,输入一个值,进行计算,然后输出值
            OUT out = task.get(in);
            // 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
            futureTask.finish(out);
        }).start();
        return futureTask;
    }

    //含有回调函数的提交方法
    @Override
    public Future<OUT> submit(Task<IN, OUT> task, IN in, CallBack<OUT> callBack) {
        //创建一个票据
        final FutureTask<OUT> futureTask = new FutureTask<>();
        new Thread(() -> {
            // 执行Task逻辑单元,输入一个值,进行计算,然后输出值
            OUT out = task.get(in);
            // 执行结束之后,将结果返回给票据,因为是没有返回值的所以返回值为null
            futureTask.finish(out);
            // 执行结束之后,执行回调函数
            if (callBack != null) {
                callBack.call(out);
            }
        }).start();
        return futureTask;
    }

}

具体代码,去我的仓库看吧,我将在我的仓库里面不定时更新关于Java多线程的一些技术源码实现。比如线程池、ThreaLocal等。

三、代码测试

测试代码如下:

package com.thread.future;

public class FutureTest {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("下面测试三种不同的回调方法");
        System.out.println("首先第一种不需要返回值的:");
        FutureService<Integer, Void> futureService1 = new FutureServiceImpl<>();
        Future<?> future1 = futureService1.submit(() -> {
            System.out.println("西服做完了。。。。。");
        });
        System.out.println("不需要返回值的返回:" + future1.get());


        System.out.println("其次第二种需要返回值的:");
        FutureService<Integer, String> futureService2 = new FutureServiceImpl<>();
        Future<?> future2 = futureService2.submit(input -> {
            System.out.println("西服根据您的身高" + input + "做完了,将该消息通知给用户");
            return "西服做完了";
        }, 175);
        System.out.println("需要返回值的返回:" + future2.get());

        System.out.println("其次第三种基于时间回调的:");
        FutureService<Integer, String> futureService3 = new FutureServiceImpl<>();
        Future<?> future3 = futureService2.submit(input -> {
            System.out.println("西服根据您的身高" + input + "做完了");
            return "西服做完了";
        }, 175, input -> {
            System.out.println("西服做完了,并根据您填写的地址,给您送到了家里面");
        });
        System.out.println("回调函数的get方法就失去意义了,因为想做的事情都在回调函数里面执行了");
    }

}

输出结果如下:

下面测试三种不同的回调方法
首先第一种不需要返回值的:
西服做完了。。。。。
不需要返回值的返回:null
其次第二种需要返回值的:
西服根据您的身高175做完了,将该消息通知给用户
需要返回值的返回:西服做完了
其次第三种基于时间回调的:
西服根据您的身高175做完了
回调函数的get方法就失去意义了,因为想做的事情都在回调函数里面执行了
西服做完了,并根据您填写的地址,给您送到了家里面

总结

当某个任务执行需要较长时间的时候,当前线程盲目等待该任务结束,对应CPU资源是一种浪费,在等待的这段时间里面,完全可以去做其他的事情,所以就出现了Future设计模型。Future设计模型虽然在提交的时候不会堵塞,然后在之后获取结果的时候依然可能会造成阻塞,本文写了三种Future设计模型的应用,分别是带返回值的,不带返回值的以及回调函数的。

上一篇:Python并发编程之线程池与ThreadPoolExecutor探究


下一篇:dubbo之网络通讯