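Heh, yesterday I noticed two new followers, got all excited, and decided to write this article today.
I've been following this field for a long time, but never felt confident enough to write anything myself, so I kept hoping for an open-source framework I could simply use.
Then I read master Peng Yuan's introduction to Fourinone, Taobao's distributed framework, and it felt like meeting a friend I should have met years ago. So I set out to study it (see my write-up "Thoughts prompted by a first look at fourinone" for details). Honestly, it felt like a bone too tough for me to chew, although that may well be down to my own limited skill. Either way, it pushed me to write a simple parallel computing framework of my own.
Allow me to quote my own motto: "Code written by an expert is clear at a glance even to a beginner; code written by a beginner can't be understood even when an expert shows up."
OK folks, beginner or expert, let's GO!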
HelloWorld, Part 1
Of course we start with Hello again, but this hello is a bit different from the usual ones. Never mind that, let's just take a look:
```java
import java.rmi.RemoteException;

public class WorkerHello extends AbstractWorker {
    public WorkerHello() throws RemoteException {
        super("hello"); // this worker does "hello" work
    }

    public Warehouse doWork(Work work) throws RemoteException {
        String name = work.getInputWarehouse().get("name");
        System.out.println(String.format("id %s: Hello %s", getId(), name));
        Warehouse outputWarehouse = new WarehouseDefault();
        outputWarehouse.put("helloInfo", "Hello," + name);
        return outputWarehouse;
    }
}
```
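First, WorkerHello extends AbstractWorker: it has to be a worker before it can be a Hello worker.
Its constructor declares RemoteException, which indicates that this worker can be invoked remotely, and the call to super("hello") says that this worker does "hello" jobs.
Being a worker, it naturally has to do some work.
It takes a string called name out of the work's input warehouse, prints it to the console, builds an output warehouse, puts a helloInfo string into it and returns that warehouse. With that, the worker's job is done.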
Now let's look at the sample code:
```java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    JobCenter jobCenter = new JobCenterLocal();
    // register 5 hello workers with the job center
    for (int i = 0; i < 5; i++) {
        jobCenter.registerWorker(new WorkerHello());
    }
    // this foreman gives each "hello" job to just one of the hello workers
    Foreman helloForeman = new ForemanSelectOneWorker("hello");
    jobCenter.registerForeman(helloForeman);
    Warehouse inputWarehouse = new WarehouseDefault();
    inputWarehouse.put("name", "world");
    Work work = new WorkDefault("hello", inputWarehouse);
    Warehouse outputWarehouse = jobCenter.doWork(work);
    // explicit cast so that println(String) is the overload chosen
    System.out.println((String) outputWarehouse.get("helloInfo"));
    jobCenter.stop();
}
```
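First we open a job center (literally an "employment agency") and register 5 Hello workers with it.
Then we register a foreman who specializes in hello jobs. This foreman is a bit special: he just grabs any one hello worker to do the hello job.
Next we build a piece of work. It's a hello job, and its input warehouse holds a name whose value is world.
Then we tell the job center: please get this job done for us.
The job finishes without any exception, helloInfo is found in the result warehouse as expected, and it gets printed to the console.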
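To make the division of labor concrete, here is a minimal in-memory sketch of what a job center with a "select one worker" foreman boils down to. It is not the framework's code: MiniWorker, MiniJobCenter and HelloMiniWorker are hypothetical names, a warehouse is modeled as a plain Map, and picking the worker at random is only an assumption about how "grab any hello worker" might be done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model: a worker turns an input warehouse into an output warehouse.
interface MiniWorker {
    String type();                                         // kind of work it accepts, e.g. "hello"
    Map<String, Object> doWork(Map<String, Object> input); // input warehouse -> output warehouse
}

// Hypothetical job center whose only foreman policy is "select one worker".
class MiniJobCenter {
    private final Map<String, List<MiniWorker>> workersByType = new HashMap<>();

    void registerWorker(MiniWorker worker) {
        workersByType.computeIfAbsent(worker.type(), t -> new ArrayList<>()).add(worker);
    }

    Map<String, Object> doWork(String type, Map<String, Object> input) {
        List<MiniWorker> candidates = workersByType.getOrDefault(type, Collections.emptyList());
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no worker registered for type " + type);
        }
        // any single worker of the right type will do
        MiniWorker chosen = candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
        return chosen.doWork(input);
    }
}

// Hypothetical counterpart of WorkerHello.
class HelloMiniWorker implements MiniWorker {
    public String type() {
        return "hello";
    }

    public Map<String, Object> doWork(Map<String, Object> input) {
        Map<String, Object> output = new HashMap<>();
        output.put("helloInfo", "Hello," + input.get("name"));
        return output;
    }
}
```

Registering five HelloMiniWorker instances and calling doWork("hello", ...) with name = world should return a map whose helloInfo is "Hello,world", mirroring the main method above. Keying workers by type is what lets one job center serve hello, sum and pi jobs side by side.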
Here is the output:
```
id 46fbffdeb18b45f28cda4617795c2a52: Hello world
Hello,world
```
From this example we have met the following concepts:
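- Job center (JobCenter): registers workers and foremen, and accepts and processes work.
- Foreman: takes on a job, recruits workers to get it done, and returns the result.
- Worker: what we'd usually call a migrant laborer. It only knows how to process the materials it is handed, sits at the bottom of the food chain, and in the end doesn't even get paid.
- Work: nothing but a work type and an input warehouse.
- Warehouse: holds all kinds of raw materials or finished products.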
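You generally don't write the job center yourself, since the framework provides it; you generally don't write the work either; you rarely need to write a foreman, because the framework already ships several kinds, which are usually enough; the worker is the one thing you always have to write.
And with that, the simple hello parallel computation is done.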
HelloWorld, Part 2
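Once the hello job was done, the boss had a sudden brainwave: a single hello is far too quiet. I want all the workers to shout "Hello, World" together, how impressive would that be! The boss has money, of course, so no sooner said than done: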
```java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    JobCenter jobCenter = new JobCenterLocal();
    for (int i = 0; i < 5; i++) {
        jobCenter.registerWorker(new WorkerHello());
    }
    // this foreman sends the "hello" job to every registered hello worker
    Foreman helloForeman = new ForemanSelectAllWorker("hello");
    jobCenter.registerForeman(helloForeman);
    Warehouse inputWarehouse = new WarehouseDefault();
    inputWarehouse.put("name", "world");
    Work work = new WorkDefault("hello", inputWarehouse);
    jobCenter.doWork(work);
    jobCenter.stop();
}
```
This time, of course, the foreman has been swapped: this one gets every worker to do the job. The result:
```
id 83274d8f8c194bb89d773c232e867cc4: Hello world
id 16fbf219d3cf4ba48eef23c260de509a: Hello world
id 9c17a119a4f341d68b589a503712b0f9: Hello world
id e7e3b2bdc9444a179ad62abdd35275e1: Hello world
id 4b12a1b70f5d43e2bff473382096dfbe: Hello world
```
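The boss takes a look: damn, the workers did shout, but the sound (it was just System.out) rang out and was gone, and there's no telling how many of them actually shouted. The foreman says: oh, I don't do that data-collection job. If you want it, just say so! I'll add a result combiner for you: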
```java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    JobCenter jobCenter = new JobCenterLocal();
    for (int i = 0; i < 5; i++) {
        jobCenter.registerWorker(new WorkerHello());
    }
    // the same "select all" foreman, now with a combiner that merges the workers' results
    Foreman helloForeman = new ForemanSelectAllWorker("hello", new HelloWorkCombiner());
    jobCenter.registerForeman(helloForeman);
    Warehouse inputWarehouse = new WarehouseDefault();
    inputWarehouse.put("name", "world");
    Work work = new WorkDefault("hello", inputWarehouse);
    Warehouse outputWarehouse = jobCenter.doWork(work);
    List<String> result = outputWarehouse.get("helloInfo");
    System.out.println(result.size()); // how many workers said hello
    jobCenter.stop();
}
```
The Hello result combiner merges the workers' output into a single result:
```java
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

public class HelloWorkCombiner implements WorkCombiner {
    // Collect the helloInfo of every worker's output warehouse into one list.
    public Warehouse combine(List<Warehouse> warehouseList) throws RemoteException {
        Warehouse warehouse = new WarehouseDefault();
        List<String> helloList = new ArrayList<String>();
        for (Warehouse w : warehouseList) {
            helloList.add((String) w.get("helloInfo"));
        }
        warehouse.put("helloInfo", helloList);
        return warehouse;
    }
}
```
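Stripped of the framework, a "select all workers" foreman with a combiner is essentially a fan-out/fan-in over a thread pool. The sketch below is an illustration under assumptions, not how ForemanSelectAllWorker is actually implemented: MapWorker, BroadcastForeman and combineHello are hypothetical names, warehouses are plain Maps, and the workers run locally on an ExecutorService instead of being called remotely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical worker type: input warehouse -> output warehouse.
interface MapWorker extends Function<Map<String, Object>, Map<String, Object>> {}

class BroadcastForeman {
    // Fan-out: give the same input to every worker in parallel.
    // Fan-in: hand all outputs to the combiner once every worker has finished.
    static Map<String, Object> runOnAll(List<MapWorker> workers,
                                        Map<String, Object> input,
                                        Function<List<Map<String, Object>>, Map<String, Object>> combiner) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, workers.size()));
        try {
            List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
            for (MapWorker worker : workers) {
                tasks.add(() -> worker.apply(input));
            }
            List<Map<String, Object>> outputs = new ArrayList<>();
            for (Future<Map<String, Object>> future : pool.invokeAll(tasks)) {
                outputs.add(future.get());
            }
            return combiner.apply(outputs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Same idea as HelloWorkCombiner: gather every helloInfo into one list.
    static Map<String, Object> combineHello(List<Map<String, Object>> outputs) {
        List<String> helloList = new ArrayList<>();
        for (Map<String, Object> output : outputs) {
            helloList.add((String) output.get("helloInfo"));
        }
        Map<String, Object> result = new HashMap<>();
        result.put("helloInfo", helloList);
        return result;
    }
}
```

invokeAll waits for every task, so the combiner only ever sees the complete list of outputs, and a failed worker surfaces as an IllegalStateException instead of a silently shorter list.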
The boss is finally satisfied.
Distributed sum
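The boss kept quiet for a little while, then thought: I want to know the sum of 1 through 10000. But computing it on one machine is too slow; could a few more machines help out so I get the answer sooner? (This is only meant to illustrate the principle; just imagine that adding up 1 to 10000 takes several hours.)
First, build a worker: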
```java
import java.rmi.RemoteException;

public class WorkerSum extends AbstractWorker {
    public WorkerSum() throws RemoteException {
        super("sum");
    }

    // Add up every integer in [start, end], both ends inclusive.
    public Warehouse doWork(Work work) throws RemoteException {
        long start = (Long) work.getInputWarehouse().get("start");
        long end = (Long) work.getInputWarehouse().get("end");
        long sum = 0;
        for (long i = start; i <= end; i++) {
            sum += i;
        }
        Warehouse outputWarehouse = new WarehouseDefault();
        outputWarehouse.put("sum", sum);
        return outputWarehouse;
    }
}
```
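The worker reads start and end from the input warehouse, computes the total and puts it into the sum field of the output warehouse.
But how should this job be divided among the workers, and how should their results be merged?
That's where a work splitter-combiner for the foreman comes in: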
```java
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

public class SumSplitterCombiner implements WorkSplitterCombiner {
    // Split [start, end] (both inclusive) into one contiguous sub-range per worker.
    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
        List<Warehouse> list = new ArrayList<Warehouse>();
        long start = (Long) work.getInputWarehouse().get("start");
        long end = (Long) work.getInputWarehouse().get("end");
        long count = end - start + 1;
        long step = count / workers.size();
        for (int i = 0; i < workers.size(); i++) {
            Warehouse subInputWarehouse = new WarehouseDefault();
            subInputWarehouse.put("start", start + step * i);
            if (i == workers.size() - 1) {
                // the last worker also takes the remainder
                subInputWarehouse.put("end", end);
            } else {
                // offset by start, so ranges that don't begin at 1 are split correctly too
                subInputWarehouse.put("end", start + step * (i + 1) - 1);
            }
            list.add(subInputWarehouse);
        }
        return list;
    }

    // Add up the partial sums returned by the workers.
    public Warehouse combine(List<Warehouse> warehouseList) throws RemoteException {
        Warehouse outputWarehouse = new WarehouseDefault();
        long sum = 0;
        for (Warehouse w : warehouseList) {
            sum += (Long) w.get("sum");
        }
        outputWarehouse.put("sum", sum);
        return outputWarehouse;
    }
}
```
There are just two methods, one to split and one to combine, and both are easy to follow.
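The index arithmetic in split() is the part that is easy to get wrong, so here it is in isolation. This is a hypothetical standalone helper (RangeSplitter is not part of the framework) that cuts the inclusive range [start, end] into contiguous inclusive pieces, offsets every piece by start so ranges not beginning at 1 also work, lets the last piece absorb the remainder, and then adds up the partial sums the way combine() does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of the split/combine logic for an inclusive range.
class RangeSplitter {
    // Cut [start, end] (both inclusive) into `parts` contiguous inclusive pieces.
    // Each piece is offset by `start`; the last piece absorbs the remainder.
    static List<long[]> split(long start, long end, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        long step = (end - start + 1) / parts;
        List<long[]> pieces = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            long pieceStart = start + step * i;
            long pieceEnd = (i == parts - 1) ? end : start + step * (i + 1) - 1;
            pieces.add(new long[] {pieceStart, pieceEnd});
        }
        return pieces;
    }

    // What one "sum" worker does with its piece.
    static long sumRange(long from, long to) {
        long sum = 0;
        for (long i = from; i <= to; i++) {
            sum += i;
        }
        return sum;
    }

    // Split, sum each piece, then combine the partial sums.
    static long splitAndSum(long start, long end, int parts) {
        long total = 0;
        for (long[] piece : split(start, end, parts)) {
            total += sumRange(piece[0], piece[1]);
        }
        return total;
    }
}
```

For 101 to 200 split three ways, the pieces are [101, 133], [134, 166] and [167, 200], and their partial sums add up to 15050. If there are more parts than numbers, the step becomes 0, the early pieces come out empty and the last piece does all the work, so the total is still right.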
Everything's ready, heh, let's get to work:
```java
import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        JobCenter jobCenter = new JobCenterLocal();
        JobCenter center = new JobCenterRemote();
        for (int i = 0; i < 5; i++) {
            center.registerWorker(new WorkerSum());
        }
        Foreman sumForeman = new ForemanSelectAllWorker("sum", new SumSplitterCombiner());
        center.registerForeman(sumForeman);
        Warehouse inputWarehouse = new WarehouseDefault();
        inputWarehouse.put("start", 1L);
        inputWarehouse.put("end", 10000L);
        Work work = new WorkDefault("sum", inputWarehouse);
        Warehouse outputWarehouse = center.doWork(work);
        System.out.println((Long) outputWarehouse.get("sum"));
        jobCenter.stop();
        center.stop();
    }
}
```
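Note that both input values must be longs, because the worker casts them with (Long). The trailing l in 1l and 10000l is just the long suffix, so the values are 1 and 10000, not 11 and 100001; writing an uppercase L makes this much harder to misread:

```java
inputWarehouse.put("start", 1L);
inputWarehouse.put("end", 10000L);
```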
The output:

```
50005000
```
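As a quick sanity check, Gauss's closed form for 1 + 2 + ... + n gives the same value:

```latex
\sum_{i=1}^{n} i = \frac{n(n+1)}{2}
\quad\Longrightarrow\quad
\sum_{i=1}^{10000} i = \frac{10000 \times 10001}{2} = 50\,005\,000
```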
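Multi-stage work
Simple jobs get done in one go, of course, but complex work has to be split into several stages, and different stages may need different foremen or workers. We have a pretty simple way to handle this kind of job, too.
First, build a worker: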
```java
import java.rmi.RemoteException;

public class WorkerHello extends AbstractWorker {
    public WorkerHello() throws RemoteException {
        super("hello");
    }

    public Warehouse doWork(Work work) throws RemoteException {
        String name = work.getInputWarehouse().get("name");
        System.out.println(String.format("id %s: Hello %s", getId(), name));
        Warehouse outputWarehouse = new WarehouseDefault();
        // hand the name back with "_1" appended
        outputWarehouse.put("name", name + "_1");
        return outputWarehouse;
    }
}
```
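This worker is a little odd: every time, it simply appends "_1" to the name and hands it back. Otherwise nothing is different.
OK, now let's do a series of jobs: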
```java
import java.io.IOException;

public class TestSerialWork {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        JobCenter jobCenter = new JobCenterLocal();
        for (int i = 0; i < 5; i++) {
            jobCenter.registerWorker(new WorkerHello());
        }
        Foreman helloForeman = new ForemanSelectOneWorker("hello");
        jobCenter.registerForeman(helloForeman);
        Warehouse inputWarehouse = new WarehouseDefault();
        inputWarehouse.put("name", "world");
        Work work = new WorkDefault("hello", inputWarehouse);
        // chain two more "hello" stages after the first one: three stages in total
        work.setNextWork(new WorkDefault("hello")).setNextWork(new WorkDefault("hello"));
        Warehouse warehouse = jobCenter.doWork(work);
        System.out.println((String) warehouse.get("name"));
        jobCenter.stop();
    }
}
```
The only difference from the earlier examples is this line:
```java
work.setNextWork(new WorkDefault("hello")).setNextWork(new WorkDefault("hello"));
```
By specifying the next work, it builds a series of jobs; the series defined here has three steps. The output:
```
id 2a53a967e3b84289beb3dbaf12a7d8be: Hello world
id e3d471c27e264a1a87cf263605bfe9bd: Hello world_1
id 2a53a967e3b84289beb3dbaf12a7d8be: Hello world_1_1
world_1_1_1
```
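The output matches expectations exactly.
Serial work lets you break complex work into simple pieces, and different pieces can be handled by different foremen and workers.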
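The data flow of such a chain can be sketched without the framework. The sketch below assumes, as the output above suggests, that each stage's output warehouse becomes the next stage's input warehouse; StagePipeline and appendOne are hypothetical names, warehouses are plain Maps, and all stages run locally one after another.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of serial work: stages run one after another, and the
// output warehouse of each stage is the input warehouse of the next.
class StagePipeline {
    static Map<String, Object> run(List<UnaryOperator<Map<String, Object>>> stages,
                                   Map<String, Object> input) {
        Map<String, Object> current = input;
        for (UnaryOperator<Map<String, Object>> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    // Does to the name what the "_1" WorkerHello above does.
    static Map<String, Object> appendOne(Map<String, Object> in) {
        Map<String, Object> out = new HashMap<>();
        out.put("name", in.get("name") + "_1");
        return out;
    }
}
```

Running three appendOne stages on name = world should end with world_1_1_1, matching the last line of the output above. What this plain list of functions leaves out is the routing: in the framework, each stage is a Work with its own type, so it can be handled by a different foreman and set of workers.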
Computing pi
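Here is a single-threaded program that approximates pi with the Leibniz series:

```java
public static void main(String[] args) {
    double pi = 0.0;
    // terms i = 1 .. 1,000,000,000 of 1 - 1/3 + 1/5 - 1/7 + ...
    for (double i = 1.0; i < 1000000001d; i++) {
        pi += Math.pow(-1, i + 1) / (2 * i - 1);
    }
    System.out.println(4 * pi);
}
```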
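The loop evaluates the first N = 10^9 terms of the Leibniz series. Because the terms alternate in sign and shrink in size, the truncation error is smaller than the first omitted term (times 4):

```latex
\pi = 4\sum_{i=1}^{\infty} \frac{(-1)^{i+1}}{2i-1} = 4\left(1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \cdots\right),
\qquad
\left|\pi - 4\sum_{i=1}^{N} \frac{(-1)^{i+1}}{2i-1}\right| < \frac{4}{2N+1}
```

With N = 10^9 the bound is about 2 × 10^-9, so only roughly the first 8 decimal places of the result can be trusted.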
Now let's compute it with the framework. First, create a worker:
```java
import java.rmi.RemoteException;

public class PiWorker extends AbstractWorker {
    public PiWorker() throws RemoteException {
        super("pi");
    }

    @Override
    protected Warehouse doWork(Work work) throws RemoteException {
        long m = (Long) work.getInputWarehouse().get("start");
        long n = (Long) work.getInputWarehouse().get("end");
        double pi = 0.0d;
        // sums the terms i = m .. n-1, i.e. "end" is treated as exclusive
        for (double i = m; i < n; i++) {
            pi += Math.pow(-1, i + 1) / (2 * i - 1);
        }
        work.getInputWarehouse().put("pi", 4 * pi);
        return work.getInputWarehouse();
    }
}
```
Then write a splitter-combiner:
```java
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;

public class PiSplitterCombiner implements WorkSplitterCombiner {
    // Split the half-open range [start, end) into one contiguous sub-range per worker;
    // "end" is exclusive because PiWorker loops with i < end.
    public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
        List<Warehouse> list = new ArrayList<Warehouse>();
        long start = (Long) work.getInputWarehouse().get("start");
        long end = (Long) work.getInputWarehouse().get("end");
        long count = end - start;
        long step = count / workers.size();
        for (int i = 0; i < workers.size(); i++) {
            Warehouse subInputWarehouse = new WarehouseDefault();
            subInputWarehouse.put("start", start + step * i);
            if (i == workers.size() - 1) {
                subInputWarehouse.put("end", end); // the last worker takes the remainder
            } else {
                subInputWarehouse.put("end", start + step * (i + 1));
            }
            list.add(subInputWarehouse);
        }
        return list;
    }

    // Each worker already multiplied its partial sum by 4, so just add them up.
    public Warehouse combine(List<Warehouse> warehouseList) throws RemoteException {
        Warehouse outputWarehouse = new WarehouseDefault();
        double pi = 0d;
        for (Warehouse w : warehouseList) {
            pi += (Double) w.get("pi");
        }
        outputWarehouse.put("pi", pi);
        return outputWarehouse;
    }
}
```
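For comparison, here is the same computation as a plain-Java sketch outside the framework. ParallelPi is a hypothetical class; it uses half-open ranges [from, to) both when splitting and when summing, so no term is dropped or counted twice at a boundary, and it takes the sign from the parity of i instead of calling Math.pow(-1, i + 1), which yields the same ±1 far more cheaply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical plain-Java version of the parallel pi job (no framework involved).
class ParallelPi {
    // 4 * sum of (-1)^(i+1) / (2i - 1) for i in the half-open range [from, to).
    static double partial(long from, long to) {
        double sum = 0.0;
        for (long i = from; i < to; i++) {
            double term = 1.0 / (2 * i - 1);
            sum += (i % 2 == 1) ? term : -term; // odd i: +, even i: -
        }
        return 4 * sum;
    }

    // Terms i = 1 .. n, split into `parts` half-open ranges run on a thread pool.
    static double compute(long n, int parts) {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            long step = n / parts;
            List<Future<Double>> futures = new ArrayList<>();
            for (int p = 0; p < parts; p++) {
                long from = 1 + step * p;
                long to = (p == parts - 1) ? n + 1 : 1 + step * (p + 1);
                Callable<Double> task = () -> partial(from, to);
                futures.add(pool.submit(task));
            }
            double pi = 0.0;
            for (Future<Double> future : futures) {
                pi += future.get();
            }
            return pi;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a partial sum failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

ParallelPi.compute(1_000_000_000L, 4) covers the same terms as the single-threaded loop above, so its result should agree with that loop up to floating-point rounding, since only the order of the additions changes.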
Next comes the test class:
```java
import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        JobCenter jobCenter = new JobCenterLocal();
        for (int i = 0; i < 10; i++) {
            jobCenter.registerWorker(new PiWorker());
        }
        Foreman piForeman = new ForemanSelectAllWorker("pi", new PiSplitterCombiner());
        jobCenter.registerForeman(piForeman);
        Warehouse inputWarehouse = new WarehouseDefault();
        inputWarehouse.put("start", 1L);
        inputWarehouse.put("end", 1000000001L);
        Work work = new WorkDefault("pi", inputWarehouse);
        Warehouse outputWarehouse = jobCenter.doWork(work);
        System.out.println("pi:" + outputWarehouse.get("pi"));
        jobCenter.stop();
    }
}
```
```
Parallel:        time: 10326 ms   pi: 3.141592694075038
Single-threaded: time: 24857 ms   pi: 3.1415926525880504
```
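These numbers come from my laptop, which has 4 cores rather than 4 separate CPUs, so running 4 ways in parallel did not bring the time down to the expected 1/4; it took roughly 1/2.4 of the single-threaded time instead. The two values of pi also differ from the 8th decimal place on. PiWorker treats end as exclusive (i < n), so the splitter has to hand out half-open ranges; if it reused the inclusive ranges of the sum splitter, every sub-range except the last would silently drop its final term, and a boundary mismatch like that is a likely cause of the gap. As for the timing, two conclusions follow:
Conclusion 1: parallel computing really does shorten computation time and makes better use of the CPU.
Conclusion 2: 4 cores and 4 CPUs are noticeably different.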
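In numbers, the speedup is the single-threaded time divided by the parallel time, and dividing that by the 4 cores gives the parallel efficiency:

```latex
S = \frac{T_{\text{single}}}{T_{\text{parallel}}} = \frac{24857\ \text{ms}}{10326\ \text{ms}} \approx 2.41,
\qquad
E = \frac{S}{4} \approx 0.60
```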
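Summary
The job center, the workers and the foremen can run on different machines, or all on a single machine.
Now you can proudly say: too slow? Let me set up some distributed computing for you and it'll be fast!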