开发者社区> 喜洋洋123> 正文

Apache Geode/GemFire 数据分区和路由机制浅析

简介: 本篇文章简单介绍 Apache Geode/GemFire 的数据分区和数据路由机制,并深入到源码做简要地剖析Apache Geode/GemFire 是如何进行数据分区和数据路由的。
+关注继续查看

本篇文章主要讲解Apache Geode/GemFire 是如何进行数据分区的。


GemFire和大多数分布式系统一样都采用 Hash 的方式对数据进行分区,将 Entry 数据分布到 PartitionedRegion 当中,大家都知道 Entry 数据主要保存在 ConcurrentHashMap 中,ConcurrentHashMap存放在 Bucket 中,在 PR 服务器启动后会为 PartitionedRegion创建相应的Bucket 来保存这个ConcurrentHashMap。因此它们三者的映射关系如下所示:


Entry—> ConcurrentHashMap—> Bucket—> Region



如何进行数据分区?


当前端应用对Entry 进行操作后,Entry 会按照如下的步骤分区到PR 服务器上。


1.Entry 在插入到分布式集群的某台节点服务器过程中,会放到 PR 创建的 Bucket 中。


2.在进行 Put 操作时, 会产生一个EntryOperation 事件,在这个 Event 事件中可以找到PartionedRegion,和要进行 Put 操作的 Key。这个路由的对象就是一般的 POJO 操作类。


3.在获得路由对象上,从条目操作中获得 Key 键,在通过 Key 键来得到相关的对象Object。


4.接下来再通过 Key来获得bucketId, 再通过bucketId获得 PR,给定一个key/routing对象, 运行hashCode()生成一个 long 值, 然后用这个值与 bucket size 取模得到bucketId值。

   * 为了更好地进行哈希key分布, 使用MD5、SHA或其他的 ID 生成方法.


详见PartionedRegionHelper 的getHashKey方法.


private static int getHashKey(EntryOperation event, PartitionedRegion pr,

      Operation operation, Object key, Object value, Object callbackArgument) {

    // avoid creating EntryOperation if there is no resolver

    if (event != null) {

      pr = (PartitionedRegion)event.getRegion();

      key = event.getKey();

      callbackArgument = event.getCallbackArgument();

    }


    PartitionResolver resolver = getResolver(pr, key, callbackArgument);

    Object resolveKey = null;

    if (pr.isFixedPartitionedRegion()) {

      String partition = null ;

      if (resolver instanceof FixedPartitionResolver) {

        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        partition = ((FixedPartitionResolver)resolver).getPartitionName(

            event, partitionMap.keySet());

        if (partition == null) {

          Object[] prms = new Object[] { pr.getName(), resolver };

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));

        }

        Integer[] bucketArray = partitionMap.get(partition);

        if (bucketArray == null) {

          Object[] prms = new Object[] { pr.getName(), partition };

          throw new PartitionNotAvailableException(

              LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_1_IS_NOT_AVAILABLE_ON_ANY_DATASTORE.toLocalizedString(prms));

        }

        int numBukets = bucketArray[1];

        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);

      }

      else if (resolver == null) {

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_RESOLVER_IS_NOT_AVAILABLE.toString(pr.getName()));

      }

      else if (!(resolver instanceof FixedPartitionResolver)) {

        Object[] prms = new Object[] { pr.getName(), resolver };

        throw new IllegalStateException(

            LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_RESOLVER_DEFINED_1_IS_NOT_AN_INSTANCE_OF_FIXEDPARTITIONRESOLVER.toLocalizedString(prms));

      }

      return assignFixedBucketId(pr, partition, resolveKey);

    }

    else {

      // Calculate resolveKey.

      if (resolver == null) {

        // no custom partitioning at all

        resolveKey = key;

        if (resolveKey == null) {

          throw new IllegalStateException("attempting to hash null");

        }

      }

      else {

        if (event == null) {

          event = new EntryOperationImpl(pr, operation, key, value,

              callbackArgument);

        }

        // 通过 Entry 操作, 获得一个路由对象, 得到resolveKey, 在通过 resolveKey进行Hash计算

        resolveKey = resolver.getRoutingObject(event);

        if (resolveKey == null) {

          throw new IllegalStateException(

              LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());

        }

      }

      // Finally, calculate the hash.

      return getHashKey(pr, resolveKey);

    }

  }


如何进行数据感知路由?

 

GemFire 开发了一个Function Service 模块能够让客户端和服务器节点一起来处理提交的任务。如果数据跨多个节点分区,GemFire能够透明地路由数据执行行为到待处理数据的节点,这样避免了数据跨网络移动,这被称为“数据感知功能路由”。带有数据感知路由功能的应用根本不需要管理数据。

 

GemFire路由数据的执行行为而不是数据本身,GemFire直接路由数据执行行为到需要做并行处理,或汇聚结果的节点。这个特性使得GemFire从根本上降低了执行复杂任务的时间。分布式并行处理活动被抽象出来,与应用调用端无关。

 

 

应用即能够在单点执行,也能在一个小集群并行执行,甚至能够跨整个分布式集群并行执行。

GemFire的并行模型非常类似于Google的Map-Reduce模型。数据感知路由最适合于执行迭代查询或汇聚数据条目的操作。通过数据并处和并行计算,系统的吞吐量显著提升。最重要的是,计算的延迟与并行计算的节点数成反比。

 

单节点的Function执行类似于关系型数据库的Stored Procedure执行。

 

并行计算之后,结果通过Function中的结果收集器,把处理完的结果会调用ResultCollector统一收集回来。相当于Map-Reduce模型中的输出收集器。



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记
Apache Flume-- 自定义 sink(扩展)--数据写入本地
107 0
《基于Apache Hudi的CDC数据入湖》电子版地址
《基于Apache Hudi的CDC数据入湖》PPT
36 0
Spring认证中国教育管理中心-Apache Geode 的 Spring 数据教程一
Spring认证中国教育管理中心-Apache Geode 的 Spring 数据教程一
68 0
Apache Geode 的 Spring 数据
Spring认证|Apache Geode 的 Spring 数据
70 0
Apache Geode/GemFire入门(2)-基本概念和模块
Apache Geode/GemFire 入门文章,简要介绍 Geode 基本概念和核心功能模块,以及基本使用方法,有助于初学者比较好的理解相关基本概念
5732 0
Apache Geode/GemFire入门(1)-基本概念和模块
Apache Geode/GemFire 入门文章,简要介绍 Geode 基本概念和核心功能模块,以及基本使用方法,有助于初学者比较好的理解相关基本概念
11164 0
+关注
喜洋洋123
Apache Geode/GemFire 中国社区发起人
文章
问答
文章排行榜
最热
最新
相关电子书
更多
贺小令|Apache Flink 1.16 简介
立即下载
Apache Dubbo3 源码深入解读
立即下载
Apache Dubbo 微服务开发从入门到精通
立即下载