ceph monitor paxos的实现(一)

简介: ceph monitor的一个主要功能是使用paxos分布式式协议维护一个key/value数据库的一致性。所使用的数据库引擎一般是leveldb。 数据库接口抽象 ----------- 为了适应不同的数据库引擎, ceph定义一个MonitorDBStore类来抽象对k/v数据库的操作。对后端数据库要求是支持事务或者原子性的key/value批量更新。它定义个一 Transa

ceph monitor的一个主要功能是使用paxos分布式式协议维护一个key/value数据库的一致性。所使用的数据库引擎一般是leveldb。

数据库接口抽象

为了适应不同的数据库引擎, ceph定义一个MonitorDBStore类来抽象对k/v数据库的操作。对后端数据库要求是支持事务或者原子性的key/value批量更新。它定义个一 Transaction类来说明一个事务包含的所有操作,并且这个类是可以序列化和反序列化的,以便在服务器之间传送:

struct Op {
    uint8_t type;
    string prefix;
    string key, endkey;
    bufferlist bl;
}

struct Transaction {
    list<Op> ops;
    uint64_t bytes, keys;
                                                                              
    Transaction() : bytes(0), keys(0) {}
                                                                                
    enum { 
      OP_PUT    = 1,
      OP_ERASE  = 2,
      OP_COMPACT = 3,
    };

例如它定义个put、erase的成员函数操作:


// 设置一个key的value
void put(string prefix, string key, bufferlist& bl) {
      ops.push_back(Op(OP_PUT, prefix, key, bl));
      ++keys;
      bytes += prefix.length() + key.length() + bl.length();
}
   
// 删除一个key
void erase(string prefix, string key) {
      ops.push_back(Op(OP_ERASE, prefix, key));
      ++keys;
      bytes += prefix.length() + key.length();
}
    

而序列化、反序列化函数:

void encode(bufferlist& bl) const { 
      ENCODE_START(2, 1,bl);
      ::encode(ops,bl);
      ::encode(bytes, bl);
      ::encode(keys, bl);
      ENCODE_FINISH(bl);
}
                                                                                
void decode(bufferlist::iterator& bl) {
      DECODE_START(2, bl);
      ::decode(ops, bl);
      if (struct_v >= 2) {
        ::decode(bytes,bl);
        ::decode(keys, bl);
      }
      DECODE_FINISH(bl);
}
    

ceph 主要用这个MonitorDBStore来为各个需要使用paxos的模块提供存储,为了各个模块不相互干扰,每个模块会选择一个前缀, 所有属于这个模块的数据都使用这个prefix再加上 一个key,才构成后端数据库真正的key, 具体结构时这样的:

prefix + '\0' + key

MonitorDBStore的API 主要是

int apply_transaction(MonitorDBStore::TransactionRef t)

负责把Transaction的每一条操作以原子方式在后端数据库执行,是一个同步操作,而

queue_transaction(MonitorDBStore::TransactionRef t,Context *oncommit)

是一个异步操作,事务完成后会回调一个从Context导出的类对象,类似于C语言中的回调函数。

除此以外,MonitorDBStore还有get操作

int get(const string& prefix, const string& key, bufferlist& bl);
int get(const string& prefix, const version_t ver, bufferlist& bl);

定义迭代器用来批量获取数据,它可以指定几个prefix, 并批量把数据追加到一个Transaction里面,以便在服务器见批量传数据, 可以预见加进去的数据操作是put操作

class WholeStoreIteratorImpl : public StoreIteratorImpl {
    KeyValueDB::WholeSpaceIterator iter;
    set<string> sync_prefixes;
public:
    WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator(
    set<string> &prefixes) : StoreIteratorImpl(), iter(iter),
   sync_prefixes(prefixes)
    { }
    bool add_chunk_entry(TransactionRef tx
                         string &prefix,
                         string &key,
                         bufferlist &value,
                         uint64_t max); 
}
    

paxos数据在MonitorDB上的存放格式

ceph内部使用了log来记录最近一段时间的操作,log存放在leveldb中,key的前缀‘paxos’被paxos核心模块保留。每一条log一个key, key的组成是paxos前缀+
index, index是用整数来表示的,顺序增加。为了加快log的查询, 还用"first_committed" "last_committed", 两个key来表示这段log, 前者是第一条log,后者是最后一条log。

monitor启动时的数据同步

每次monitor server启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这个过程叫做bootstrap(). bootstrap的第一个目的是补全数据,从其他服务拉缺失的paxos log或者全量复制数据库,其次是在必要时形成多数派建立一个paxos集群或者加入到已有的多数派中。

启动时将自己加入到一个外部法人集合,因为刚开始自己肯定不是在多数派中:

// i'm outside the quorum
if (monmap->contains(name))
    outside_quorum.insert(name);

然后给其它所有它知道的服务器发送探测包:

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                  monmap->get_inst(i));
  }
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }

