使用ThreadPoolExecutor 创建线程池,完成并行操作

日常工作中有很多效率极低的操作,往往可以改串行为并行,执行效率通常能提高数倍。废话不多说,先上代码。

1、用到的guava坐标

使用ThreadPoolExecutor 创建线程池,完成并行操作
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
View Code

2、创建一个枚举保证线程池是单例

使用ThreadPoolExecutor 创建线程池,完成并行操作
package com.hao.service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Holder for the single, application-wide {@link ThreadPoolExecutor}.
 *
 * <p>The pool is created once, when this enum is initialised, and is sized
 * from the processor count reported by the JVM at that moment:
 * <ul>
 *   <li>core pool size: {@code processors * 50}</li>
 *   <li>maximum pool size: {@code processors * 80}</li>
 *   <li>keep-alive for threads above the core size: 0 ms</li>
 *   <li>work queue: a bounded {@link LinkedBlockingQueue} holding up to
 *       {@code processors * 2000} tasks</li>
 * </ul>
 * No rejection handler is supplied, so the executor's default policy applies
 * once both the queue and the pool are full. Worker threads are named
 * {@code ExecutorManager-pool-Thread-N}.
 */
public enum ExecutorManager {

    INSTANCE;

    /** Processor count reported by the JVM when this enum is initialised. */
    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    /** The shared pool; see the type-level documentation for its sizing. */
    public static final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(AVAILABLE_PROCESSORS * 50, AVAILABLE_PROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(AVAILABLE_PROCESSORS * 2000),
            new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());

}
View Code

3、创建一个方法类

使用ThreadPoolExecutor 创建线程池,完成并行操作
package com.hao.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.google.common.base.Preconditions;

/**
 * Spring service that runs a batch of {@link Callable}s in parallel on the
 * shared pool from {@link ExecutorManager} and waits, with a timeout, for all
 * of them to finish.
 *
 * <p>Every method returns the results in the same order as the supplied
 * callables. Two situations are reported only through the returned list, not
 * through an exception:
 * <ul>
 *   <li>If the timeout elapses before every task has finished, an
 *       <em>empty</em> list is returned.</li>
 *   <li>If an individual task throws, its slot in the list is {@code null};
 *       the exception itself is not rethrown to the caller.</li>
 * </ul>
 */
@Service
public class ExecutorContext {

    /** Timeout, in seconds, used by the varargs overload of {@code waitAllCallables}. */
    private static final int DEFAULT_WAIT_SECONDS = 2;

    /** Executor the tasks are submitted to; assigned in {@link #init()}. */
    public ExecutorService executorService;

    /** Binds this service to the shared pool once the bean has been constructed. */
    @PostConstruct
    public void init() {
        executorService = ExecutorManager.threadPoolExecutor;
    }

    /**
     * Runs all callables in parallel and waits up to {@code milliseconds} for
     * them to finish. On timeout, every task that is not yet done is cancelled
     * with interruption.
     *
     * @param calls        the tasks to run; must be non-null and non-empty
     * @param milliseconds maximum time to wait, in milliseconds
     * @param <T>          result type of the tasks
     * @return the results in submission order, or an empty list on timeout
     * @throws IllegalArgumentException if {@code calls} is null or empty
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     * @throws Exception                declared for source compatibility with existing callers
     */
    public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latched = wrapCallables(calls);

        // wrapCallables never produces a null wrapper, so every entry is submitted.
        List<Future<T>> futures = new ArrayList<>(latched.wrappedCallables.size());
        for (CountdownedCallable<T> callable : latched.wrappedCallables) {
            futures.add(executorService.submit(callable));
        }

        if (latched.latch.await(milliseconds, TimeUnit.MILLISECONDS)) {
            return collectResults(latched);
        }

