《Ceph源码分析》——第3章,第2节Simple实现

简介: 本节书摘来自华章出版社《Ceph源码分析》一书中的第3章,第3.2节Simple实现,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看 3.2 Simple实现 Simple在Ceph里实现比较早,目前也比较稳定,是在生产环境中使用的网络通信模块。

本节书摘来自华章出版社《Ceph源码分析》一书中的第3章,第3.2节Simple实现,作者常涛,更多章节内容可以访问云栖社区“华章计算机”公众号查看

3.2 Simple实现
Simple在Ceph里实现比较早,目前也比较稳定,是在生产环境中使用的网络通信模块。如其名字所示,实现相对比较简单。下面具体分析一下,Simple如何实现Ceph网络通信框架的各个模块。

3.2.1 SimpleMessager
类SimpleMessager实现了Messager接口。
class SimpleMessenger : public SimplePolicyMessenger {
Accepter accepter; //用于接受客户端的链接请求
DispatchQueue dispatch_queue; //接收到的请求的消息分发队列
bool did_bind; //是否绑定

__u32 global_seq;//生成全局的消息seq
 ceph_spinlock_t global_seq_lock;//用于保护global_seq

` //地址→pipe映射
ceph::unordered_map rank_pipe;
//正在处理的pipes
set accepting_pipes;
//所有的pipes
set pipes;
//准备释放的pipes
list pipe_reap_queue;`

//内部集群的协议版本
` int cluster_protocol;
}`

3.2.2 Accepter
类Accepter用来在Server端监听端口,接收链接,它继承了Thread类,本身是一个线程,来不断地监听Server的端口:
`class Accepter : public Thread {
SimpleMessenger *msgr;
bool done;
int listen_sd; //监听的端口
uint64_t nonce;
……
}
`
3.2.3 DispatchQueue
DispatchQueue类用于把接收到的请求保存在内部,通过其内部的线程,调用SimpleMessenger类注册的Dispatch类的处理函数来处理相应的消息:

class DispatchQueue {
  ......
  mutable Mutex lock;
  Cond cond;

  class QueueItem {
    int type;
    ConnectionRef con;
    MessageRef m;
    ......
  };

  PrioritizedQueue<QueueItem, uint64_t> mqueue;    //接收消息的优先队列

  set<pair<double, Message*> > marrival;  
  //接收到的消息集合 pair为(recv_time, message) 

  map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
  //消息→所在集合位置的映射
    ……
};

其内部的mqueue为优先级队列,用来保存消息,marrival保存了接收到的消息。marrival_map保存消息在集合中的位置。
函数DispatchQueue::enqueue用来把接收到的消息添加到消息队列中,函数DispatchQueue::entry为线程的处理函数,用于处理消息。

3.2.4 Pipe
类Pipe实现了PipeConnection的接口,它实现了两个端口之间的类似管道的功能。
对于每一个pipe,内部都有一个Reader和一个Writer线程,分别用来处理这个Pipe有关的消息接收和请求的发送。线程DelayedDelivery用于故障注入测试:

class Pipe : public RefCountedObject {
  class Reader : public Thread {
  ……
  } reader_thread;  
  //接收线程,用于接收数据
  class Writer : public Thread {
  ……      
  } writer_thread; 
  //发送线程,用于发送数据
  SimpleMessenger *msgr;        // msgr的指针
  uint64_t conn_id;             //分配给Pipe自己唯一的id

  char *recv_buf;               //接收缓存区
  int recv_max_prefetch;        //接收缓冲区一次预取的最大值
  int recv_ofs;                 //接收的偏移量
  int recv_len;                 //接收的长度

  int sd;                       // pipe对应的socked fd

  struct iovec msgvec[IOV_MAX]; //发送消息的iovec结构

  int port;                     //链接端口
  int peer_type;                //链接对方的类型
  entity_addr_t peer_addr;      //对方地址
  Messenger::Policy policy;     //策略
  
  Mutex pipe_lock;
  int state;                    //当前链接的状态
  atomic_t state_closed;        //如果非0,那么状态为STATE_CLOSED
  
