Apache Geode/GemFire 数据分区和路由机制浅析

简介: 本篇文章简单介绍 Apache Geode/GemFire 的数据分区和数据路由机制,并深入到源码做简要地剖析Apache Geode/GemFire 是如何进行数据分区和数据路由的。

本篇文章主要讲解Apache Geode/GemFire 是如何进行数据分区的。


GemFire和大多数分布式系统一样都采用 Hash 的方式对数据进行分区,将 Entry 数据分布到 PartitionedRegion 当中,大家都知道 Entry 数据主要保存在 ConcurrentHashMap 中,ConcurrentHashMap存放在 Bucket 中,在 PR 服务器启动后会为 PartitionedRegion创建相应的Bucket 来保存这个ConcurrentHashMap。因此它们三者的映射关系如下所示:


Entry—> ConcurrentHashMap—> Bucket—> Region



如何进行数据分区?


当前端应用对Entry 进行操作后,Entry 会按照如下的步骤分区到PR 服务器上。


1.Entry 在插入到分布式集群的某台节点服务器过程中,会放到 PR 创建的 Bucket 中。


2.在进行 Put 操作时, 会产生一个EntryOperation 事件,在这个 Event 事件中可以找到PartionedRegion,和要进行 Put 操作的 Key。这个路由的对象就是一般的 POJO 操作类。


3.在获得路由对象上,从条目操作中获得 Key 键,在通过 Key 键来得到相关的对象Object。


4.接下来再通过 Key来获得bucketId, 再通过bucketId获得 PR,给定一个key/routing对象, 运行hashCode()生成一个 long 值, 然后用这个值与 bucket size 取模得到bucketId值。

   * 为了更好地进行哈希key分布, 使用MD5、SHA或其他的 ID 生成方法.


详见PartionedRegionHelper 的getHashKey方法.


private static int getHashKey(EntryOperation event, PartitionedRegion pr,

      Operation operation, Object key, Object value, Object callbackArgument) {

    // avoid creating EntryOperation if there is no resolver

    if (event != null) {

      pr = (PartitionedRegion)event.getRegion();

      key = event.getKey();

      callbackArgument = event.getCallbackArgument();

    }


    PartitionResolver resolver = getResolver(pr, key, callbackArgument);

    Object resolveKey = null;

    if (pr.isFixedPartitionedRegion()) {

      String partition = null ;

      if (resolver instanceof FixedPartitionResolver) {

        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        partition = ((FixedPartitionResolver)resolver).getPartitionName(

            event, partitionMap.keySet());

        if (partition == null) {

          Object[] prms = new Object[] { pr.getName(), resolver };

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));

        }

        Integer[] bucketArray = partitionMap.get(partition);

        if (bucketArray == null) {

          Object[] prms = new Object[] { pr.getName(), partition };

          throw new PartitionNotAvailableException(

              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));

        }

        int numBukets = bucketArray[1];

        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);

      }

      else if (resolver == null) {

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));

      }

      else if (!(resolver instanceof FixedPartitionResolver)) {

        Object[] prms = new Object[] { pr.getName(), resolver };

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));

      }

      return assignFixedBucketId(pr, partition, resolveKey);

    }

    else {

      // Calculate resolveKey.

      if (resolver == null) {

        // no custom partitioning at all

        resolveKey = key;

        if (resolveKey == null) {

          throw new IllegalStateException("attempting to hash null");

        }

      }

      else {

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        // 通过 Entry 操作, 获得一个路由对象, 得到resolveKey, 在通过 resolveKey进行Hash计算

        resolveKey = resolver.getRoutingObject(event);

        if (resolveKey == null) {

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());

        }

      }

      // Finally, calculate the hash.

      return getHashKey(pr, resolveKey);

    }

  }


如何进行数据感知路由?

 

GemFire 开发了一个Function Service 模块能够让客户端和服务器节点一起来处理提交的任务。如果数据跨多个节点分区,GemFire能够透明地路由数据执行行为到待处理数据的节点,这样避免了数据跨网络移动,这被称为“数据感知功能路由”。带有数据感知路由功能的应用根本不需要管理数据。

 

GemFire路由数据的执行行为而不是数据本身,GemFire直接路由数据执行行为到需要做并行处理,或汇聚结果的节点。这个特性使得GemFire从根本上降低了执行复杂任务的时间。分布式并行处理活动被抽象出来,与应用调用端无关。

 

 

应用即能够在单点执行,也能在一个小集群并行执行,甚至能够跨整个分布式集群并行执行。

GemFire的并行模型非常类似于Google的Map-Reduce模型。数据感知路由最适合于执行迭代查询或汇聚数据条目的操作。通过数据并处和并行计算,系统的吞吐量显著提升。最重要的是,计算的延迟与并行计算的节点数成反比。

 

单节点的Function执行类似于关系型数据库的Stored Procedure执行。

 

并行计算之后,结果通过Function中的结果收集器,把处理完的结果会调用ResultCollector统一收集回来。相当于Map-Reduce模型中的输出收集器。



相关文章
|
1月前
|
存储 关系型数据库 Apache
深入理解Apache Hudi异步索引机制
深入理解Apache Hudi异步索引机制
37 2
|
1月前
|
存储 缓存 分布式计算
Apache Hudi数据跳过技术加速查询高达50倍
Apache Hudi数据跳过技术加速查询高达50倍
38 2
|
1月前
|
分布式计算 测试技术 Apache
如何不加锁地将数据并发写入Apache Hudi?
如何不加锁地将数据并发写入Apache Hudi?
32 0
|
1月前
|
SQL Apache HIVE
一文彻底掌握Apache Hudi的主键和分区配置
一文彻底掌握Apache Hudi的主键和分区配置
61 0
|
1月前
|
Apache 开发者
揭秘!Apache Hudi社区发展数据盘点
揭秘!Apache Hudi社区发展数据盘点
32 0
|
1月前
|
分布式计算 Java 数据管理
使用Apache Hudi + Amazon EMR进行变化数据捕获(CDC)
使用Apache Hudi + Amazon EMR进行变化数据捕获(CDC)
87 0
|
1月前
|
分布式计算 大数据 测试技术
查询时间降低60%!Apache Hudi数据布局黑科技了解下
查询时间降低60%!Apache Hudi数据布局黑科技了解下
22 0
|
1月前
|
分布式计算 测试技术 Apache
如何将数据更快导入Apache Hudi?
如何将数据更快导入Apache Hudi?
30 0
|
1月前
|
消息中间件 分布式计算 Kafka
硬核!Apache Hudi中自定义序列化和数据写入逻辑
硬核!Apache Hudi中自定义序列化和数据写入逻辑
33 1
|
1月前
|
消息中间件 Kafka 数据处理
超硬核!详解Apache Hudi灵活的Payload机制
超硬核!详解Apache Hudi灵活的Payload机制
39 3

热门文章

最新文章

推荐镜像

更多