RocketMQ Flink Catalog 设计与实践

简介: RocketMQ Flink Catalog 使用指南

摘要:本文为 RocketMQ Flink Catalog 使用指南。主要内容包括:

  1. Flink 和 Flink Catalog
  2. RocketMQ Flink Connector
  3. RocketMQ Flink Catalog

作者:李晓双 ,Apache RocketMQ Contributor

Mentor:蒋晓峰,Apache RocketMQ Committer

一、Flink 和 Flink Catalog

Flink 是一个分布式计算引擎,目前已经实现批流一体,可以实现对有界数据和无界数据的处理。需要有效分配和管理计算资源才能执行流式应用程序。

目前 Flink API 共抽象为四个部分:

  • 最顶层的抽象为 SQL。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
  • 第二层抽象为 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。
  • 第三层抽象是 Core APIs 。许多程序可能使用不到最底层的 API而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。
  • 第四层抽象为有状态的实时流处理。

1

Flink Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。Flink 对于元数据的管理分为临时的、持久化的两种。内置的 GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。JdbcCatalogHiveCatalog 就是可以持久化元数据的 Catalog。

Flink Catalog 是扩展的,支持用户自定义。为了在 Flink SQL 中使用自定义 Catalog,用户需要通过实现CatalogFactory接口来实现对应的 Catalog 工厂。该工厂是使用 Java 的服务提供者接口 (SPI) 发现的。可以将实现此接口的类添加到 META_INF/services/org.apache.flink.table.factories.FactoryJAR 文件中。

二、RocketMQ Flink Connector

RocketMQ 连接器为 Flink 提供从 RocketMQ Topic 中消费和写入数据的能力。Flink 的 Table API & SQL 程序可以连接到其他外部系统,用于读取和写入批处理和流式表。Source 提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。Sink 将数据发送到外部存储系统。

该项目的 Github 仓库是: https://github.com/apache/rocketmq-flink

2

三、RocketMQ Flink Catalog

3.1 设计与实现

3.1.1 RocketMQ Flink Catalog 的设计主要分为两步

  • 实现一个 RocketMqCatalogFactory 基于字符串属性创建已配置 Catalog 实例的工厂。将此实现类添加到 META_INF/services/org.apache.flink.table.factories.Factory 中。
  • 继承 AbstractCatalog 实现 RocketMqCatalog,通过实现 Catalog 接口中的方法,完成对数据库、表、分区等信息的查询操作。

类图如下:

3

3.1.2 RocketMQ Flink Catalog 的存储

RocketMQ Flink Catalog 的底层存储使用的是 RocketMQ Schema Registry。Flink 调用 Catalog 的时候,在 AbstractCatalog 的实现类中通过 RocketMQ Schema Registry 的客户端和 RocketMQ Schema Registry 服务端进行交互。

  • Database : 返回默认的 default 。
  • Table : 从 RocketMQ Schema Registry 获取对应的 Schema,然后解析 IDL 转换成 DataType。
  • Partition : 通过 DefaultMQAdminExt 从 RocketMQ 中获取到 Partition 相关信息。
RocketMQ Schema Registry 是一个 Topic Schema 的管理中心。它为 Topic(RocketMQ Topic)的注册、删除、更新、获取和引用模式提供了一个 RESTful 接口。New RocketMQ 客户端通过将 Schema 与 Subject 关联起来,可以直接发送结构化数据。用户不再需要关心序列化和反序列化的细节。

4

3.1.3 RocketMQ Flink Catalog 支持的 API

目前 RocketMQ Flink Catalog 支持对 Database、Table、Partition 的查询和判断是否存在的操作,不支持创建、修改、删除。所以在使用之前需要通过 RocketMQ Schema Registry 来创建好对应的 Schema。

表格

3.2 使用指南

表环境(TableEnvironment)是 Flink 中集成 Table API & SQL 的核心概念。它负责:

  • 在内部的 Catalog 中注册 Table。
  • 注册外部的 Catalog。
  • 加载可插拔模块。
  • 执行 SQL 查询。
  • 注册自定义函数 (scalar、table 或 aggregation)。
  • 将 DataStream 或 DataSet 转换成 Table。
  • 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。

3.2.1 创建并注册 Catalog

