Apache Storm 官方文档 —— 多语言接口协议

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介:

本文描述了 Storm (0.7.1 版本以上)的多语言接口协议。

Storm 多语言协议

Shell 组件

Storm 的多语言支持主要通过 ShellBolt,ShellSpout 和 ShellProcess 类来实现。这些类实现了 IBolt 接口、ISpout 接口,并通过使用 Java 的 ProcessBuilder 类调用 shell 进程实现了执行脚本的接口协议。

输出域

输出域是拓扑的 Thrift 定义的一部分。也就是说,如果你在 Java 中使用了多语言接口,那么你就需要创建一个继承自 ShellBolt 并实现 IRichBolt 接口的 bolt,这个 bolt 还需要在 declareOutputFields 方法中声明输出域(ShellSpout 也有类似的问题)。

你可以在基础概念一文中了解更多相关信息。

协议报头

最简单的协议是通过执行脚本或程序的标准输入输出(STDIN/STDOUT)来实现的。在这个过程中传输的数据都是以 JSON 格式编码的,这样可以支持很多种语言。

打包

为了在集群上运行壳组件,执行的外壳脚本必须和待提交的 jar 包一起置于 resources/ 目录下。

但是,在本地开发测试时,resources 目录只需要保持在 classpath 中即可。

协议

注意:

  • 输入输出协议的结尾都使用行读机制,所以,必须要修剪掉输入中的新行并将他们添加到输出中。
  • 所有的 JSON 输入输出都由一个包含 “end” 的行结束标志。注意,这个定界符并不是 JSON 的一部分。
  • 下面的几个标题就是从脚本作者的 STDIN 与 STDOUT 的角度出发的。

初始握手

两种类型壳组件的初始握手过程都是相同的:

  • STDIN: 设置信息。这是一个包含 Storm 配置、PID 目录、拓扑上下文的 JSON 对象:
{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "pidDir": "...",
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt1",
            "4": "example-bolt2"
        },
        "taskid": 3,
        // 以下内容仅支持 Storm 0.10.0 以上版本
        "componentid": "example-bolt"
        "stream->target->grouping": {
            "default": {
                "example-bolt2": {
                    "type": "SHUFFLE"}}},
        "streams": ["default"],
        "stream->outputfields": {"default": ["word"]},
        "source->stream->grouping": {
            "example-spout": {
                "default": {
                    "type": "FIELDS",
                    "fields": ["word"]
                }
            }
        }
        "source->stream->fields": {
            "example-spout": {
                "default": ["word"]
            }
        }
    }
}

你的脚本应该在这个目录下创建一个以 PID 命名的空文件。比如,PID 是 1234 的时候,在目录中创建一个名为 1234 的空文件。这个文件可以让 supervisor 了解到进程的 PID,这样,supervisor 在需要的时候就可以关闭该进程。

Storm 0.10.0 加强了发送到壳组件的上下文的功能,现在的上下文中包含了兼容 JVM 组件的拓扑上下文中的所有内容。新增的一个关键因素是确定拓扑中某个壳组件的源与目标(也就是输入与输出)的功能,这是通过 stream->target->grouping 和 source->stream->grouping字典实现的。在这些关联字典的底层,分组是以字典的形式表示的,至少包含有一个 type 键,并且也可以含有一个 fields 键,该键可以用于指定在 FIELDS 分组中所涉及的域。

  • STDOUT: 你的 PID,以 JSON 对象的形式展现,比如 {"pid": 1234}。这个壳组件将会把 PID 记录到它自己的日志中。

接下来怎么做就要取决于组件的具体类型了。

Spouts

Shell Spouts 都是同步的。以下内容是在一个 while(true) 循环中实现的:

  • STDIN: 一个 next、ack 或者 fail 命令。

“next” 与 ISpout 的 nextTuple 等价,可以这样定义 “next”:

{"command": "next"}

可以这样定义 “ack”:

{"command": "ack", "id": "1231231"}

可以这样定义 “fail”:

{"command": "fail", "id": "1231231"}
  • STDOUT: 前面的命令对你的 spout 作用产生的结果。这个结果可以是一组 emits 和 logs。

emit 大概是这样的:

{
    "command": "emit",
    // tuple 的 id,如果是不可靠 emit 可以省略此值,该 id 可以为字符串或者数字
    "id": "1231231",
    // tuple 将要发送到的流 id,如果发送到默认流,将该值留空
    "stream": "1",
    // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id
    "task": 9,
    // 这个 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。

“log” 会将消息记录到 worker log 中,“log” 大概是这样的:

{
    "command": "log",
    // 待记录的消息
    "msg": "hello world!"
}
  • STDOUT: “sync” 命令会结束 emits 与 logs 的队列,“sync” 是这样使用的:
{"command": "sync"}

在 sync 之后, ShellSpout 不会继续读取你的输出,直到它发送出新的 next,ack 或者 fail。

注意,与 ISpout 类似,worker 中的所有 spouts 都会在调用 next,ack 或者 fail 之后锁定,直到你调用 sync。同样,如果没有需要发送的 tuple,你也应该在 sync 之前 sleep 一小段时间。ShellSpout 不会自动 sleep。

Bolts

Shell Bolts 的协议是异步的。你会在有 tuple 可用时立即从 STDIN 中获取到 tuple,同时你需要像下面的示例这样调用 emit,ack,fail,log 等操作写入 STDOUT:

  • STDIN: 就是一个 tuple!这是一个 JSON 编码的结构:
{
    // tuple 的 id,为了兼容缺少 64 位数据类型的语言,这里使用了字符串
    "id": "-6955786537413359385",
    // 创建该 tuple 的 id
    "comp": "1",
    // tuple 将要发往的流 id
    "stream": "1",
    // 创建该 tuple 的任务
    "task": 9,
    // tuple 中的所有值
    "tuple": ["snow white and the seven dwarfs", "field2", 3]
}
  • STDOUT: 一个 ack,fail,emit 或者 log。例如,emit 是这样的:
{
    "command": "emit",
    // 标记这个输出 tuple 的 tuples 的 ids
    "anchors": ["1231231", "-234234234"],
    // tuple 将要发送到的流 id,如果发送到默认流,将该值留空
    "stream": "1",
    // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id
    "task": 9,
    // 这个 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。注意,由于 shell bolt 协议的异步特性,如果你在 emit 之后立即接收数据,有可能不会收到对应的任务 id,而是收到上一个 emit 的任务 id,或者是一个待处理的新 tuple。然而,最终接收到的任务 id 序列仍然是和 emit 的顺序完全一致的。

ack 是这样的:

{
    "command": "ack",
    // 待 ack 的 tuple
    "id": "123123"
}

fail 是这样的:

{
    "command": "fail",
    // 待 fail 的 tuple
    "id": "123123"
}

“log” 会将消息记录到 worker log 中,“log” 是这样的:

{
    "command": "log",
    // 待记录的消息
    "msg": "hello world!"
}
  • 注意:对于 0.7.1 版本,shell bolt 不再需要进行“同步”。

处理心跳(0.9.3 及以上版本适用)

Storm 0.9.3 通过在 ShellSpout/ShellBolt 与他们的多语言子进程之间使用心跳来检测子进程是否处于挂起或僵死状态。所有通过多语言接口与 Storm 交互的库都必须使用以下步骤来……

Spout

Shell Spouts 是同步的,所有子进程会在 next() 的结尾发送 sync 命令。因此,你不需要为 spouts 做过多的处理。也就是说,在 next()过程中不能够让子进程的 sleep 时间超过 worker 的延时时间。

Bolt

Shell Bolts 是异步的,所以 ShellBolt 会定期向它的子进程发送心跳 tuple。心跳 tuple 是这样的:

{
    "id": "-6955786537413359385",
    "comp": "1",
    "stream": "__heartbeat",
    // 这个 shell bolt 的系统任务 id
    "task": -1,
    "tuple": []
}

在子进程收到心跳 tuple 之后,它必须向 ShellBolt 发送一个 sync 命令。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6月前
|
SQL Java 数据库连接
Apache Doris 支持 Arrow Flight SQL 协议,数据传输效率实现百倍飞跃
近年来,随着数据科学、数据湖分析等场景的兴起,对数据读取和传输速度提出更高的要求。而 JDBC/ODBC 作为与数据库交互的主流标准,在应对大规模数据读取和传输时显得力不从心,无法满足高性能、低延迟等数据处理需求。为提供更高效的数据传输方案,Apache Doris 在 2.1 版本中基于 Arrow Flight SQL 协议实现了高速数据传输链路,使得数据传输性能实现百倍飞跃。
|
3月前
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
|
6月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
696 3
|
存储 Java BI
探索Apache POI库:强大的Excel和Word文档处理工具
在企业应用和数据处理中,Excel和Word文档是常见的数据交换和存储格式。然而,处理和操作这些文档可能是一项繁琐的任务。Apache POI库作为一款强大的文档处理工具,可以帮助我们更轻松地进行Excel和Word文档的读写、编辑和生成。本文将深入探讨Apache POI库的基本概念、特点,以及如何在实际应用中使用它进行文档处理。
760 0
|
XML Java API
Apache POI详解及Word文档读取示例
apache poi资料详解,包括内部jar包依赖关系,及与使用文档的对应关系
1401 0
|
自然语言处理 Kubernetes Dubbo
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(2)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(2)
158 2
|
自然语言处理 Kubernetes Dubbo
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(1)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(1)
204 3
|
消息中间件 存储 负载均衡
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
|
消息中间件 存储 分布式计算
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
|
消息中间件 大数据 Kafka
数据流处理:Apache Samza和Apache Storm的比较
数据流处理是现代大数据应用程序中至关重要的组成部分。为了有效地处理大规模的实时数据流,开发人员需要选择适合其需求的数据流处理框架。在本文中,我们将比较两个受欢迎的数据流处理框架 Apache Samza 和 Apache Storm,并探讨它们的特点、优势和适用场景。
251 0

推荐镜像

更多