  PipeConnectionRef connection_state;   //PipeConnection的引用

  utime_t backoff;              // backoff的时间
    

  map<int, list<Message*> > out_q;  //准备发送的消息优先队列
  DispatchQueue *in_q;          //接收消息的DispatchQueue
  list<Message*> sent;          //要发送的消息
  Cond cond;
  bool send_keepalive;
  bool send_keepalive_ack;
  utime_t keepalive_ack_stamp;
  bool halt_delivery;           //如果Pipe队列消毁,停止增加
    
  __u32 connect_seq, peer_global_seq;
  uint64_t out_seq;             //发送消息的序列号 
  uint64_t in_seq, in_seq_acked;    //接收到消息序号和ACK的序号
}

3.2.5 消息的发送
1)当发送一个消息时,首先要通过Messenger类,获取对应的Connection:
conn = messenger->get_connection(dest_server);
具体到SimpleMessenger的实现如下所示:
a)首先比较,如果dest.addr是my_inst.addr,就直接返回local_connection。
b)调用函数_lookup_pipe在已经存在的Pipe中查找。如果找到,就直接返回pipeConnectionRef;否则调用函数connect_rank新创建一个Pipe,并加入到msgr的register_pipe里。
2)当获得一个Connection之后,就可以调用Connection的发送函数来发送消息。
conn->send_message(m);
其最终调用了SimpleMessenger::submit_message函数:
a)如果Pipe不为空,并且状态不是Pipe::STATE_CLOSED状态,调用函数pipe→_send把发送的消息添加到out_q发送队列里,触发发送线程。
b)如果Pipe为空,就调用connect_rank创建Pipe,并把消息添加到out_q发送队列中。
3)发送线程writer把消息发送出去。通过步骤2,要发送的消息已经保存在相应Pipe的out_q队列里,并触发了发送线程。每个Pipe的Writer线程负责发送out_q的消息,其线程入口函数为Pipe::writer,实现功能:
a)调用函数_get_next_outgoing从out_q中获取消息。
b)调用函数write_message(header, footer, blist)把消息的header、footer、数据blist发送出去。

3.2.6 消息的接收
1)每个Pipe对应的线程Reader用于接收消息。入口函数为Pipe::reader,其功能如下:
a)判断当前的state,如果为STATE_ACCEPTING,就调用函数Pipe::accept来接收连接,如果不是STATE_CLOSED,并且不是STATE_CONNECTING状态,就接收消息。
b)先调用函数tcp_read来接收一个tag。
c)根据tag,来接收不同类型的消息如下所示:
`CEPH_MSGR_TAG_KEEPALIVE消息。
CEPH_MSGR_TAG_KEEPALIVE2,在CEPH_MSGR_TAG_KEEPALIVE的基础上,添加了时间。
CEPH_MSGR_TAG_KEEPALIVE2_ACK。
CEPH_MSGR_TAG_ACK。
CEPH_MSGR_TAG_MSG,这里才是接收的消息。
CEPH_MSGR_TAG_CLOSE。`
d)调用函数read_message来接收消息,当本函数返回后,就完成了接收消息。
2)调用函数in_q->fast_preprocess(m)预处理消息。
3)调用函数in_q->can_fast_dispatch(m),如果可以进行fast_dispatch,就in_q->fast_dispatch(m)处理。fast_dispatch并不把消息加入到mqueue里,而是直接调用msgr->ms_fast_dispatch函数,并最终调用注册的fast_dispatcher函数处理。
4)如果不能fast_dispatch,就调用函数in_q->enqueue(m, m->get_priority(), conn_id) 把接收到的消息加入到DispatchQueue的mqueue队列里,由DispatchQueue的分发线程调用ms_dispatch处理。
ms_fast_dispath和ms_dispatch两种处理的区别在于:ms_dispatch是由DispatchQueue的线程处理的,它是一个单线程;ms_fast_dispatch函数是由Pipe的接收线程直接调用处理的,因此性能比前者要好。

