Apache Storm 官方文档 —— 多语言接口协议

简介:

本文描述了 Storm (0.7.1 版本以上)的多语言接口协议。

Storm 多语言协议

Shell 组件

Storm 的多语言支持主要通过 ShellBolt,ShellSpout 和 ShellProcess 类来实现。这些类实现了 IBolt 接口、ISpout 接口,并通过使用 Java 的 ProcessBuilder 类调用 shell 进程实现了执行脚本的接口协议。

输出域

输出域是拓扑的 Thrift 定义的一部分。也就是说,如果你在 Java 中使用了多语言接口,那么你就需要创建一个继承自 ShellBolt 并实现 IRichBolt 接口的 bolt,这个 bolt 还需要在 declareOutputFields 方法中声明输出域(ShellSpout 也有类似的问题)。

你可以在基础概念一文中了解更多相关信息。

协议报头

最简单的协议是通过执行脚本或程序的标准输入输出(STDIN/STDOUT)来实现的。在这个过程中传输的数据都是以 JSON 格式编码的,这样可以支持很多种语言。

打包

为了在集群上运行壳组件,执行的外壳脚本必须和待提交的 jar 包一起置于 resources/ 目录下。

但是,在本地开发测试时,resources 目录只需要保持在 classpath 中即可。

协议

注意:

  • 输入输出协议的结尾都使用行读机制,所以,必须要修剪掉输入中的新行并将他们添加到输出中。
  • 所有的 JSON 输入输出都由一个包含 “end” 的行结束标志。注意,这个定界符并不是 JSON 的一部分。
  • 下面的几个标题就是从脚本作者的 STDIN 与 STDOUT 的角度出发的。

初始握手

两种类型壳组件的初始握手过程都是相同的:

  • STDIN: 设置信息。这是一个包含 Storm 配置、PID 目录、拓扑上下文的 JSON 对象:
{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "pidDir": "...",
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt1",
            "4": "example-bolt2"
        },
        "taskid": 3,
        // 以下内容仅支持 Storm 0.10.0 以上版本
        "componentid": "example-bolt"
        "stream->target->grouping": {
            "default": {
                "example-bolt2": {
                    "type": "SHUFFLE"}}},
        "streams": ["default"],
        "stream->outputfields": {"default": ["word"]},
        "source->stream->grouping": {
            "example-spout": {
                "default": {
                    "type": "FIELDS",
                    "fields": ["word"]
                }
            }
        }
        "source->stream->fields": {
            "example-spout": {
                "default": ["word"]
            }
        }
    }
}

你的脚本应该在这个目录下创建一个以 PID 命名的空文件。比如,PID 是 1234 的时候,在目录中创建一个名为 1234 的空文件。这个文件可以让 supervisor 了解到进程的 PID,这样,supervisor 在需要的时候就可以关闭该进程。

Storm 0.10.0 加强了发送到壳组件的上下文的功能,现在的上下文中包含了兼容 JVM 组件的拓扑上下文中的所有内容。新增的一个关键因素是确定拓扑中某个壳组件的源与目标(也就是输入与输出)的功能,这是通过 stream->target->grouping 和 source->stream->grouping字典实现的。在这些关联字典的底层,分组是以字典的形式表示的,至少包含有一个 type 键,并且也可以含有一个 fields 键,该键可以用于指定在 FIELDS 分组中所涉及的域。

  • STDOUT: 你的 PID,以 JSON 对象的形式展现,比如 {"pid": 1234}。这个壳组件将会把 PID 记录到它自己的日志中。

接下来怎么做就要取决于组件的具体类型了。

Spouts

Shell Spouts 都是同步的。以下内容是在一个 while(true) 循环中实现的:

  • STDIN: 一个 next、ack 或者 fail 命令。

“next” 与 ISpout 的 nextTuple 等价,可以这样定义 “next”:

{"command": "next"}

可以这样定义 “ack”:

{"command": "ack", "id": "1231231"}

可以这样定义 “fail”:

{"command": "fail", "id": "1231231"}
  • STDOUT: 前面的命令对你的 spout 作用产生的结果。这个结果可以是一组 emits 和 logs。

emit 大概是这样的:

{
    "command": "emit",
    // tuple 的 id,如果是不可靠 emit 可以省略此值,该 id 可以为字符串或者数字
    "id": "1231231",
    // tuple 将要发送到的流 id,如果发送到默认流,将该值留空
    "stream": "1",
    // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id
    "task": 9,
    // 这个 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。

“log” 会将消息记录到 worker log 中,“log” 大概是这样的:

{
    "command": "log",
    // 待记录的消息
    "msg": "hello world!"
}
  • STDOUT: “sync” 命令会结束 emits 与 logs 的队列,“sync” 是这样使用的:
{"command": "sync"}

在 sync 之后, ShellSpout 不会继续读取你的输出,直到它发送出新的 next,ack 或者 fail。

注意,与 ISpout 类似,worker 中的所有 spouts 都会在调用 next,ack 或者 fail 之后锁定,直到你调用 sync。同样,如果没有需要发送的 tuple,你也应该在 sync 之前 sleep 一小段时间。ShellSpout 不会自动 sleep。

Bolts

Shell Bolts 的协议是异步的。你会在有 tuple 可用时立即从 STDIN 中获取到 tuple,同时你需要像下面的示例这样调用 emit,ack,fail,log 等操作写入 STDOUT:

  • STDIN: 就是一个 tuple!这是一个 JSON 编码的结构:
{
    // tuple 的 id,为了兼容缺少 64 位数据类型的语言,这里使用了字符串
    "id": "-6955786537413359385",
    // 创建该 tuple 的 id
    "comp": "1",
    // tuple 将要发往的流 id
    "stream": "1",
    // 创建该 tuple 的任务
    "task": 9,
    // tuple 中的所有值
    "tuple": ["snow white and the seven dwarfs", "field2", 3]
}
  • STDOUT: 一个 ack,fail,emit 或者 log。例如,emit 是这样的:
{
    "command": "emit",
    // 标记这个输出 tuple 的 tuples 的 ids
    "anchors": ["1231231", "-234234234"],
    // tuple 将要发送到的流 id,如果发送到默认流,将该值留空
    "stream": "1",
    // 如果是一个直接型 emit,需要定义 tuple 将要发送到的任务 id
    "task": 9,
    // 这个 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你会立即在 STDIN 上收到一条表示 tuple 发送到的任务的 id 的消息,这个消息是以 JSON 数组形式展现的。注意,由于 shell bolt 协议的异步特性,如果你在 emit 之后立即接收数据,有可能不会收到对应的任务 id,而是收到上一个 emit 的任务 id,或者是一个待处理的新 tuple。然而,最终接收到的任务 id 序列仍然是和 emit 的顺序完全一致的。

ack 是这样的:

{
    "command": "ack",
    // 待 ack 的 tuple
    "id": "123123"
}

fail 是这样的:

{
    "command": "fail",
    // 待 fail 的 tuple
    "id": "123123"
}

“log” 会将消息记录到 worker log 中,“log” 是这样的:

{
    "command": "log",
    // 待记录的消息
    "msg": "hello world!"
}
  • 注意:对于 0.7.1 版本,shell bolt 不再需要进行“同步”。

处理心跳(0.9.3 及以上版本适用)

Storm 0.9.3 通过在 ShellSpout/ShellBolt 与他们的多语言子进程之间使用心跳来检测子进程是否处于挂起或僵死状态。所有通过多语言接口与 Storm 交互的库都必须使用以下步骤来……

Spout

Shell Spouts 是同步的,所有子进程会在 next() 的结尾发送 sync 命令。因此,你不需要为 spouts 做过多的处理。也就是说,在 next()过程中不能够让子进程的 sleep 时间超过 worker 的延时时间。

Bolt

Shell Bolts 是异步的,所以 ShellBolt 会定期向它的子进程发送心跳 tuple。心跳 tuple 是这样的:

{
    "id": "-6955786537413359385",
    "comp": "1",
    "stream": "__heartbeat",
    // 这个 shell bolt 的系统任务 id
    "task": -1,
    "tuple": []
}

在子进程收到心跳 tuple 之后,它必须向 ShellBolt 发送一个 sync 命令。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
SQL Java 数据库连接
Apache Doris 支持 Arrow Flight SQL 协议,数据传输效率实现百倍飞跃
近年来,随着数据科学、数据湖分析等场景的兴起,对数据读取和传输速度提出更高的要求。而 JDBC/ODBC 作为与数据库交互的主流标准,在应对大规模数据读取和传输时显得力不从心,无法满足高性能、低延迟等数据处理需求。为提供更高效的数据传输方案,Apache Doris 在 2.1 版本中基于 Arrow Flight SQL 协议实现了高速数据传输链路,使得数据传输性能实现百倍飞跃。
965 0
|
12月前
|
Dubbo 安全 应用服务中间件
Apache Dubbo 正式发布 HTTP/3 版本 RPC 协议,弱网效率提升 6 倍
在 Apache Dubbo 3.3.0 版本之后,官方推出了全新升级的 Triple X 协议,全面支持 HTTP/1、HTTP/2 和 HTTP/3 协议。本文将围绕 Triple 协议对 HTTP/3 的支持进行详细阐述,包括其设计目标、实际应用案例、性能测试结果以及源码架构分析等内容。
707 109
|
存储 Java BI
探索Apache POI库:强大的Excel和Word文档处理工具
在企业应用和数据处理中,Excel和Word文档是常见的数据交换和存储格式。然而,处理和操作这些文档可能是一项繁琐的任务。Apache POI库作为一款强大的文档处理工具,可以帮助我们更轻松地进行Excel和Word文档的读写、编辑和生成。本文将深入探讨Apache POI库的基本概念、特点,以及如何在实际应用中使用它进行文档处理。
1401 0
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
334 1
|
自然语言处理 Kubernetes Dubbo
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(1)
带你读《Apache Dubbo微服务开发从入门到精通》——二、 HTTP/2(Triple)协议(1)
376 101
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
1413 3
|
消息中间件 存储 负载均衡
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
|
XML Java API
Apache POI详解及Word文档读取示例
apache poi资料详解,包括内部jar包依赖关系,及与使用文档的对应关系
3183 0
|
消息中间件 存储 分布式计算
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
|
消息中间件 大数据 Kafka
数据流处理:Apache Samza和Apache Storm的比较
数据流处理是现代大数据应用程序中至关重要的组成部分。为了有效地处理大规模的实时数据流,开发人员需要选择适合其需求的数据流处理框架。在本文中,我们将比较两个受欢迎的数据流处理框架 Apache Samza 和 Apache Storm,并探讨它们的特点、优势和适用场景。
369 0

推荐镜像

更多