Apache Storm 官方文档 —— 分布式 RPC

简介:

分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。

概述

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com” 调用 “reach” 函数计算结果的例子:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

下图是 DRPC 的原理示意图。

DRPC

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义 DRPC 拓扑

可以直接使用普通的拓扑构造方法来构造 DRPC 拓扑,如下所示:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // builder.setSpout(drpcSpout);
    // builder.setBolt(new ExclaimBolt(), 3);
    // submit(builder.createTopology());
}

本地模式 DRPC

DRPC 可以在本地模式下运行。以下是使用本地模式构造拓扑的例子:

LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclaimBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3)
        .shuffleGrouping("exclaim");

LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("drpc-demo", conf, builder.createTopology());

// local mode 测试代码
System.out.println(drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

在这种模式下,首先你会创建一个 LocalDPRC 对象,该对象会在进程中模拟一个 DRPC 服务器,其作用类似于LocalCluster 在进程中模拟 Storm 集群的功能。在定义好拓扑的各个组件之后,就可以使用 LocalCluster 来提交拓扑。在本地模式下 LocalDPRC 对象不会绑定到任何一个实际的端口,所以需要通过向 DRPCSpout 传入参数的方式来关联到拓扑中。

在启动拓扑后,你可以使用 execute 方法来完成 DRPC 调用。

远程模式 DRPC

在一个实际的集群中使用 DRPC 有以下三个步骤:

  1. 配置并启动 DRPC 服务器;
  2. 在集群的各个服务器上配置 DRPC 服务器的地址;
  3. 将 DRPC 拓扑提交到集群运行。

可以像 Nimbus、Supervisor 那样使用 storm 命令来启动 DRPC 服务器(注意,此 server 的基本配置,如 nimbus,ZooKeeper 等参数应该与 Storm 集群其他机器相同):

bin/storm drpc

接下来,你需要在集群的各个服务器上配置 DRPC 服务器的地址。这是为了让 DRPCSpout 了解从哪里获取函数调用的方法。可以通过编辑 storm.yaml 或者添加拓扑配置的方式实现配置。配置 storm.yaml 的方式类似于下面这样:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,你可以像其他拓扑一样使用 StormSubmitter 来启动拓扑。

以下是使用远程模式构造拓扑的一个例子:

TopologyBuilder builder = new TopologyBuilder();

DRPCSpout spout = new DRPCSpout("exclamation");
builder.setSpout("drpc", spout, 3);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 7)
        .shuffleGrouping("exclaim");

Config conf = new Config();
conf.setNumWorkers(2);

StormSubmitter.submitTopology("drpc-demo", conf, builder.createTopology());

更复杂的例子

请参考Trident 教程一文中计算指定 URL 的 Reach 数的例子。

相关文章
|
6月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
778 5
|
15天前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
5月前
|
监控 NoSQL 数据建模
使用Apache Cassandra进行分布式数据库管理的技术实践
【6月更文挑战第5天】本文探讨了使用Apache Cassandra进行分布式数据库管理的技术实践。Cassandra是一款高性能、可扩展的NoSQL数据库,适合大规模、高并发场景。文章介绍了其高可扩展性、高性能、高可用性和灵活数据模型等核心特性,并详细阐述了环境准备、安装配置、数据建模与查询以及性能优化与监控的步骤。通过本文,读者可掌握Cassandra的运用,适应不断增长的数据需求。
|
5月前
|
存储 分布式计算 Hadoop
使用Apache Hadoop进行分布式计算的技术详解
【6月更文挑战第4天】Apache Hadoop是一个分布式系统框架,应对大数据处理需求。它包括HDFS(分布式文件系统)和MapReduce编程模型。Hadoop架构由HDFS、YARN(资源管理器)、MapReduce及通用库组成。通过环境搭建、编写MapReduce程序,可实现分布式计算。例如,WordCount程序用于统计单词频率。优化HDFS和MapReduce性能,结合Hadoop生态系统工具,能提升整体效率。随着技术发展,Hadoop在大数据领域将持续发挥关键作用。
|
4月前
|
消息中间件 存储 Java
Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅
【7月更文挑战第1天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送消息到主题,消费者订阅并消费。Kafka提供消息持久化、容灾机制,支持分区和复制以确保高可用性。通过优化如分区、批处理和消费者策略,可适应高并发场景。简单的Java示例展示了如何创建和交互消息。
66 0
|
6月前
|
缓存 监控 Java
Java一分钟之-Apache Geode:分布式内存数据平台
【5月更文挑战第21天】Apache Geode是低延迟的分布式内存数据平台,用于构建实时应用,提供缓存、数据库和消息传递功能。本文聚焦于Geode的常见问题,如数据一致性(数据同步延迟和分区冲突)和性能瓶颈(网络延迟和资源管理不当),并提出解决方案。确保数据一致性可通过选择合适的数据策略和利用`InterestPolicy`、`CacheListener`;提升性能则需优化网络和合理配置资源。通过示例代码展示了如何创建和操作Geode的Region。正确配置和调优Geode对于实现高可用、高性能应用至关重要。
127 1
|
6月前
|
存储 缓存 监控
Java一分钟之-Apache Ignite:分布式内存计算平台
【5月更文挑战第21天】Apache Ignite是一款开源的分布式内存计算平台,涉及内存数据网格、流处理和计算服务。本文关注其常见问题,如数据丢失、分区不均、内存管理和网络延迟。为保证数据一致性,建议使用适当的數據模式和备份策略,实现数据持久化。优化内存配置和监控网络可提升性能与稳定性。提供的Java代码示例展示了如何创建分区缓存并设置备份。正确配置和管理Ignite是构建高可用、高性能应用的关键,持续监控集群状态至关重要。
172 0
|
自然语言处理 关系型数据库 定位技术
分布式系列教程(35) -ElasticSearch文档映射
分布式系列教程(35) -ElasticSearch文档映射
73 0
|
6月前
|
分布式计算 资源调度 Hadoop
Apache Hadoop入门指南:搭建分布式大数据处理平台
【4月更文挑战第6天】本文介绍了Apache Hadoop在大数据处理中的关键作用,并引导初学者了解Hadoop的基本概念、核心组件(HDFS、YARN、MapReduce)及如何搭建分布式环境。通过配置Hadoop、格式化HDFS、启动服务和验证环境,学习者可掌握基本操作。此外,文章还提及了开发MapReduce程序、学习Hadoop生态系统和性能调优的重要性,旨在为读者提供Hadoop入门指导,助其踏入大数据处理的旅程。
890 0
|
6月前
|
监控 安全 Apache
Apache ZooKeeper - 使用ZK实现分布式锁(非公平锁/公平锁/共享锁 )
Apache ZooKeeper - 使用ZK实现分布式锁(非公平锁/公平锁/共享锁 )
319 1

热门文章

最新文章

推荐镜像

更多
下一篇
无影云桌面