3.2.7 错误处理
网络模块复杂的功能是如何处理网络错误。无论是接收还是发送,会出现各种异常错误,包括返回异常错误码,接收数据的magic验证不一致,接收的数据的效验验证不一致,等等。错误的原因主要是由于网络本身的错误(物理链路等),或者字节跳变引起的。
目前错误处理的方法比较简单,处理流程如下:
1)关闭当前socket的连接。
2)重新建立一个socket连接。
3)重新发送没有接受到ACK应对的消息。
函数Pipe::fault用来处理错误:
1)调用shutdown_socket关闭pipe的socket。
2)调用函数requeue_sent把没有收到ACK的消息重新加入发送队列,当发送队列有请求时,发送线程会不断地尝试重新连接。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
机器学习/深度学习 人工智能 算法
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
1819 0
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
|
Java API Maven
Java工具篇之反射框架Reflections
Reflections通过扫描classpath,索引元数据,并且允许在运行时查询这些元数据。 使用Reflections可以很轻松的获取以下元数据信息: - [x] 获取某个类型的全部子类 - [x] 只要类型、构造器、方法,字段上带有特定注解,便能获取带有这个注解的全部信息(类型、构造器、方法,字段) - [x] 获取所有能匹配某个正则表达式的资源 - [x] 获取所有带有特定签名的方法,包括参数,参数注解,返回类型 - [x] 获取所有方法的名字 - [x] 获取代码里所有字段、方法名、构造器的使用权
1843 0
|
22天前
|
存储 编解码 JSON
PowerToys微软最强工具箱软件集!好用的Windows增强工具箱,降低内存和存储占用
Microsoft PowerToys是微软官方推出的免费开源工具集,专为Win10/11设计,集成20+实用功能,如高级粘贴、颜色拾取、窗口布局管理(FancyZones)、文本提取、屏幕缩放标注等,全面提升办公效率,支持开发者、设计师等高效操作,模块化自由配置,堪称Windows效率神器。
230 7
|
4月前
|
数据可视化 算法 量子技术
量子开发者实战:用量子算法可视化工具撕裂量子黑箱
量子研究面临线路复杂、态演化抽象、纠错黑箱三大难题,造成资源浪费与理解偏差。解决方案需具备量子线路降维、态演化全息投影与纠错可视化能力。板栗看板、Qiskit Bloch Sphere、Quantastica等工具助力科研与教学,提升理解效率与算法调试速度,推动量子技术从黑箱走向直观可视。
量子开发者实战:用量子算法可视化工具撕裂量子黑箱
|
数据可视化 JavaScript 前端开发
Matplotlib动画制作:让数据生动起来!
Matplotlib动画制作:让数据生动起来!
257 0
|
算法 搜索推荐 小程序
智慧医院导航系统,技术引领就医流程优化
【摘要】智慧医院导航系统解决患者寻路难题,提高就医效率。政府政策支持导航服务纳入智慧医院标准,系统包括来院规划、院内精准定位、AR实景导航和全程导诊功能,减少患者等待时间,减轻导医台压力,促进医院信息化建设。
505 2
智慧医院导航系统,技术引领就医流程优化
计算机硬件清洁与防尘
【8月更文挑战第2天】
784 1
|
存储 监控 安全
深度剖析Linux进程的内部机制:一探/proc/pid的奥秘
深度剖析Linux进程的内部机制:一探/proc/pid的奥秘
2591 0
|
芯片 关系型数据库
工程师首选:USB过压保护OVP芯片,40V-70V耐压,电流0.5A-6A
平芯微推出一系列集成保护功能的电源管理芯片,包括PW2605、PW2606B、PW2606、PW2609A、PW1600、PW1515、PW1605、PW1558A、PW2601、PW1555A、PW4054H、PW4057H和PW4056HH。这些芯片具备输入过压关闭保护,防止高压输入损坏电路,并提供不同电流等级的输出支持,部分型号还具有可调限流和内置LDO功能。产品适用于各种应用场景,如磁吸充电线、锂电池充电等。其中,PW系列芯片的过压保护点可调,且部分型号具有高耐压特性,以增强系统安全性。
|
数据采集 机器学习/深度学习 数据挖掘
使用Python改造一款外星人入侵小游戏
使用Python改造一款外星人入侵小游戏
268 0