任何一个服务器收到探测包都会比较自己的最后一次修改数据的版本是否落后于正在探测的服务器的数据:

 if (!is_probing() && !is_synchronizing()) {
    // If the probing mon is way ahead of us, we need to re-bootstrap.
    // Normally we capture this case when we initially bootstrap, but
    // it is possible we pass those checks (we overlap with
    // quorum-to-be) but fail to join a quorum before it moves past
    // us.  We need to be kicked back to bootstrap so we can
    // synchonize, not keep calling elections.
    if (paxos->get_version() + 1 < m->paxos_first_version) {
      dout(1) << " peer " << m->get_source_addr() << " has first_committed " << "ahead of us, re-bootstrapping" << dendl;
      bootstrap();
      goto out;

    }
  }

对于被探测的服务器,如果最后一条log的index number都跟不上对方的第一条记录的index number,意味着已经落后太多了,中间log记录已经缺失,不可能让paxos核心部分通过log来传播数据到本进程以获得数据的最终版本,本进程需要重启bootstrap从对方主动拉数据。此时不会带对方的探测包返回应答。正常情况,我们会报告本服务器的paxos状态:

r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
  r->name = name;
  r->quorum = quorum;
  monmap->encode(r->monmap_bl, m->get_connection()->get_features());
  r->paxos_first_version = paxos->get_first_committed();
  r->paxos_last_version = paxos->get_version();
  m->get_connection()->send_message(r);

  // did we discover a peer here?
  if (!monmap->contains(m->get_source_addr())) {
    dout(1) << " adding peer " << m->get_source_addr()
        << " to list of hints" << dendl;
    extra_probe_peers.insert(m->get_source_addr());
  }

主要内容包括我们是否是多数派的一员(通过返回多数派成员列表),以及我的paxos log的第一条记录号和最后一条记录号。

一旦一个发出探测包的服务器收到一个应答也会检查paxos log是否过时:

if (paxos->get_version() < m->paxos_first_version &&
    m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
      dout(10) << " peer paxos versions [" << m->paxos_first_version
           << "," << m->paxos_last_version << "]"
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, true);
      m->put();
      return;
    }
    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
      dout(10) << " peer paxos version " << m->paxos_last_version
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, false);
      m->put();
      return;
    }

一种情况是我的最后一条log记录和对方的第一条log记录之间有空隙,中间有缺失,只能主动从对方拉数据,道理与上面相同。还有一种是根据配置变量paxos_max_join_drift,数据并没有缺失,但是要传的log超过一个阀值,不如全量从对方复制数据。

输入探测方发现不需要在这个阶段复制数据,并且对方就是多数派的一员,那么可以肯定它的数据是和其他服务器同步的,至少应该乐观的认为,:-) ,所以直接加入到多数派去:

if (m->quorum.size()) { // 多数派列表非空
   if (monmap->contains(name) &&
        !monmap->get_addr(name).is_blank_ip()) {
      // i'm part of the cluster; just initiate a new election
      // 我的地址他们都知道了, 通过start_election选举后可以加入多数派
      start_election();
   } else {
    // 需要通知leader把我的地址修改了,然后会probe time会超时后重启bootstrap
    dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
      messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
                              monmap->get_inst(*m->quorum.begin()));
   }
}
else {
    //如果对方也不是当前多数派的一员,并且是属于monmap的一员,那么把它列入到在多数派外面的人
    if (monmap->contains(m->name)) {
      dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
      outside_quorum.insert(m->name);
    } else {
      dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
      m->put();
      return;
    }

    //一旦发现不在多数派的人数超过2F + 1 (包括自己), 说明集群不存在多数派,就可以通过选举来形成多数派
    unsigned need = monmap->size() / 2 + 1;
    dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
    if (outside_quorum.size() >= need) {
      if (outside_quorum.count(name)) {
        dout(10) << " that's enough to form a new quorum, calling election" << dendl;
        start_election();
      } else {
        dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
      }
    } else {
      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
    }
  }