        // Timed out: stop whatever is still queued or running.
        for (Future<T> future : futures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }
        return new ArrayList<>();
    }

    /**
     * Runs all callables in parallel and waits up to {@code seconds} for them
     * to finish. Unlike {@link #waitAllFutures(List, int)}, tasks that are
     * still running when the timeout elapses are left running.
     *
     * @param calls   the tasks to run; must be non-null and non-empty
     * @param seconds maximum time to wait, in <strong>seconds</strong>
     * @param <T>     result type of the tasks
     * @return the results in submission order, or an empty list on timeout
     * @throws IllegalArgumentException if {@code calls} is null or empty
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     * @throws Exception                declared for source compatibility with existing callers
     */
    public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latched = wrapCallables(calls);
        for (CountdownedCallable<T> callable : latched.wrappedCallables) {
            executorService.submit(callable);
        }

        if (latched.latch.await(seconds, TimeUnit.SECONDS)) {
            return collectResults(latched);
        }
        return new ArrayList<>();
    }

    /**
     * Varargs convenience overload that waits for the default timeout of
     * {@value #DEFAULT_WAIT_SECONDS} seconds.
     *
     * @param calls the tasks to run; must be non-null and contain at least one task
     * @param <T>   result type of the tasks
     * @return the results in submission order, or an empty list on timeout
     * @throws NullPointerException     if {@code calls} is null
     * @throws IllegalArgumentException if {@code calls} is empty
     * @throws Exception                see {@link #waitAllCallables(List, int)}
     */
    public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
        Preconditions.checkNotNull(calls, "callable empty.");
        return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
    }

    /** Reads the stored result of every wrapped task, in submission order. */
    private static <T> List<T> collectResults(LatchedCallables<T> latched) {
        List<T> results = new ArrayList<>(latched.wrappedCallables.size());
        for (CountdownedCallable<T> call : latched.wrappedCallables) {
            results.add(call.getResult());
        }
        return results;
    }

    /** Wraps each callable so that it counts down one shared latch when it finishes. */
    private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
        CountDownLatch latch = new CountDownLatch(callables.size());
        List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
        for (Callable<T> callable : callables) {
            wrapped.add(new CountdownedCallable<>(callable, latch));
        }

        LatchedCallables<T> bundle = new LatchedCallables<>();
        bundle.latch = latch;
        bundle.wrappedCallables = wrapped;
        return bundle;
    }

    /** A batch of wrapped tasks together with the latch they count down. */
    public static class LatchedCallables<T> {
        /** Reaches zero once every wrapped task has finished, normally or not. */
        public CountDownLatch latch;
        /** The wrapped tasks, in the order the originals were supplied. */
        public List<CountdownedCallable<T>> wrappedCallables;
    }

    /**
     * Decorator that remembers the delegate's result and counts down a latch
     * when the delegate finishes, whether it returned or threw.
     */
    public static class CountdownedCallable<T> implements Callable<T> {
        private final Callable<T> wrapped;
        private final CountDownLatch latch;
        // Written by the worker before countDown() and read only after a
        // successful await(), which is why the field is not volatile.
        private T result;

        /**
         * @param wrapped the task to delegate to
         * @param latch   the latch to count down when the task finishes
         */
        public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
            this.wrapped = wrapped;
            this.latch = latch;
        }

        /**
         * Runs the delegate and stores its result.
         *
         * @return the delegate's result
         * @throws Exception whatever the delegate throws; the stored result stays {@code null}
         */
        @Override
        public T call() throws Exception {
            try {
                result = wrapped.call();
                return result;
            } finally {
                // Always release the waiter, even when the delegate failed.
                latch.countDown();
            }
        }

        /** @return the stored result, or {@code null} if the delegate has not completed normally */
        public T getResult() {
            return result;
        }
    }

}
View Code

4、创建一个测试类

使用ThreadPoolExecutor 创建线程池,完成并行操作
package com.hao;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.hao.bean.Employee;
import com.hao.service.EmployeeService;
import com.hao.service.ExecutorContext;

/**
 * Demonstrates running several employee lookups in parallel through
 * {@link ExecutorContext} and prints the results and the elapsed time.
 */
public class ExecutorTest extends BaseTest {

    /**
     * Upper bound for the whole batch. The timeout argument of
     * {@code waitAllCallables(List, int)} is measured in seconds, so this must
     * stay above the simulated per-task latency below.
     */
    private static final int WAIT_SECONDS = 10;

    /** Artificial delay added to every lookup to make the parallel speed-up visible. */
    private static final long SIMULATED_LATENCY_MILLIS = 5000L;

    @Autowired
    ExecutorContext executorContext;

    @Autowired
    EmployeeService employeeService;

    /**
     * Looks up three employees in parallel. The total elapsed time printed at
     * the end should be close to one task's latency, not the sum of all three.
     *
     * @throws Exception if submitting or waiting fails, so the test fails
     *                   instead of silently passing
     */
    @Test
    public void test01() throws Exception {
        long t0 = System.currentTimeMillis();

        List<Callable<Employee>> calls = new ArrayList<>();
        calls.add(lookup(1L));
        calls.add(lookup(2L));
        calls.add(lookup(3L));

        // Each task returns its own result; nothing is shared between the
        // worker threads, so no synchronisation is needed here.
        List<Employee> employees = executorContext.waitAllCallables(calls, WAIT_SECONDS);

        for (Employee employee : employees) {
            System.out.println(employee);
        }
        System.out.println(System.currentTimeMillis() - t0);
    }

    /** Builds a task that sleeps for the simulated latency and then loads one employee. */
    private Callable<Employee> lookup(final long id) {
        return new Callable<Employee>() {
            @Override
            public Employee call() throws Exception {
                Thread.sleep(SIMULATED_LATENCY_MILLIS);
                return employeeService.getById(id);
            }
        };
    }

}
View Code

5、执行结果如下

使用ThreadPoolExecutor 创建线程池,完成并行操作

 

 此工具类的好处在于能够像使用普通 service 一样使用线程池完成并行操作,当然不要忘记将 ExecutorContext 置于能被 Spring 扫描到的地方,

否则不能直接使用@Autowired 依赖注入

 

上一篇:6.1.1 局部对象-局部静态对象


下一篇:OKHttp源码分析 part 1