Apache Geode/GemFire 数据分区和路由机制浅析

简介: 本篇文章简单介绍 Apache Geode/GemFire 的数据分区和数据路由机制,并深入到源码做简要地剖析Apache Geode/GemFire 是如何进行数据分区和数据路由的。

本篇文章主要讲解Apache Geode/GemFire 是如何进行数据分区的。


GemFire和大多数分布式系统一样都采用 Hash 的方式对数据进行分区,将 Entry 数据分布到 PartitionedRegion 当中,大家都知道 Entry 数据主要保存在 ConcurrentHashMap 中,ConcurrentHashMap存放在 Bucket 中,在 PR 服务器启动后会为 PartitionedRegion创建相应的Bucket 来保存这个ConcurrentHashMap。因此它们三者的映射关系如下所示:


Entry—> ConcurrentHashMap—> Bucket—> Region



如何进行数据分区?


当前端应用对Entry 进行操作后,Entry 会按照如下的步骤分区到PR 服务器上。


1.Entry 在插入到分布式集群的某台节点服务器过程中,会放到 PR 创建的 Bucket 中。


2.在进行 Put 操作时, 会产生一个EntryOperation 事件,在这个 Event 事件中可以找到PartionedRegion,和要进行 Put 操作的 Key。这个路由的对象就是一般的 POJO 操作类。


3.在获得路由对象上,从条目操作中获得 Key 键,在通过 Key 键来得到相关的对象Object。


4.接下来再通过 Key来获得bucketId, 再通过bucketId获得 PR,给定一个key/routing对象, 运行hashCode()生成一个 long 值, 然后用这个值与 bucket size 取模得到bucketId值。

   * 为了更好地进行哈希key分布, 使用MD5、SHA或其他的 ID 生成方法.


详见PartionedRegionHelper 的getHashKey方法.


private static int getHashKey(EntryOperation event, PartitionedRegion pr,

      Operation operation, Object key, Object value, Object callbackArgument) {

    // avoid creating EntryOperation if there is no resolver

    if (event != null) {

      pr = (PartitionedRegion)event.getRegion();

      key = event.getKey();

      callbackArgument = event.getCallbackArgument();

    }


    PartitionResolver resolver = getResolver(pr, key, callbackArgument);

    Object resolveKey = null;

    if (pr.isFixedPartitionedRegion()) {

      String partition = null ;

      if (resolver instanceof FixedPartitionResolver) {

        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        partition = ((FixedPartitionResolver)resolver).getPartitionName(

            event, partitionMap.keySet());

        if (partition == null) {

          Object[] prms = new Object[] { pr.getName(), resolver };

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));

        }

        Integer[] bucketArray = partitionMap.get(partition);

        if (bucketArray == null) {

          Object[] prms = new Object[] { pr.getName(), partition };

          throw new PartitionNotAvailableException(

              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));

        }

        int numBukets = bucketArray[1];

        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);

      }

      else if (resolver == null) {

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));

      }

      else if (!(resolver instanceof FixedPartitionResolver)) {

        Object[] prms = new Object[] { pr.getName(), resolver };

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));

      }

      return assignFixedBucketId(pr, partition, resolveKey);

    }

    else {

      // Calculate resolveKey.

      if (resolver == null) {

        // no custom partitioning at all

        resolveKey = key;

        if (resolveKey == null) {

          throw new IllegalStateException("attempting to hash null");

        }

      }

      else {

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        // 通过 Entry 操作, 获得一个路由对象, 得到resolveKey, 在通过 resolveKey进行Hash计算

        resolveKey = resolver.getRoutingObject(event);

        if (resolveKey == null) {

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());

        }

      }

      // Finally, calculate the hash.

      return getHashKey(pr, resolveKey);

    }

  }


如何进行数据感知路由?

 

GemFire 开发了一个Function Service 模块能够让客户端和服务器节点一起来处理提交的任务。如果数据跨多个节点分区,GemFire能够透明地路由数据执行行为到待处理数据的节点,这样避免了数据跨网络移动,这被称为“数据感知功能路由”。带有数据感知路由功能的应用根本不需要管理数据。

 

GemFire路由数据的执行行为而不是数据本身,GemFire直接路由数据执行行为到需要做并行处理,或汇聚结果的节点。这个特性使得GemFire从根本上降低了执行复杂任务的时间。分布式并行处理活动被抽象出来,与应用调用端无关。

 

 

应用即能够在单点执行,也能在一个小集群并行执行,甚至能够跨整个分布式集群并行执行。

GemFire的并行模型非常类似于Google的Map-Reduce模型。数据感知路由最适合于执行迭代查询或汇聚数据条目的操作。通过数据并处和并行计算,系统的吞吐量显著提升。最重要的是,计算的延迟与并行计算的节点数成反比。

 

单节点的Function执行类似于关系型数据库的Stored Procedure执行。

 

并行计算之后,结果通过Function中的结果收集器,把处理完的结果会调用ResultCollector统一收集回来。相当于Map-Reduce模型中的输出收集器。



相关文章
|
5月前
|
Apache 流计算 OceanBase
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
8天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
26 3
|
2月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
86 1
|
12天前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
31 1
|
12天前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
44 1
|
13天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
39 1
|
2月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
13天前
|
存储 分布式计算 大数据
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
28 0
|
1月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
114 11
|
5月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多