本章总结

ceph monitor通过bootstrap过程,探测服务器列表中的各个服务器,比对log的最小记录号和最大记录号,直到本机数据的log历史(第一条记录和最后一条记录)都与所有其他服务器有交集,说明本机没有漏掉数据,从而进入多数派的形成过程,为paxos核心部分只通过传播log就可以同步数据创造条件。在boostrap阶段,服务器分析是否存在一个多数派,必要是通过进入竞选形成多数派。在这个阶段的全量同步和部分数据传输,没有介绍,因为相对简单,可以通过阅读ceph源码获得。
本章并未涉及ceph paxos设计最核心部分,有时间再介绍。

目录
相关文章
|
网络虚拟化
干货 | 华三 Monitor Link常用功能配置操作
干货 | 华三 Monitor Link常用功能配置操作
356 0
|
消息中间件 传感器 数据处理
"揭秘实时流式计算:低延迟、高吞吐量的数据处理新纪元,Apache Flink示例带你领略实时数据处理的魅力"
【8月更文挑战第10天】实时流式计算即时处理数据流,低延迟捕获、处理并输出数据,适用于金融分析等需即时响应场景。其框架(如Apache Flink)含数据源、处理逻辑及输出目标三部分。例如,Flink可从数据流读取信息,转换后输出。此技术优势包括低延迟、高吞吐量、强容错性及处理逻辑的灵活性。
326 4
|
11月前
|
人工智能 搜索推荐 测试技术
AI 辅助编程的效果衡量
本文主要介绍了如何度量研发效能,以及 AI 辅助编程是如何影响效能的,进而阐述如何衡量 AI 辅助编程带来的收益。
|
12月前
|
机器学习/深度学习 算法 自动驾驶
深度学习之分布式智能体学习
基于深度学习的分布式智能体学习是一种针对多智能体系统的机器学习方法,旨在通过多个智能体协作、分布式决策和学习来解决复杂任务。这种方法特别适用于具有大规模数据、分散计算资源、或需要智能体彼此交互的应用场景。
665 4
|
数据采集 算法 开发者
如何使用Python爬虫处理多种类型的滑动验证码
如何使用Python爬虫处理多种类型的滑动验证码
|
存储 关系型数据库 MySQL
MVCC:深入解析多版本并发控制机制
【4月更文挑战第20天】MVCC是数据库并发控制的关键技术,通过保存数据多个版本,使读写操作无锁并发,减少锁竞争,提高并发性能。它保证事务看到一致数据快照,避免并发问题,并支持事务回滚与恢复。MVCC广泛应用于PostgreSQL、InnoDB等,提供时间旅行查询和无锁读等功能,对于构建高性能、高并发数据库系统至关重要。
331 13
|
运维 Linux KVM
KVM详解(五)——KVM虚拟机镜像格式
KVM详解(五)——KVM虚拟机镜像格式
1269 0
|
存储 负载均衡 数据中心
带你读《存储漫谈:Ceph原理与实践》——3.2.5 元数据 / 数据同步
带你读《存储漫谈:Ceph原理与实践》——3.2.5 元数据 / 数据同步
|
算法 Linux
易懂的方式讲解ARM中断原理以及中断嵌套方法
易懂的方式讲解ARM中断原理以及中断嵌套方法
470 0
|
存储 分布式计算 监控
集群管理
集群管理是一种管理和协调多个计算机或服务器的技术,以便它们可以共同工作,提供更高的性能、可靠性、可扩展性和容错能力。集群管理通常涉及负载均衡、故障转移、数据备份和恢复、监控和自动化维护等功能。 集群管理可以应用于许多领域,例如数据中心、云计算、高性能计算、数据库、网络和存储等。常见的集群管理工具和技术包括:
305 2

热门文章

最新文章