Java 并发编程学习笔记(16) ----CompletionService

CompletionService

1.1 CompletionService 的功能

CompletionService的功能就是以异步的方式,一边产生新的任务,一边处理已完成任务的结果。 
CompletionService主要解决一个什么问题呢? 
    Future 接口调用get()方法取得处理的结果,但是这个方法是阻塞性的,如果调用get()
方法时,任务尚未执行完成,get方法会一直阻塞到此任务执行完成为止。这样的后果就是一旦前面
的任务耗时太长,后面的任务的get()方法就会排队等待,影响相率。
    使用CompletionService可以按照完成这些任务的时间顺序处理他们的结果。

1.2 代码


package com.lhc.concurrent.completion.nonBlock;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.concurrent.*;

public class MyCallable implements Callable<String>{
    private String userName;
    private long sleepTime;

    public MyCallable(String userName, long sleepTime) {
        this.userName = userName;
        this.sleepTime = sleepTime;
    }

    @Override
    public String call() throws Exception {
        System.out.println(userName);
        Thread.sleep(sleepTime);
        return "return " + sleepTime;
    }

    public static void main(String[] args) throws Exception{
        MyCallable name1 = new MyCallable("name1", 1050);
        MyCallable name2 = new MyCallable("name2", 1040);
        MyCallable name3 = new MyCallable("name3", 1030);
        MyCallable name4 = new MyCallable("name4", 1020);
        MyCallable name5 = new MyCallable("name5", 1010);

        ArrayList<Callable> callables = Lists.newArrayList();
        callables.add(name1);
        callables.add(name2);
        callables.add(name3);
        callables.add(name4);
        callables.add(name5);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5,
                TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
        ExecutorCompletionService completionService = new ExecutorCompletionService(executor);

        for (int i = 0; i < callables.size(); i++) {
            completionService.submit(callables.get(i));
        }

        for (int i = 0; i < 5; i++){
            /**
             * 哪个任务先执行完,那个任务的返回值就先打印,解决了Future 阻塞的特点
             * 但是如果没有任何任务被执行完,则.take().get()方法还是呈阻塞特性。
             */
            System.out.println(completionService.take().get());
        }
    }
}

1.3测试结果

name1
name5
name2
name3
name4
return 1010
return 1020
return 1030
return 1040
return 1050

2.1 Future submit(Runnable task, V result)方法

另外一种使用姿势

2.2 代码


public class User {
    private String name;
    private String password;

    public User(String name, String password) {
        super();
        this.name = name;
        this.password = password;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}


package com.lhc.concurrent.completion.submit;

import java.util.concurrent.*;

public class MyRunnable implements Runnable{
    private User user;

    public MyRunnable(User user) {
        super();
        this.user = user;
    }

    @Override
    public void run() {
        user.setName("jt");
        user.setPassword("pwd");
        System.out.println("running end");
    }

    public static void main(String[] args) throws Exception{
        User user = new User("name", "pwd");
        MyRunnable myRunnable = new MyRunnable(user);

        ExecutorService executorService = Executors.newCachedThreadPool();
        ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
        Future<User> submit = completionService.submit(myRunnable, user);
        System.out.println(submit.get().getName() + " " + submit.get().getPassword());
    }
}


2.3 运行结果

running end
jt pwd

上一篇:零基础如何学Python爬虫技术?


下一篇:浅析Java CompletionService