Tiny并行计算框架之实现机理

简介:

当然,秉承偶的一向的观点,让新手也能看得懂。

首先看工作的接口:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public interface Work extends Serializable {
     /**
      * 返回工作类型,每个工作都有一个工作类型,包工头及工人只能处理同样类型的工作
      *
      * @return
      */
     String getType();
 
     String getId();
 
     /**
      * 返回后续步骤的工作,如果有,说明是复合工作,如果没有,说明是简单工作
      *
      * @return
      */
     Work getNextWork();
 
     /**
      * 设置后续步骤工作
      *
      * @param nextWork 后续工作
      * @return 返回后续工作
      */
     Work setNextWork(Work nextWork);
 
     /**
      * 是否需要序列化
      *
      * @return true表示工作永不丢失,false表示容器关闭即丢失
      */
     boolean isNeedSerialize();
 
     /**
      * 设置是否需要进行序列化,如果要用到MQ,则需要设置为需要序列化
      *
      * @param needSerialize true表示工作永不丢失,false表示容器关闭即丢失
      */
     void setNeedSerialize( boolean needSerialize);
 
     /**
      * 返回输入仓库
      *
      * @return
      */
     Warehouse getInputWarehouse();
 
 
     /**
      * 设置输入仓库
      *
      * @param inputWarehouse
      */
     void setInputWarehouse(Warehouse inputWarehouse);
 
     /**
      * 设置工作状态
      *
      * @param workStatus
      */
     void setWorkStatus(WorkStatus workStatus);
 
     /**
      * 获取工作状态
      *
      * @return
      */
     WorkStatus getWorkStatus();
}

是不是很简单,它的实现也是同样简单:

?
1
2
3
4
5
6
7
8
public class WorkDefault implements Work {
     private String id;
     private String type;
     private Warehouse inputWarehouse;
     private boolean needSerialize = false ;
     private Work nextStepWork = null ;
     private WorkStatus workStatus = WorkStatus.WAITING;
}

基本上就是上述属性的set,get方法。

工人的接口如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
  * 工人,用于干具体的工作
  * Created by luoguo on 14-1-8.
  */
public interface Worker extends ParallelObject {
     /**
      * 执行工作
      *
      * @return
      */
     Warehouse work(Work work) throws RemoteException;
 
     /**
      * 是否接受工作
      * 即使是同样类型的工人,有可能对工作也挑三捡四,这里给了工人一定的灵活性
      *
      * @param work
      * @return true表示接受,false表示不接受
      */
     boolean acceptWork(Work work) throws RemoteException;
 
       /**
      * 返回类型
      *
      * @return
      */
     String getType() throws RemoteException;
 
}

是不是也很简单?

下面看看工头:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
  * 包工头
  * 包工头用于带着一组工人并完成对应的任务
  * Created by luoguo on 14-1-8.
  */
public interface Foreman extends ParallelObject {
     /**
      * 返回执行哪种类型的工作任务
      *
      * @return
      */
     String getType() throws RemoteException;
 
     /**
      * 开始干活以完成工作
      */
     Warehouse work(Work work, List<Worker> workerList) throws IOException;
 
     /**
      * 设置工作合并器
      *
      * @param workCombiner
      */
     void setWorkCombiner(WorkCombiner workCombiner);
 
     /**
      * 设置工作分解器
      *
      * @param workSplitter
      */
     void setWorkSplitter(WorkSplitter workSplitter);
 
}

下面看看职业介绍所,呵呵,这个就复杂些了(方法多了)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
  * 职介所
  * 职介所是分布式处理的核心场所,所有工作相关的元素都要通过职介所进行关联
  * Created by luoguo on 14-1-8.
  */
public interface JobCenter {
     String WORK_QUEUE = "WorkQueue" ;
     String FOREMAN = "Foreman" ;
     String WORKER = "Worker" ;
 
     RmiServer getRmiServer();
 
     void setRmiServer(RmiServer rmiServer);
 
     /**
      * 注册工人
      *
      * @param worker
      */
     void registerWorker(Worker worker) throws RemoteException;
 
     /**
      * 返回工作队列对象
      *
      * @return
      */
     WorkQueue getWorkQueue();
 
     /**
      * 注消工人
      *
      * @param worker
      */
     void unregisterWorker(Worker worker) throws RemoteException;
 
     /**
      * 注册一份工作,工作情况不需要马上关注。因此也就不用等待,马上返回可以进行其它处理
      * 如果有返回结果,可以通过异步方式,异步方式可以用后续工作的方式来指定
      *
      * @param work
      */
     void registerWork(Work work) throws IOException;
 
 
     /**
      * 取消工作,在工作没有分配出去之前,可以从职介所注消工作,如果工作已经分配出去,则无法注消
      *
      * @param work
      */
     void unregisterWork(Work work) throws RemoteException;
 
     /**
      * 返回指定工作的工作状态
      *
      * @param work
      * @return
      */
     WorkStatus getWorkStatus(Work work) throws RemoteException;
 
     /**
      * 执行一项工作,期望同步得到结果或异常
      * 如果没有合适的工人或包工头进行处理,马上会抛出异常
      *
      * @param work
      */
     Warehouse doWork(Work work) throws IOException;
 
     /**
      * 注册包工头
      *
      * @param foreman
      */
     void registerForeman(Foreman foreman) throws RemoteException;
 
     /**
      * 注销包工头
      *
      * @param foreman
      */
     void unregisterForeMan(Foreman foreman) throws RemoteException;
 
     /**
      * 返回具有某种类型的空闲且愿意接受工作的工人列表
      *
      * @return
      */
     List<Worker> getIdleWorkerList(Work work);
 
 
     /**
      * 返回所有的工作列表
      *
      * @return
      */
     List<Work> getWorkList() throws RemoteException;
 
