初始化线程的四种办法
- 继承Thread类
- 实现Rannable 接口
- 实现callable 接口 + FutureTask (可以获取处理结果和处理异常)
- 线程池
继承Thread实现
public class ThreadTest {
public static void main(String[] args) {
System.out.println("主线程启动");
Thread1 thread1 = new Thread1();
thread1.start();
System.out.println("主线程结束");
}
public static class Thread1 extends Thread{
@Override
public void run() {
System.out.println("===========================");
System.out.println("当前线程---->"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("当前线程处理结果---->"+i);
System.out.println("===========================");
}
}
}
执行结果:
实现Runnable接口实现
public class ThreadTest {
public static void main(String[] args) {
System.out.println("主线程启动");
new Thread(new Thread1()).start();
System.out.println("主线程结束");
}
public static class Thread1 implements Runnable{
@Override
public void run() {
System.out.println("===========================");
System.out.println("当前线程---->"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("当前Runnable线程处理结果---->"+i);
System.out.println("===========================");
}
}
}
执行结果:
实现Runnable实现
可以使用FatureTask接收异步线程的执行结果,只有所有线程执行完才能得到结果,是阻塞等待
public class ThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
FutureTask<Integer> futureTask = new FutureTask<>(new Thread1());
new Thread(futureTask).start();
//阻塞等待,当所有线程执行完才能得到结果
//等待整个线程执行完,获取执行结果
Integer integer = futureTask.get();
System.out.println("futureTask执行结果:"+integer);
System.out.println("主线程结束");
}
public static class Thread1 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("===========================");
System.out.println("当前线程---->"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("当前Callable线程处理结果---->"+i);
System.out.println("===========================");
return i;
}
}
}
执行结果:
线程池
为什么使用线程池:
降低资源消耗:通过重复利用已经创建好的线程,减少线程创建销毁的损耗。
提高响应速度:因为线程池线程没有超过最大线程数时,有的线程处于空闲等待任务状态,当任务进来无需创建新的线程就能执行
提高线程的可管理性:线程池会根据系统的特点会对线程池内线程进行优化处理,减少创建和销毁带来的系统开销,无限的创建和销毁线程不仅损耗系统性能,还降低系统的稳定性,使用线程池来进行统一的管理分配
在我们的业务代码中,都不用以上三种但是,因为可能会导致我们的资源耗尽,这将会是没有更多的资源来处理核心业务。应该将所有的多线程异步任务都交给线程池,这样只有一个执行完才能继续拿到线程资源执行下一个任务,这样能有效的控制我们的资源。
创建线程池的方式
//固定长度的线程池
ExecutorService service = Executors.newFixedThreadPool(10);
service.execute(new Thread(()-> System.out.println("异步线程执行")));
这种方式有常见的四种线程池
- Executors.newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过需要处理,可灵活的回收空闲线程,没有则创建新线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Executors.newFixedThreadPool():创建一个固定大小的线程池,可控制线程的最大并发量,超出的将会在队列中等待。核心线程数等于最大线程数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- Executors.newScheduledThreadPool():创建一个定时线程池,支持定时或周期性的执行任务
- Executors.newSingleThreadExecutor():创建一个单线程的线程池,后台从队列中获取任务,挨个执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- int corePoolSize:核心线程数,线程池创建好就准备就绪的线程数,等待异步任务执行,这些线程将会一直存在,除非allowCoreThreadTimeOut
- int maximumPoolSize:最大线程数,用来控制资源并发
- long keepAliveTime:存活时间,当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间,超过存活时间将会释放空闲线程
- TimeUnit unit:时间单位,存活时间的时间单位
- BlockingQueue workQueue:阻塞队列,如果任务有很多,查出最大线程数,多余的任务将会放入队列,等待线程空闲从队列中取出并继续执行
- ThreadFactory threadFactory:线程的创建工厂
- RejectedExecutionHandler handler:拒绝处理控制器,如果队列的满了,按照我们指定的拒绝策略来拒绝执行任务
工作顺序:
- 线程池创建,准备好核心线程数的核心线程,准备接收任务
- 如果核心线程已经满了,后续进来的任务将放到阻塞队列中等待,当核心线程执行完以后就会从队列中获取任务执行
- 如果阻塞队列满了,将会启动新的线程执行任务,最大数不能超过最大线程数,来进行资源控制
- 如果最大线程数也用完,将会执行我们的拒绝策略RejectedExecutionHandler来拒绝任务
- 如果最大线程数都执行完,有很多空闲的线程,这些线程将会在执行的存活之间过后进行释放,释放最大线程数-核心线程数,核心线程会一直存在
创建线城池
创建一个核心数为10、最大线程数为100、存活时间为10秒,JUC包下LinkedBlockingDeque长度为1000的阻塞队列,JUC下默认的线程工厂defaultThreadFactory,ThreadPoolExecutor.AbortPolicy()直接丢弃的拒绝策略的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
比如一个线程池,核心线程8、最大线程20、阻塞队列50,如果此时进来100个请求进来将会怎么分配。
首先核心线程直接执行8个,这时候不会直接开其他线程,将会放到队列50个,队列满了以后将会开启12个线程,其他的任务都会被丢弃。
如果不想直接被丢弃,可以采用DiscardPolicy这种拒绝策略,这将会直接执行线程的run()方法,是同步执行
区别
继承Thread、实现Rannable接口创建出来的线程都不能得到返回值,Callable可以获取返回值
以上三种都不能有效的控制资源
使用线程池的方式来执行多线程任务可以有效控制资源,让性能更加稳定
CompletableFature异步编排
假设有三个异步任务A、 B、 C,异步任务都是同时执行,但是C任务需要A任务的执行结果来进行,这种情况就可以用到异步编排
* @author Doug Lea
* @since 1.8
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
/**
* 可以看到CompletableFuture实现了Future,和我们的Callablel实现线程的FutureTask类似,
* 都实现了Fature接口,可以获取异步任务的结果
*/
public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
CompletableFature提供了四个方法给我们执行异步操作
/**
* 没有返回值的
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
/**
* 指定线程池的
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
/**
* 有返回值的
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
/**
* 指定线程池的
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
runAsync测试
public class ThreadTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture.runAsync(()->{
int i = 10 / 5;
System.out.println(i);
},executor);
System.out.println("主线程结束");
}
}
supplyAsync测试
public class ThreadTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 5;
System.out.println(i);
return i;
}, executor);
Integer integer = future.get();
System.out.println("异步执行结果:"+integer);
System.out.println("主线程结束");
}
}
线程执行完回调
public class ThreadTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 0;
return i;
}, executor).whenComplete((result,exception)->{
System.out.println("线程执行结果:"+result);
System.out.println("线程执行异常:"+exception);
}).exceptionally(throwable -> {
return 0;
});
Integer integer = future.get();
System.out.println("异步执行结果:"+integer);
System.out.println("主线程结束");
}
}
链式调用whenComplete只能感知是否发生异常,并不能处理异常
使用handle来处理结果
public class ThreadTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
return i;
}, executor).handle((result,exception)->{
if (result != null){
return result;
}
if (exception != null){
return 0;
}
return 0;
});
Integer integer = future.get();
System.out.println("异步执行结果:"+integer);
System.out.println("主线程结束");
}
}
线程串行化
/**
* 当一个线程依赖上一个线程时,获取上一个线程返回结果,并返回当前线程的结果
* A B两个线程,B 依赖 A 的返回结果时,B 可以获取 A 的返回结果拿过来计算,并返回 B 线程的执行结果
*/
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
//===============================================================================
/**
* 当一个线程依赖上一个线程时,获取上一个线程返回结果,并拿来计算,当前线程并无返回结果
* A B 两个线程,B 可以拿到 A 线程的返回结果,并用 A 的返回结果来计算操作,B 没有返回结果
*/
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
//===============================================================================
/**
* 只要上面的任务执行完成,就执行ThenRun()方法,只是一个后续操作
*/
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
带有Async的都是异步的,就是另外开一个线程,不带的都是和上一个线程公用一个线程
thenRun测试
thenRun:没有返回值,不能过去到上个任务的执行结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor).thenRunAsync(() -> {
System.out.println("任务二执行完成");
}, executor);
System.out.println("主线程结束");
}
thenAccept测试
thenAccept:可以获取到上一任务的执行结果,没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor).thenAcceptAsync(res -> {
System.out.println("任务一执行结果:"+res);
},executor);
System.out.println("主线程结束");
}
thenApply测试
thenApply:可以接收到上一个任务的执行结果,并且也有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor).thenApplyAsync(res -> {
System.out.println("任务一执行结果:" + res);
return res;
}, executor);
System.out.println(future.get());
System.out.println("主线程结束");
}
两个任务组合,都要完成
当两个CompletableFuture组合完成之后,如果还需继续执行其他任务,可以使用任务组合,必须等待前两个任务执行完才能继续执行
runAfterBoth():当两个任务执行完成之后,继续执行第三个任务,不能获取其他任务的返回结果,没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二执行完成");
return "hello world";
});
CompletableFuture<Void> voidCompletableFuture = future.runAfterBothAsync(future1, () -> {
System.out.println("任务三执行完成");
}, executor);
System.out.println(future.get());
System.out.println("主线程结束");
}
thenAcceptBoth():当两个任务执行完成之后,继续执行第三个任务,可以获取前两个任务的执行结果,没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二执行完成");
return "hello world";
});
CompletableFuture<Void> voidCompletableFuture = future.thenAcceptBothAsync(future1, (f1,f2) -> {
System.out.println("任务一的执行结果:"+f1);
System.out.println("任务二的执行结果:"+f2);
System.out.println("任务三执行完成");
}, executor);
System.out.println(future.get());
System.out.println("主线程结束");
}
thenCombine():当两个任务执行完成之后,继续执行第三个任务,可以获取前两个任务的执行结果,有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 2;
System.out.println("任务一执行完成");
return i;
}, executor);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二执行完成");
return "hello world";
});
CompletableFuture<String> future2 = future.thenCombineAsync(future1, (f1, f2) -> {
System.out.println("任务一的执行结果:" + f1);
System.out.println("任务二的执行结果:" + f2);
System.out.println("任务三执行完成");
return f1 + f2;
}, executor);
System.out.println("任务三执行结果"+future2.get());
System.out.println("主线程结束");
}
两个任务组合,一个完成
当两个CompletableFuture组合完成之后,如果还需继续执行其他任务,可以使用任务组合,只需等待两个中的其中一个执行完成,就可以执行其他任务
runAfterEither():当两个任务其中一个执行完成之后,就执行第三个任务,不能获取其他任务的返回结果,没有返回值
acceptEither():当两个任务其中一个执行完成之后,就执行第三个任务,可以获取其他任务的返回结果,没有返回值
acceptEither():当两个任务其中一个执行完成之后,就执行第三个任务,可以获取其他任务的返回结果,有返回值
多任务组合
CompletableFuture.allOf(future, future1):等待所有的任务执行完成
CompletableFuture.anyOf():只要有一个任务执行完就结束
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程启动---"+ LocalDateTime.now());
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务一执行完成");
}, executor);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务二执行完成");
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(future, future1);
allOf.get();//等待所有任务执行完
System.out.println("主线程结束---"+ LocalDateTime.now());
}