Apache Geode/GemFire 数据分区和路由机制浅析

简介: 本篇文章简单介绍 Apache Geode/GemFire 的数据分区和数据路由机制,并深入到源码做简要地剖析Apache Geode/GemFire 是如何进行数据分区和数据路由的。

本篇文章主要讲解Apache Geode/GemFire 是如何进行数据分区的。


GemFire和大多数分布式系统一样都采用 Hash 的方式对数据进行分区,将 Entry 数据分布到 PartitionedRegion 当中,大家都知道 Entry 数据主要保存在 ConcurrentHashMap 中,ConcurrentHashMap存放在 Bucket 中,在 PR 服务器启动后会为 PartitionedRegion创建相应的Bucket 来保存这个ConcurrentHashMap。因此它们三者的映射关系如下所示:


Entry—> ConcurrentHashMap—> Bucket—> Region



如何进行数据分区?


当前端应用对Entry 进行操作后,Entry 会按照如下的步骤分区到PR 服务器上。


1.Entry 在插入到分布式集群的某台节点服务器过程中,会放到 PR 创建的 Bucket 中。


2.在进行 Put 操作时, 会产生一个EntryOperation 事件,在这个 Event 事件中可以找到PartionedRegion,和要进行 Put 操作的 Key。这个路由的对象就是一般的 POJO 操作类。


3.在获得路由对象上,从条目操作中获得 Key 键,在通过 Key 键来得到相关的对象Object。


4.接下来再通过 Key来获得bucketId, 再通过bucketId获得 PR,给定一个key/routing对象, 运行hashCode()生成一个 long 值, 然后用这个值与 bucket size 取模得到bucketId值。

   * 为了更好地进行哈希key分布, 使用MD5、SHA或其他的 ID 生成方法.


详见PartionedRegionHelper 的getHashKey方法.


private static int getHashKey(EntryOperation event, PartitionedRegion pr,

      Operation operation, Object key, Object value, Object callbackArgument) {

    // avoid creating EntryOperation if there is no resolver

    if (event != null) {

      pr = (PartitionedRegion)event.getRegion();

      key = event.getKey();

      callbackArgument = event.getCallbackArgument();

    }


    PartitionResolver resolver = getResolver(pr, key, callbackArgument);

    Object resolveKey = null;

    if (pr.isFixedPartitionedRegion()) {

      String partition = null ;

      if (resolver instanceof FixedPartitionResolver) {

        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        partition = ((FixedPartitionResolver)resolver).getPartitionName(

            event, partitionMap.keySet());

        if (partition == null) {

          Object[] prms = new Object[] { pr.getName(), resolver };

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));

        }

        Integer[] bucketArray = partitionMap.get(partition);

        if (bucketArray == null) {

          Object[] prms = new Object[] { pr.getName(), partition };

          throw new PartitionNotAvailableException(

              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));

        }

        int numBukets = bucketArray[1];

        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);

      }

      else if (resolver == null) {

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));

      }

      else if (!(resolver instanceof FixedPartitionResolver)) {

        Object[] prms = new Object[] { pr.getName(), resolver };

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));

      }

      return assignFixedBucketId(pr, partition, resolveKey);

    }

    else {

      // Calculate resolveKey.

      if (resolver == null) {

        // no custom partitioning at all

        resolveKey = key;

        if (resolveKey == null) {

          throw new IllegalStateException("attempting to hash null");

        }

      }

      else {

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        // 通过 Entry 操作, 获得一个路由对象, 得到resolveKey, 在通过 resolveKey进行Hash计算

        resolveKey = resolver.getRoutingObject(event);

        if (resolveKey == null) {

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());

        }

      }

      // Finally, calculate the hash.

      return getHashKey(pr, resolveKey);

    }

  }


如何进行数据感知路由?

 

GemFire 开发了一个Function Service 模块能够让客户端和服务器节点一起来处理提交的任务。如果数据跨多个节点分区,GemFire能够透明地路由数据执行行为到待处理数据的节点,这样避免了数据跨网络移动,这被称为“数据感知功能路由”。带有数据感知路由功能的应用根本不需要管理数据。

 

GemFire路由数据的执行行为而不是数据本身,GemFire直接路由数据执行行为到需要做并行处理,或汇聚结果的节点。这个特性使得GemFire从根本上降低了执行复杂任务的时间。分布式并行处理活动被抽象出来,与应用调用端无关。

 

 

应用即能够在单点执行,也能在一个小集群并行执行,甚至能够跨整个分布式集群并行执行。

GemFire的并行模型非常类似于Google的Map-Reduce模型。数据感知路由最适合于执行迭代查询或汇聚数据条目的操作。通过数据并处和并行计算,系统的吞吐量显著提升。最重要的是,计算的延迟与并行计算的节点数成反比。

 

单节点的Function执行类似于关系型数据库的Stored Procedure执行。

 

并行计算之后,结果通过Function中的结果收集器,把处理完的结果会调用ResultCollector统一收集回来。相当于Map-Reduce模型中的输出收集器。



目录
打赏
0
0
0
0
15
分享
相关文章
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
秉承“以场景驱动创新” 的核心理念,持续深耕三大核心场景的关键能力,并对大模型 GenAI 场景的融合应用进行重点投入,为智能时代构建实时、高效、统一的数据底座。
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
288 3
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
274 1
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
228 1
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
150 1
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
188 1

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问