Flink1.9新特性解读:通过Flink SQL查询Pulsar

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.9新特性解读:通过Flink SQL查询Pulsar

Flink1.9新增了很多的功能,其中一个对我们非常实用的特性通过Flink SQL查询Pulsar给大家介绍。

我们以前可能遇到过这样的问题。通过Spark读取Kafka,但是如果我们想查询kafka困难度有点大的,当然当前Spark也已经实现了可以通过Spark sql来查询kafka的数据。那么Flink 1.9又是如何实现通过Flink sql来查询Pulsar。

可能我们大多对kafka的比较熟悉的,但是对于Pulsar或许只是听说过,所以这里将Pulsar介绍下。

Pulsar简介


Pulsar由雅虎开发并开源的一个多租户、高可用,服务间的消息系统,目前是Apache软件基金会的孵化器项目。

Apache Pulsar是一个开源的分布式pub-sub消息系统,用于服务器到服务器消息传递的多租户,高性能解决方案,包括多个功能,例如Pulsar实例中对多个集群的本机支持,跨集群的消息的无缝geo-replication,非常低的发布和端到端 - 延迟,超过一百万个主题的无缝可扩展性,以及由Apache BookKeeper等提供的持久消息存储保证消息传递。

Pulsar已经在一些名企应用,比如腾讯用它类计费。而且它的扩展性是非常优秀的。下面是实际使用用户对他的认识。

和Puslar的原创团队做过一次挺深入交流,说说我的想法。我觉得Puslar是一个非常优秀的开源系统,它的整体框架偏向于HBase的设计,在其上实现了流数据的处理和服务。从与Kafka的对比上说,我个人对Kafka还是有比较深入的理解,Kafka也是很优秀的框架,给人一种非常纯粹和简洁的感觉。不过Puslar确实可以解决一些Kafka由于体系设计无法避免的痛点,最让我印象深刻的是Puslar的横向扩展能力要比Kafka好,因为Kafka的topic的性能扩展受限于partitions的个数,而Puslar是对数据分片,容易扩展。这对我们这种碰到大赛事需要扩展数倍系统吞吐能力的情景是很有用的。现在Puslar的框架都好了,缺的是整个生态,如监控,运维,管理,和其他平台和框架的对接,云服务的集成,丰富的客户端等等。

使用Flink sql 查询Pulsar流

Flink以前的版本并未真正实现查询Pulsar流,在Flink1.9版本中,由于阿里巴巴Blink对Flink存储库的贡献,使与Pulsar的集成更加强大。并且Flink1.9.0与Pulsar整合实现exactly-once流source和at-least-once流sink.

Pulsar作为Flink Catalog

通过集成可以将Pulsar注册为Flink Catalog【目录】,从而使在Pulsar流之上运行Flink查询只需几个命令即可。

这里补充下什么是Flink CatalogCatalog:所有对数据库和表的元数据信息都存放在Flink CataLog内部目录结构中,其存放了Flink内部所有与Table相关的元数据信息,包括表结构信息/数据源信息等。

Pulsar特点:


1.Pulsar中的数据schema与每个主题(topic)都相关联

2.生产者和消费者都发送带有预定义schema信息的数据

3.在兼容性检查中管理schema多版本化和演进

4.生产者和消费者是以POJO类方式发送和接受消息

下面是使用Struct模式创建生产者并发送消息

// Create producer with Struct schema and send messages
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage()
  .value(User.builder()
    .userName(“pulsar-user”)
    .userId(1L)
    .build())
  .send();

使用Struct模式创建消费者并接收消息

// Create consumer with Struct schema and receive messages
Consumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();
consumer.receive();

Pulsar与Flink schema转换

Pulsar不仅能够处理和存储schema信息,而且还能够处理任何架构演变(必要时)。Pulsar将有效地管理broker中的任何schema 演变,在执行任何必要的兼容性检查的同时跟踪schema 的所有不同版本。

此外,当消息在生产者发布时,Pulsar将使用schema 版本标记每个消息,作为每个消息元数据的一部分。在消费者方面,当收到消息并反序列化元数据时,Pulsar将检查与此消息关联的schema 版本,并从broker中获取相应的schema信息。结果,当Pulsar与Flink应用程序集成时,它使用预先存在的schema信息,并将带有schema信息的单个消息映射到Flink的类型系统中的另一行。

对于Flink不直接与模式(schema)交互或不使用原始模式(例如,使用主题存储字符串或长数字)的情况,Pulsar会将消息有效负载转换为Flink行,称为“值”或-对于结构化模式类型(例如JSON和AVRO),Pulsar将从模式信息中提取各个字段,并将这些字段映射到Flink的类型系统。最后,与每个消息关联的所有元数据信息(例如消息键,主题,发布时间或事件时间)将转换为Flink行中的元数据字段。下面我们提供原始模式和结构化模式类型的示例,以及如何将它们从Pulsar主题(topic)转换为Flink的类型系统。

19e4001252cc30b51d1aa842a7eed59f.jpg

将所有schema信息映射到Flink的类型系统后,可以根据指定的schema信息开始在Flink中构建Pulsar源,接收器(sink)或目录(catalog ),如下所示:

Flink & Pulsar: 从Pulsar读取数据

为流查询创建Pulsar源

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()

在Pulsar注册topics 作为streaming表

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

Flink & Pulsar: 写数据到Pulsar

为流查询创建Pulsar sink

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()

写streaming表到Pulsar

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

对于Flink开发人员只需要指定Flink如何连接到Pulsar集群,将Pulsar集群注册为Flink中的源,接收器或流表,不必担心任何schema注册表或序列化/反序列化操作。这可以简化处理和查询数据的方式。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
26天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
90 15
|
5天前
|
SQL 资源调度 数据库
深入探究SQL查询语句执行过程
深入探究SQL查询语句执行过程
16 2
|
5天前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
12 1
|
29天前
|
SQL 存储 缓存
高基数 GroupBy 在 SLS SQL 中的查询加速
本文详细介绍了SLS中的高基数GroupBy查询加速技术。
|
28天前
|
SQL 运维 程序员
一个功能丰富的SQL审核查询平台
一个功能丰富的SQL审核查询平台
|
28天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
28天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
31 1
|
10天前
|
SQL
SQL: 巧妙使用CASE WHEN实现查询
文章演示了如何利用SQL中的CASE WHEN语句来有效地进行条件性聚合查询,通过具体示例展示了CASE WHEN在统计分析中的应用技巧。
25 0
|
11天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
下一篇
无影云桌面