问题来源
非常感谢@doctorwho的问题:
假如职业介绍所来了一批生产汽车的工作,假设生产一辆汽车任务是这样的:搭好底盘、拧4个轮胎、安装发动机、安装4个座椅、再装4个车门、最后安装顶棚。之间有的任务是可以并行计算的(比如拧4个轮胎,安装发动机和安装座椅),有的任务有前置任务(比如先装好座椅,才能装车门和顶棚)。让两组包工头组织两种类型的工作:将工人分成两种类型,即可并行计算的放在同一组内,由职业介绍所来控制A组包工头做完的任务交给B组包工头。中间环节的半成品保存到Warehouse中,是这样使用TINY框架来生产汽车么?
接下来,我就用Tiny并行计算框架来展示一下这个示例,在编写示例的时候,发现了一个BUG,这也充分体现了开源的精神与价值,再次感谢@doctorwho。
问题分析
doctorwho的问题还是比较复杂的,但是实际上道理是一样的,因此我把问题简化成下面的过程
第一步:构建底盘
第二步:并行进行安装引擎,座位和轮胎
第三步:并行进行安装门及车顶
由于我和doctorwho都不是造车行家,因此就不用纠结这么造是不是合理了,假设这么做就是合理的。
代码实现
按我前面说的过程,工人是必须要有的,因此我们首先构建工人:
第一步的底盘构建工人
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public
class
StepFirstWorker
extends
AbstractWorker {
public
StepFirstWorker()
throws
RemoteException {
super
(
"first"
);
}
@Override
protected
Warehouse doWork(Work work)
throws
RemoteException {
System.out.println(String.format(
"%s 构建底盘完成."
, work.getInputWarehouse().get(
"carType"
)));
Warehouse outputWarehouse = work.getInputWarehouse();
outputWarehouse.put(
"baseInfo"
,
"something about baseInfo"
);
return
outputWarehouse;
}
}
|
由于第二步工人有好几个类型,因此再搞个第二步抽象工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public
abstract
class
StepThirdWorker
extends
AbstractWorker {
public
StepThirdWorker()
throws
RemoteException {
super
(
"third"
);
}
protected
boolean
acceptMyWork(Work work) {
String workClass = work.getInputWarehouse().get(
"class"
);
if
(workClass !=
null
) {
return
true
;
}
return
false
;
}
protected
Warehouse doMyWork(Work work)
throws
RemoteException {
System.out.println(String.format(
"Base:%s "
, work.getInputWarehouse().get(
"baseInfo"
)));
System.out.println(String.format(
"%s is Ok"
, work.getInputWarehouse().get(
"class"
)));
return
work.getInputWarehouse();
}
}
|
接下来构建第二步的引擎工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public
class
StepSecondEngineWorker
extends
StepSecondWorker {
public
static
final
String ENGINE =
"engine"
;
public
StepSecondEngineWorker()
throws
RemoteException {
super
();
}
public
boolean
acceptWork(Work work) {
return
acceptMyWork(work);
}
protected
Warehouse doWork(Work work)
throws
RemoteException {
return
super
.doMyWork(work);
}
}
|
第二步的座位工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public
class
StepSecondSeatWorker
extends
StepSecondWorker {
public
static
final
String SEAT =
"seat"
;
public
StepSecondSeatWorker()
throws
RemoteException {
super
();
}
public
boolean
acceptWork(Work work) {
return
acceptMyWork(work);
}
protected
Warehouse doWork(Work work)
throws
RemoteException {
return
super
.doMyWork(work);
}
}
<div>
</div>
|
第二步的轮胎工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
class
StepSecondTyreWorker
extends
StepSecondWorker {
public
static
final
String TYRE =
"tyre"
;
public
StepSecondTyreWorker()
throws
RemoteException {
super
();
}
public
boolean
acceptWork(Work work) {
return
acceptMyWork(work);
}
protected
Warehouse doWork(Work work)
throws
RemoteException {
return
super
.doMyWork(work);
}
}
|
同理,第三步也是大同小异的。
第三步的抽象工人类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public
abstract
class
StepThirdWorker
extends
AbstractWorker {
public
StepThirdWorker()
throws
RemoteException {
super
(
"third"
);
}
protected
boolean
acceptMyWork(Work work) {
String workClass = work.getInputWarehouse().get(
"class"
);
if
(workClass !=
null
) {
return
true
;
}
return
false
;
}
protected
Warehouse doMyWork(Work work)
throws
RemoteException {
System.out.println(String.format(
"Base:%s "
, work.getInputWarehouse().get(
"baseInfo"
)));
System.out.println(String.format(
"%s is Ok"
, work.getInputWarehouse().get(
"class"
)));
return
work.getInputWarehouse();
}
}
|
第三步的车门工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public
class
StepThirdDoorWorker
extends
StepThirdWorker {
public
static
final
String DOOR =
"door"
;
public
StepThirdDoorWorker()
throws
RemoteException {
super
();
}
public
boolean
acceptWork(Work work) {
return
acceptMyWork(work);
}
@Override
protected
Warehouse doWork(Work work)
throws
RemoteException {
return
super
.doMyWork(work);
}
}
|
第三步的车顶工人:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public
class
StepThirdRoofWorker
extends
StepThirdWorker {
public
static
final
String ROOF =
"roof"
;
public
StepThirdRoofWorker()
throws
RemoteException {
super
();
}
public
boolean
acceptWork(Work work) {
return
acceptMyWork(work);
}
protected
Warehouse doWork(Work work)
throws
RemoteException {
return
super
.doMyWork(work);
}
}
|
以上就把工人都构建好了,我们前面也说过,如果要进行任务分解,是必须要构建任务分解合并器的,这里简单起见,只实现任务分解了。
第二部的任务分解:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
class
SecondWorkSplitter
implements
WorkSplitter {
public
List<Warehouse> split(Work work, List<Worker> workers)
throws
RemoteException {
List<Warehouse> list =
new
ArrayList<Warehouse>();
list.add(getWareHouse(work.getInputWarehouse(),
"engine"
));
list.add(getWareHouse(work.getInputWarehouse(),
"seat"
));
list.add(getWareHouse(work.getInputWarehouse(),
"seat"
));
list.add(getWareHouse(work.getInputWarehouse(),
"seat"
));
list.add(getWareHouse(work.getInputWarehouse(),
"seat"
));
list.add(getWareHouse(work.getInputWarehouse(),
"tyre"
));
list.add(getWareHouse(work.getInputWarehouse(),
"tyre"
));
list.add(getWareHouse(work.getInputWarehouse(),
"tyre"
));
list.add(getWareHouse(work.getInputWarehouse(),
"tyre"
));
return
list;
}
private
Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
Warehouse warehouse =
new
WarehouseDefault();
warehouse.put(
"class"
, stepClass);
warehouse.putSubWarehouse(inputWarehouse);
return
warehouse;
}
}
|
从上面可以看到,构建了一个引擎的仓库,4个座位仓库,4个轮胎仓库。呵呵,既然能并行,为啥不让他做得更快些?
接下来是第三步的任务分解器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
class
ThirdWorkSplitter
implements
WorkSplitter {
public
List<Warehouse> split(Work work, List<Worker> workers)
throws
RemoteException {
List<Warehouse> list =
new
ArrayList<Warehouse>();
list.add(getWareHouse(work.getInputWarehouse(),
"door"
));
list.add(getWareHouse(work.getInputWarehouse(),
"door"
));
list.add(getWareHouse(work.getInputWarehouse(),
"door"
));
list.add(getWareHouse(work.getInputWarehouse(),
"door"
));
list.add(getWareHouse(work.getInputWarehouse(),
"roof"
));
return
list;
}
private
Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
Warehouse warehouse =
new
WarehouseDefault();
warehouse.put(
"class"
, stepClass);
warehouse.putSubWarehouse(inputWarehouse);
return
warehouse;
}
}
|
从上面可以看到,第三部构建了4个门仓库一个车顶仓库,同样的,可以让4个工人同时装门。
上面就把所有的准备工作都做好了,接下来就是测试方法了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
public
class
Test {
public
static
void
main(String[] args)
throws
IOException, ClassNotFoundException, InterruptedException {
JobCenter jobCenter =
new
JobCenterLocal();
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepFirstWorker());
}
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepSecondTyreWorker());
}
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepSecondSeatWorker());
}
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepSecondEngineWorker());
}
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepThirdDoorWorker());
}
for
(
int
i =
0
; i <
5
; i++) {
jobCenter.registerWorker(
new
StepThirdRoofWorker());
}
jobCenter.registerForeman(
new
ForemanSelectOneWorker(
"first"
));
<span></span> <span></span>jobCenter.registerForeman(
new
ForemanSelectAllWorker(
"second"
,
new
SecondWorkSplitter()));
jobCenter.registerForeman(
new
ForemanSelectAllWorker(
"third"
,
new
ThirdWorkSplitter()));
Warehouse inputWarehouse =
new
WarehouseDefault();
inputWarehouse.put(
"class"
,
"car"
);
inputWarehouse.put(
"carType"
,
"普桑"
);
WorkDefault work =
new
WorkDefault(
"first"
, inputWarehouse);
work.setForemanType(
"first"
);
WorkDefault work2 =
new
WorkDefault(
"second"
);
work2.setForemanType(
"second"
);
WorkDefault work3 =
new
WorkDefault(
"third"
);
work3.setForemanType(
"third"
);
work.setNextWork(work2).setNextWork(work3);
Warehouse warehouse = jobCenter.doWork(work);
jobCenter.stop();
}
}
|
呵呵,工人各加了5个,然后注册了三个工头,第一步的工头是随便挑一个工人类型的,第二步和第三步是挑所有工人的,同时还指定了任务分解器。
接下来就构建了一个工作,造一个高端大气上档次的普桑汽车,然后告诉职业介绍所说给我造就可以了。
下面是造车的过程,我把日志也贴上来了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
普桑 构建底盘完成.
-
234
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数
9
...
-
234
[id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行开始...
-
234
[id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行开始...
-
234
[id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行开始...
-
235
[id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行开始...
-
236
[id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行开始...
-
237
[id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行开始...
Base:something about baseInfo
engine is Ok
Base:something about baseInfo
seat is Ok
-
245
[id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行结束
-
246
[id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行开始...
-
246
[id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行结束
Base:something about baseInfo
seat is Ok
-
248
[id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行开始...
Base:something about baseInfo
tyre is Ok
-
250
[id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行结束
-
250
[id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行结束
Base:something about baseInfo
seat is Ok
-
252
[id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行开始...
-
253
[id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行结束
Base:something about baseInfo
seat is Ok
Base:something about baseInfo
tyre is Ok
-
257
[id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行结束
-
258
[id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行结束
Base:something about baseInfo
tyre is Ok
Base:something about baseInfo
tyre is Ok
-
262
[id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行结束
-
264
[id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行结束
-
264
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms
-
333
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数
5
...
-
334
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行开始...
-
334
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行开始...
-
334
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行开始...
-
334
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行开始...
Base:something about baseInfo
door is Ok
Base:something about baseInfo
door is Ok
Base:something about baseInfo
-
338
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行结束
door is Ok
-
339
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行开始...
-
338
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行结束
Base:something about baseInfo
door is Ok
Base:something about baseInfo
-
340
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行结束
roof is Ok
-
340
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行结束
-
342
[id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行结束
-
343
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms
|
从上面的日志可以看出:
由于第一步工作是挑一个单干的,因此是没有启用线程组的
第二步同时有9个线程干活:
1
2
3
|
-
234
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数
9
...
...
-
264
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms
|
第三步同时有5个线程干活:
1
2
3
|
-
333
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数
5
...
...
-
343
[RMI TCP Connection(
1
)-
192.168
.
84.73
] INFO - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms
|
总结:
Tiny并行计算框架确实是可以方便的解决各种复杂并行计算的问题。