Giraph源码分析(三)—— 消息通信

简介: 由前文知道每个BSPServiceWorker有一个WorkerServer对象,WorkerServer对象里面又有ServerData对象,作为数据实。ServerData中包含该Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。

由前文知道每个BSPServiceWorker有一个WorkerServer对象,WorkerServer对象里面又有ServerData对象,作为数据实。ServerData中包含该Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore对象为MessageStoreByPartition(接口)类型,也就是说消息时按照分区来存储的。MessageStoreByPartition接口的关系图如下:

在SimpleMessageStore抽象类中,有一个ConcurrentMap>类型的变量map,用来存储消息。第一层是pairtitionID到发送到该partition消息的映射;第二层是VertexID 到发送给该Vertex的消息队列。

《Giraph通信模块分析》:http://my.oschina.net/skyaugust/blog/95182

每个顶点的消息列表具体为ExtendedDataOutput类型,它继承DataOutput接口,增加了几个方法而已。每个消息是以字节形式写入到ExtendedDataOutput对象中的。

发送消息时,采用异步式通信。

图顶点的计算处理与消息通信并发执行,在计算过程中就可以发送消息,将大规模消息发送分散在不同的时间段,避免瞬时网络通信阻塞,但是接受端需要额外的空间,存储临时接收到的消息,相当于空间换时间。而集中式通信,图顶点的计算处理与消息通信串行进行,在计算完毕后,统一发送消息,控制和实现方式简单,可在发送端对消息进行最大程度优化,但容易造成瞬时间的网络通信阻塞以及增加发送端的消息存储开销。

不同Worker间的消息通信使用RPC方式,具体为Netty。同一Worker内,连续两次迭代的消息直接通过内存操作,把要发送的消息直接复制到Worker的incomingMessageStore中。下面详述消息的存储格式和发送机制。

Giraph使用Cache来缓存消息,当消息达到一定阈值后,一次性发送。

既按照bulk模式进行,不会一条一条信息发送。向某个顶点发送的消息是按照 pair存储在ByteArrayVertexIdData中(实际为ByteArrayVertexIdMessages类型)。介绍如下: org.apache.giraph.utils.ByteArrayVertexIdData

功能:把<顶点ID,data> Pair 存储在一个 byte数组中。里面有 ExtendedDataOutput对象用来存储数据。


该类中还有一个内部类:VertexIdDataIterator,该内部类继承 VertexIdIterator类。

org.apache.giraph.comm.SendCache用来缓存发送的信息,然后以“Bulk”模式发送。在Giraph中,每个Worker上可以对应多个分区。消息缓存的阈值是以Worker为单位计算,而不是Partition。

SendCache中有ByteArrayVertexIdData[ ] dataCache数组用来存储发送给每个Partition的消息;有int[ ] dataSizes数组用于记录向每个Worker发送的消息大小,若大于MAX_MSG_REQUEST_SIZE(默认为512KB)就把此Worker上的所有Partition缓存的消息发送到给该Worker,同一Worker内消息也是如此缓存;有int[ ] initBufferSizes数组用于记录每个Worker上的每个Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput对象的大小,同一Worker上的所有Partition初始值相同,该值为平均值。记MAX_MSG_REQUEST_SIZE(message request size)值为M, 该Worker上有P个 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默认为0.2f,记为A。则每个Partition的初始大小为:M*(1+A) / P .

由前文知道,每个Worker都有一个NettyWorkerClientRequestProcessor用来发送消息。该类中有SendMessageCache对象用来缓存向外发送的信息。NettyWorkerClientRequestProcessor类中的sendMessageRequest(I,M)

方法如下,用于向某个顶点destVertexId发送消息message。

方法解释:首先根据destVertexId得到对应的partitionId和WorkerInfo,然后把消息add到SendMessageCache中,并返回向该顶点所属Worker发送的消息大小workerMessageSize。若该值大于默认值512KB,则把此Worker对应的所有Partition消息从SendMessageCache中删除,把删除的消息赋值给workerMessages,其类型为PairList> ,key为partitionId,value为发送给该partition的消息列表,最后调用doRequest()方法发送信息。doRequest()方法如下:

可以看到在发送消息时,先判断是否在同一Worker上。如果是的话,调用SendWorkerMessagesRequest的doRequest发送消息;否则使用WorkerClient(底层使用Netty)进行消息发送。下面着重讨论同一Worker内的机制。

org.apache.giraph.comm.requests.SendWorkerMessagesRequest类中的doRequest方法如下:

参数为该Worker的ServerData,代码中的partitionVertexData实际为PairList>workerMessages。遍历来添加到ServerData中的incomingMessageStore中。

ByteArrayMessagesPerVertexStore类中的addPartitionMessages()方法如下:

当用户使用了Combiner,incomingMessageStore对应的类型则为OneMessagePerVertexStore,该类为每个顶点只存储一个消息,而非消息队列。 结构如下图:

当添加一条消息时,会把顶点已对应的消息和要添加的消息调用combine()方法进行合并,然后存储在上述结构图中。addPartitionMessages()方法如下:

在ComputeCallable中的call()方法调用computePartition(Partition)计算完所有Partition上的顶点后,调用WorkerClientRequestProcessor.flush()方法把所有剩余的消息发送出去

相关文章
|
15小时前
|
消息中间件 存储 监控
消息队列通信的优缺点
【10月更文挑战第29天】消息队列通信具有诸多优点,如解耦性强、异步通信、缓冲削峰等,能够有效地提高系统的灵活性、可扩展性和稳定性。但同时也存在一些缺点,如系统复杂性增加、性能开销、数据一致性挑战和实时性受限等。在实际应用中,需要根据具体的业务需求和场景,权衡其优缺点,合理地选择和使用消息队列通信机制,以实现系统的高效运行和优化。
|
5天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
5月前
|
消息中间件 自然语言处理 负载均衡
RabbitMQ揭秘:轻量级消息队列的优缺点全解析
**RabbitMQ简介** RabbitMQ是源自电信行业的消息中间件,支持AMQP协议,提供轻量、快速且易于部署的解决方案。它拥有灵活的路由配置,广泛的语言支持,适用于异步处理、负载均衡、日志收集和微服务通信等场景。然而,当面临大量消息堆积或高吞吐量需求时,性能可能会下降,并且扩展和开发成本相对较高。
264 0
|
消息中间件 Java Maven
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
消息中间件系列教程(07) -RabbitMQ -案例代码(点对点队列模式)
67 1
|
消息中间件 存储 微服务
RPC 和消息队列的区别
RPC 和消息队列的区别
316 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
107 0
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
186 1
|
消息中间件 安全 JavaScript
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(中)
小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...(中)
|
消息中间件 Java 中间件
RocketMQ延迟消息的代码实战及原理分析
在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
3222 0