[并发编程]并发编程第二篇:利用并发编程,实现计算大量数据的和

利用并发编程,实现计算大量数据的和

实现代码:

package tj.pojo.generate.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class ConcurrentCalculator {

    private ExecutorService exec;
    private int cpuCoreNumber;
    private List<Future<Long>> tasks = new ArrayList<Future<Long>>();

    // 内部类
    class SumCalculator implements Callable<Long> {
        private int[] numbers;
        private int start;
        private int end;

        public SumCalculator(final int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
        @Override
        public Long call() throws Exception {
            Long sum = 0L;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            System.out.println(String.format("%s~%s的和为%s", start, end, sum));
            return sum;
        }

    }

    public ConcurrentCalculator() {
        cpuCoreNumber = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU核心数:" + cpuCoreNumber);
        exec = Executors.newFixedThreadPool(cpuCoreNumber);

    }

    public Long sum(final int[] numbers) {
        for (int i = 0; i < cpuCoreNumber; i++) {
            int increment = numbers.length / cpuCoreNumber + 1;
            int start = increment * i;
            int end = start + increment;
            if (end > numbers.length) {
                end = numbers.length;
            }
            SumCalculator task = new SumCalculator(numbers, start, end);

            FutureTask<Long> future = new FutureTask<Long>(task);
            tasks.add(future);
            System.out.println("添加一个任务,总任务数为:" + tasks.size());
            if (!exec.isShutdown()) {
                exec.submit(future);
                // ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。
                // exec.submit(task);
            }
        }
        System.out.println("任务分配完成,总任务数为:" + tasks.size());
        return getResult();
    }

    public Long getResult() {
        Long sums = 0L;
        for (Future<Long> task : tasks) {
            try {
                Long sum = task.get();
                sums += sum;
                System.out.println("当前总合计:" + sums);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return sums;
    }

    public void close() {
        exec.shutdown();
    }
}

其中,在代码的第62行~第64行,由于不了解ExecutoreService.submit(Runnable task)方法的功能。

同时FutureTask<Long> future和SumCalculator task都实现了Runnable接口,造成代码调用时,进程一直不结束。

传递了FutureTask<Long> future才正确执行。

                exec.submit(future);
                // ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。
                // exec.submit(task);

测试方法:

public static void test() {
        int[] numbers = new int[100];
        for (int i = 0; i < 100; i++) {
            numbers[i] = i + 1;
        }
        tj.pojo.generate.main.ConcurrentCalculator cc = new tj.pojo.generate.main.ConcurrentCalculator();
        Long sum = cc.sum(numbers);
        System.out.println("1~100的和为" + sum);
        cc.close();
    }

 

FutureTask的实现代码:

public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,RunnableFuture接口的实现代码:

public interface RunnableFuture<V> extends Runnable, Future<V> {

    void run();

}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。

所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

事实上,FutureTask是Future接口的一个唯一实现类。

并发编程的两种实现形式:

1):使用Callable+Future获取执行结果

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2):使用Callable+FutureTask获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

 

作者: Candyメ奶糖

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
博文来源广泛,如原作者认为我侵犯知识产权,请尽快给我发邮件 359031282@qq.com联系,我将以第一时间删除相关内容。

上一篇:[教程]在CentOS7上配置 FTP服务器 Proftpd 支持 MySQL 虚拟用户加密认证以及磁盘限额(Quota)


下一篇:《Java 7并发编程实战手册》第五章Fork/Join框架