     /**
      * 返回某种类型的某种状态的工作列表
      *
      * @return
      */
     List<Work> getWorkList(String type, WorkStatus workStatus) throws RemoteException;
 
 
     /**
      * 返回组织某种工作的的空闲工头列表
      *
      * @param type
      * @return
      */
     List<Foreman> getIdleForeman(String type);
 
     /**
      * 自动进行匹配,如果有匹配成功的,则予以触发执行
      */
     void autoMatch() throws IOException;
 
     /**
      * 职业介绍所关门
      *
      * @throws RemoteException
      */
     void stop() throws RemoteException;
}



讲过了四个重要接口,现在说说实现思路:

工人、包工头都是无状态的。这有个好处是不管来多少工作,都可以进行处理;缺点是没有办法进行后续干预。

综合来说,我还是觉得无状态的比有状态的更好。

因为一开始我的实现是有状态的,甚至可以让包工头取消一个工作,包工头再让工人取消工作,但是这样会带来异常复杂的分布式状态维护。但是改成用无状态的模式,就方便多了。

在职业介绍所,有两个重要的方法,一个是doWork,即立即执行一个工作,如果找不到合适的工头和工人,就会抛出异常;另外一个是autoMatch触发职业介绍所进行一次自动匹配。之所以没有做在职业介绍所内部开启线程是为了给外部提供更方便的控制。

所以,其总体设计思想就是开个职业介绍所,工人或工头、工作就可以注册进来,由职业介绍所来撮合成合适的虚拟小组来达成任务。是不是很容易理解?

下面就是所有的接口与实现类了:

对于做并行开发的人员来说:

职业介绍所,工人,工头都不用开发,框架自带的已经足够用了。开发人员只要开发工人和工作分解合并器即可。

工人继承AbstractWorker之后,只有一个方法实现即可。工作分解一个方法,工作合并一个方法,其它的都交给Tiny并行计算框架吧。

总结:

此框架对并行计算的参与者进行了分解,分解为职业介绍所、工头、工人及工作,职业介绍所是中心,职业介绍所停止运行,将无法进行并行计算。工头、工人是可以在任意计算机的,工作是可以在任意节点添加的,由于框架提供了工作序列化功能,因此只要设置工作需要序列化是true,此工作将一直存在,直到被完成,利用此特性可以方便的实现简单的MQ。同时为方便业务实现,工人有抽象类,建议直接继承抽象类实现要关业务方法即可。

注意:目前在执行过程中工人注销或停止服务,会导致整个工作执行失败,下次继续进行执行,后续拟改成考虑用其它工人继续工作。

今天比较忙,写得比较匆忙,如果有不清楚的可以在下面提问。

相关文章
|
6月前
|
存储 人工智能 PyTorch
基于PyTorch/XLA的高效分布式训练框架
基于PyTorch/XLA的高效分布式训练框架
316 2
|
20天前
|
机器学习/深度学习 监控 PyTorch
深度学习工程实践:PyTorch Lightning与Ignite框架的技术特性对比分析
在深度学习框架的选择上,PyTorch Lightning和Ignite代表了两种不同的技术路线。本文将从技术实现的角度,深入分析这两个框架在实际应用中的差异,为开发者提供客观的技术参考。
37 7
|
27天前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
54 2
|
3月前
|
机器学习/深度学习 TensorFlow 算法框架/工具
全面解析TensorFlow Lite:从模型转换到Android应用集成,教你如何在移动设备上轻松部署轻量级机器学习模型,实现高效本地推理
【8月更文挑战第31天】本文通过技术综述介绍了如何使用TensorFlow Lite将机器学习模型部署至移动设备。从创建、训练模型开始,详细演示了模型向TensorFlow Lite格式的转换过程,并指导如何在Android应用中集成该模型以实现预测功能,突显了TensorFlow Lite在资源受限环境中的优势及灵活性。
268 0
|
4月前
|
机器学习/深度学习 TensorFlow 算法框架/工具
使用Python实现深度学习模型:跨平台模型移植与部署
【7月更文挑战第10天】 使用Python实现深度学习模型:跨平台模型移植与部署
252 1
|
6月前
|
机器学习/深度学习 并行计算 PyTorch
【多GPU炼丹-绝对有用】PyTorch多GPU并行训练:深度解析与实战代码指南
本文介绍了PyTorch中利用多GPU进行深度学习的三种策略:数据并行、模型并行和两者结合。通过`DataParallel`实现数据拆分、模型不拆分,将数据批次在不同GPU上处理;数据不拆分、模型拆分则将模型组件分配到不同GPU,适用于复杂模型;数据和模型都拆分,适合大型模型,使用`DistributedDataParallel`结合`torch.distributed`进行分布式训练。代码示例展示了如何在实践中应用这些策略。
1849 2
【多GPU炼丹-绝对有用】PyTorch多GPU并行训练:深度解析与实战代码指南
|
机器学习/深度学习 缓存 人工智能
Julia开源新框架SimpleChain:小型神经网络速度比PyTorch快5倍!
Julia开源新框架SimpleChain:小型神经网络速度比PyTorch快5倍!
205 0
|
存储 机器学习/深度学习 PyTorch
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(一)
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(一)
271 0
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(一)
|
机器学习/深度学习 存储 PyTorch
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(二)
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(二)
504 0
使用PyTorch Lightning构建轻量化强化学习DQN(附完整源码)(二)
|
机器学习/深度学习
cuDNN:用于深度学习的高效原语【读书笔记】
cuDNN:用于深度学习的高效原语【读书笔记】
178 0
cuDNN:用于深度学习的高效原语【读书笔记】
下一篇
无影云桌面