Table API

RocketMQCatalog rocketMqCatalog = new RocketMQCatalog("rocketmq_catalog", "default", "http://localhost:9876", "http://localhost:8080");
tableEnvironment.registerCatalog("rocketmq_catalog", rocketMqCatalog);

SQL

TableResult tableResult = tableEnvironment.executeSql(
                "CREATE CATALOG rocketmq_catalog WITH (" +
                        "'type'='rocketmq_catalog'," +
                        "'nameserver.address'='http://localhost:9876'," +
                        "'schema.registry.base.url'='http://localhost:8088');");

3.2.2 修改当前的 Catalog

Table API

tableEnvironment.useCatalog("rocketmq_catalog");

SQL

tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");

3.2.3 列出可用的 Catalog

Table API

String[] catalogs = tableEnvironment.listCatalogs();

SQL

TableResult tableResult = tableEnvironment.executeSql("show catalogs");

3.2.4 列出可用的 Database

Table API

String[] databases = tableEnvironment.listDatabases();

SQL

TableResult tableResult = tableEnvironment.executeSql("show databases");

3.2.5 列出可用的 Table

Table API

String[] tables = tableEnvironment.listTables();

SQL

TableResult tableResult = tableEnvironment.executeSql("show tables");

3.3 Quick Start

需要提前准备可用的 RocketMQ 、RocketMQ Schema Registry:

3.3.1 创建 Topic

创建两个 Topic,rocketmq_source 和 rocketmq_sink。

5

3.3.2 注册 Source Schema

curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"rocketmq_source_schema\",\"namespace\":\"namespace\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_source/schema/rocketmq_source_schema

3.3.3 注册 Sink Schema

curl -X POST -H "Content-Type: application/json" \
-d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"rocketmq_sink_schema\",\"namespace\":\"namespace\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8088/schema-registry/v1/subject/rocketmq_sink/schema/rocketmq_sink_schema

3.3.4 添加依赖

创建一个任务项目 ,添加 rocketmq-flink 的依赖 :

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-flink</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

目前 RocketMQ Schema Registry 还没有发布正式的版本,只有快照版,如果发现 jar 找不到,可以尝试以下方法:

<repositories>
    <repository>
        <id>snapshot-repos</id>
        <name>Apache Snapshot Repository</name>
        <url>https://repository.apache.org/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <layout>default</layout>
    </repository>
</repositories>

3.3.5 创建任务

/**
 * @author lixiaoshuang
 */
public class RocketMqCatalog {
    public static void main(String[] args) {
        // 初始化表环境参数
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        // 创建 table 环境
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);


        // 注册 rocketmq catalog
        tableEnvironment.executeSql(
                "CREATE CATALOG rocketmq_catalog WITH (" +
                        "'type'='rocketmq_catalog'," +
                        "'nameserver.address'='http://localhost:9876'," +
                        "'schema.registry.base.url'='http://localhost:8088');");
        tableEnvironment.executeSql("USE CATALOG rocketmq_catalog");

        // 从 rocketmq_source 中获取数据写入到 rocketmq_sink 中
        TableResult tableResult = tableEnvironment.executeSql("INSERT INTO rocketmq_sink /*+ OPTIONS" +
                "('producerGroup'='topic_producer_group') */ select * from rocketmq_source /*+ OPTIONS" +
                "('consumerGroup'='topic_consumer_group') */");
    }
}

启动任务并运行以后,打开 RocketMQ 控制台,往 rocketmq_source 这个 Topic 发送一条消息。

6

然后再查看 rocketmq_sink 的状态,就会发现消息已经通过写入到 rocketmq_sink 中了。

7

点击查看更多技术内容


Flink Forward Asia 2022

img

img

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
9月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
854 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
7月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
5440 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
7月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
416 9
Flink在B站的大规模云原生实践
|
8月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
800 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
8月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
470 9
网易游戏 Flink 云原生实践
|
10月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
1082 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
10月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
745 1
Flink CDC + Hologres高性能数据同步优化实践
|
10月前
|
SQL 存储 调度
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
250 1
基于 Flink 进行增量批计算的探索与实践
|
10月前
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
10月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
483 6

相关产品

  • 实时计算 Flink版