Flink1.9新增了很多的功能,其中一个对我们非常实用的特性通过Flink SQL查询Pulsar给大家介绍。
我们以前可能遇到过这样的问题。通过Spark读取Kafka,但是如果我们想查询kafka困难度有点大的,当然当前Spark也已经实现了可以通过Spark sql来查询kafka的数据。那么Flink 1.9又是如何实现通过Flink sql来查询Pulsar。
可能我们大多对kafka的比较熟悉的,但是对于Pulsar或许只是听说过,所以这里将Pulsar介绍下。
Pulsar简介
Pulsar由雅虎开发并开源的一个多租户、高可用,服务间的消息系统,目前是Apache软件基金会的孵化器项目。
Apache Pulsar是一个开源的分布式pub-sub消息系统,用于服务器到服务器消息传递的多租户,高性能解决方案,包括多个功能,例如Pulsar实例中对多个集群的本机支持,跨集群的消息的无缝geo-replication,非常低的发布和端到端 - 延迟,超过一百万个主题的无缝可扩展性,以及由Apache BookKeeper等提供的持久消息存储保证消息传递。
Pulsar已经在一些名企应用,比如腾讯用它类计费。而且它的扩展性是非常优秀的。下面是实际使用用户对他的认识。
和Puslar的原创团队做过一次挺深入交流,说说我的想法。我觉得Puslar是一个非常优秀的开源系统,它的整体框架偏向于HBase的设计,在其上实现了流数据的处理和服务。从与Kafka的对比上说,我个人对Kafka还是有比较深入的理解,Kafka也是很优秀的框架,给人一种非常纯粹和简洁的感觉。不过Puslar确实可以解决一些Kafka由于体系设计无法避免的痛点,最让我印象深刻的是Puslar的横向扩展能力要比Kafka好,因为Kafka的topic的性能扩展受限于partitions的个数,而Puslar是对数据分片,容易扩展。这对我们这种碰到大赛事需要扩展数倍系统吞吐能力的情景是很有用的。现在Puslar的框架都好了,缺的是整个生态,如监控,运维,管理,和其他平台和框架的对接,云服务的集成,丰富的客户端等等。
使用Flink sql 查询Pulsar流
Flink以前的版本并未真正实现查询Pulsar流,在Flink1.9版本中,由于阿里巴巴Blink对Flink存储库的贡献,使与Pulsar的集成更加强大。并且Flink1.9.0与Pulsar整合实现exactly-once流source和at-least-once流sink.
Pulsar作为Flink Catalog
通过集成可以将Pulsar注册为Flink Catalog【目录】,从而使在Pulsar流之上运行Flink查询只需几个命令即可。
这里补充下什么是Flink CatalogCatalog:所有对数据库和表的元数据信息都存放在Flink CataLog内部目录结构中,其存放了Flink内部所有与Table相关的元数据信息,包括表结构信息/数据源信息等。
Pulsar特点:
1.Pulsar中的数据schema与每个主题(topic)都相关联
2.生产者和消费者都发送带有预定义schema信息的数据
3.在兼容性检查中管理schema多版本化和演进
4.生产者和消费者是以POJO类方式发送和接受消息
下面是使用Struct模式创建生产者并发送消息
// Create producer with Struct schema and send messages Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create(); producer.newMessage() .value(User.builder() .userName(“pulsar-user”) .userId(1L) .build()) .send();
使用Struct模式创建消费者并接收消息
// Create consumer with Struct schema and receive messages Consumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create(); consumer.receive();
Pulsar与Flink schema转换
Pulsar不仅能够处理和存储schema信息,而且还能够处理任何架构演变(必要时)。Pulsar将有效地管理broker中的任何schema 演变,在执行任何必要的兼容性检查的同时跟踪schema 的所有不同版本。
此外,当消息在生产者发布时,Pulsar将使用schema 版本标记每个消息,作为每个消息元数据的一部分。在消费者方面,当收到消息并反序列化元数据时,Pulsar将检查与此消息关联的schema 版本,并从broker中获取相应的schema信息。结果,当Pulsar与Flink应用程序集成时,它使用预先存在的schema信息,并将带有schema信息的单个消息映射到Flink的类型系统中的另一行。
对于Flink不直接与模式(schema)交互或不使用原始模式(例如,使用主题存储字符串或长数字)的情况,Pulsar会将消息有效负载转换为Flink行,称为“值”或-对于结构化模式类型(例如JSON和AVRO),Pulsar将从模式信息中提取各个字段,并将这些字段映射到Flink的类型系统。最后,与每个消息关联的所有元数据信息(例如消息键,主题,发布时间或事件时间)将转换为Flink行中的元数据字段。下面我们提供原始模式和结构化模式类型的示例,以及如何将它们从Pulsar主题(topic)转换为Flink的类型系统。
将所有schema信息映射到Flink的类型系统后,可以根据指定的schema信息开始在Flink中构建Pulsar源,接收器(sink)或目录(catalog ),如下所示:
Flink & Pulsar: 从Pulsar读取数据
为流查询创建Pulsar源
val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("service.url", "pulsar://...") props.setProperty("admin.url", "http://...") props.setProperty("partitionDiscoveryIntervalMillis", "5000") props.setProperty("startingOffsets", "earliest") props.setProperty("topic", "test-source-topic") val source = new FlinkPulsarSource(props) // you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable val dataStream = env.addSource(source)(null) // chain operations on dataStream of Row and sink the output // end method chaining env.execute()
在Pulsar注册topics 作为streaming表
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val prop = new Properties() prop.setProperty("service.url", serviceUrl) prop.setProperty("admin.url", adminUrl) prop.setProperty("flushOnCheckpoint", "true") prop.setProperty("failOnWrite", "true") props.setProperty("topic", "test-sink-topic") tEnv .connect(new Pulsar().properties(props)) .inAppendMode() .registerTableSource("sink-table") val sql = "INSERT INTO sink-table ....." tEnv.sqlUpdate(sql) env.execute()
Flink & Pulsar: 写数据到Pulsar
为流查询创建Pulsar sink
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = ..... val prop = new Properties() prop.setProperty("service.url", serviceUrl) prop.setProperty("admin.url", adminUrl) prop.setProperty("flushOnCheckpoint", "true") prop.setProperty("failOnWrite", "true") props.setProperty("topic", "test-sink-topic") stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor)) env.execute()
写streaming表到Pulsar
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val prop = new Properties() prop.setProperty("service.url", serviceUrl) prop.setProperty("admin.url", adminUrl) prop.setProperty("flushOnCheckpoint", "true") prop.setProperty("failOnWrite", "true") props.setProperty("topic", "test-sink-topic") tEnv .connect(new Pulsar().properties(props)) .inAppendMode() .registerTableSource("sink-table") val sql = "INSERT INTO sink-table ....." tEnv.sqlUpdate(sql) env.execute()
对于Flink开发人员只需要指定Flink如何连接到Pulsar集群,将Pulsar集群注册为Flink中的源,接收器或流表,不必担心任何schema注册表或序列化/反序列化操作。这可以简化处理和查询数据的方式。