flink-pulsar-connector

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink-pulsar-connector

flink-pulsar-connector使用

1:添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

2:source使用demo

PulsarSource<String> pulsarSource = PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
    .setSubscriptionName("my-subscription")
    .setSubscriptionType(SubscriptionType.Exclusive)
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

如果使用构造类构造 Pulsar 数据源,一定要提供下面几个属性:

  • Pulsar 数据消费的地址,使用 setServiceUrl(String) 方法提供
  • Pulsar HTTP 管理地址,使用 setAdminUrl(String) 方法提供
  • Pulsar 订阅名称,使用 setSubscriptionName(String) 方法提供
  • 需要消费的 topic 或者是 topic 下面的分区,详见指定消费的 Topic 或者 Topic 分区
  • 解码 Pulsar 消息的反序列化器,详见反序列化器

3:指定消费的 Topic 或者 Topic 分区

Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。

Topic 列表,从这个 Topic 的所有分区上消费消息,例如:

PulsarSource.builder().setTopics("some-topic1", "some-topic2")
// 从 topic "topic-a" 的 0 和 1 分区上消费
PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")

Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:

PulsarSource.builder().setTopicPattern("topic-*")

Pulsar topic 有 persistentnon-persistent 两种类型,使用正则表达式消费数据的时候,连接器会尝试从正则表达式里面解析出消息的类型。例如:PulsarSource.builder().setTopicPattern("non-persistent://my-topic*") 会解析出 non-persistent 这个 topic 类型。如果用户使用 topic 名称简写的方式,连接器会使用默认的消息类型 persistent

如果想用正则去消费 persistentnon-persistent 类型的 topic,需要使用 RegexSubscriptionMode 定义 topic 类型,例如:setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)

4:反序列化器

反序列化器用于解析 Pulsar 消息,连接器使用 PulsarDeserializationSchema 来定义反序列化器。用户可以在 builder 类中使用 setDeserializationSchema(PulsarDeserializationSchema) 方法配置反序列化器,它会解析 Pulsar 的 Message<byte[]> 实例。

如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 PulsarDeserializationSchema。Pulsar 连接器里面提供了 3 种预定义好的反序列化器。

使用 Pulsar 的 Schema 解析消息。

// 基础数据类型
PulsarDeserializationSchema.pulsarSchema(Schema)
// 结构类型 (JSON, Protobuf, Avro, etc.)
PulsarDeserializationSchema.pulsarSchema(Schema, Class)
// 键值对类型
PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)

使用 Flink 的 DeserializationSchema 解析消息。

PulsarDeserializationSchema.flinkSchema(DeserializationSchema)

使用 Flink 的 TypeInformation 解析消息。

PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)


5:Pulsar 订阅

订阅是命名好的配置规则,指导消息如何投递给消费者。连接器需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:

  • exclusive(独占)
  • shared(共享)
  • failover(灾备)
  • key_shared(key 共享)

当前 Pulsar 连接器里面,独占灾备 的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未消费的数据交给其他的 reader 来消费数据。

默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(SubscriptionType.Shared)。

默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(SubscriptionType.Shared)。

// 名为 "my-shared" 的共享订阅
PulsarSource.builder().setSubscriptionName("my-shared")
// 名为 "my-exclusive" 的独占订阅
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)

如果想在 Pulsar 连接器里面使用 key 共享 订阅,需要提供 RangeGenerator 实例。RangeGenerator 会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。

Pulsar 连接器也提供了一个名为 UniformRangeGenerator 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。

基于 flink 数据源的并行度将 hash 范围均分。

6:起始位置消费

连接器使用 setStartCursor(StartCursor) 方法给定开始消费的位置。内置的消费位置有:

  • 从 topic 里面最早的一条消息开始消费。
StartCursor.earliest()
  • 从 topic 里面最新的一条消息开始消费。
StartCursor.latest()
  • 从给定的消息开始消费。
StartCursor.fromMessageId(MessageId)
  • 与前者不同的是,给定的消息可以跳过,再进行消费。
StartCursor.fromMessageId(MessageId, boolean)
  • 从给定的消息时间开始消费。
StartCursor.fromMessageTime(long)
每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。Pulsar 称这个序列号为 MessageId,用户可以使用 DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex) 创建它。

7:Boundedness

Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连接器使用流的方式消费数据。除非任务失败或者被取消,否则连接器将持续消费数据。用户可以使用 setBoundedStopCursor(StopCursor) 给定停止消费的位置,这种情况下连接器会使用批的方式进行消费。当所有 topic 分区都消费到了停止位置,Flink 任务就会结束。

使用流的方式一样可以给定停止位置,使用 setUnboundedStopCursor(StopCursor) 方法即可。

内置的停止位置如下:

  • 永不停止。
StopCursor.never()
  • 停止于 Pulsar 启动时 topic 里面最新的那条数据。
StopCursor.latest()
  • 停止于某条消息,结果里不包含此消息。
StopCursor.atMessageId(MessageId)
  • 停止于某条消息之后,结果里包含此消息。
StopCursor.afterMessageId(MessageId)
  • 停止于某个给定的消息时间戳。
StopCursor.atEventTime(long)

8:动态分区发现

为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS 设置一个正整数即可启用。

为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS 设置一个正整数即可启用。

默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。

9:event_time and watermark

默认情况下,连接器使用 Pulsar 的 Message<byte[]> 里面的时间作为解析结果的时间戳。用户可以使用 WatermarkStrategy 来自行解析出想要的消息时间,并向下游传递对应的水位线。

env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")

如何定义 WatermarkStrategy:

生成 Watermark | Apache Flink

10: pulsar  sink demo

PulsarSerializationSchema<Person> pulsarSerialization = new PulsarSerializationSchemaWrapper.Builder<>(JsonSer.of(Person.class))
    .usePojoMode(Person. class, RecordSchemaType.JSON)
    .setTopicExtractor(person -> null)
    .build();
FlinkPulsarSink<Person> sink = new FlinkPulsarSink(
    serviceUrl,
    adminUrl,
    Optional.of(topic), // mandatory target topic or use `Optional.empty()` if sink to different topics for each record
    props,
    pulsarSerialization
);
stream.addSink(sink);

11:读取kafka写入pulsar

1:序列化与反序列化

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
public class ByteArraySchema extends AbstractDeserializationSchema<byte[]> implements SerializationSchema<byte[]> {
    @Override
    public byte[] deserialize(byte[] bytes) {
        return bytes;
    }
    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }
}

2:主类

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import java.util.Properties;
public class KafkaToPulsarCommon {
    public static StreamExecutionEnvironment getEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 关闭日志
        env.getConfig().disableSysoutLogging();
        //确保一次语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(60)));
        // 设置checkpoint时间
        env.enableCheckpointing(300000, CheckpointingMode.EXACTLY_ONCE);
        // 指定checkpoint执行的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(Time.seconds(300).toMilliseconds());
        // checkpoint完成之后最小等多久可以触发另一个checkpoint
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Time.seconds(30).toMilliseconds());
        // cancel后保留checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }
    public static Properties kafkaProperties(String broker, String groupid) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", broker);
        props.setProperty("flink.partition-discovery.interval-millis", String.valueOf(5 * 60 * 1000));
        props.setProperty("group.id", groupid);
        return props;
    }
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String topic = params.get("topic", "push_single_task");
        final String slb = params.get("clusterAddr", "localhost:9092");
        final String groupid = params.get("groupid", "kafka2pulsar_test");
        final String serviceUrl = params.get("serviceUrl", "pulsar://localhost:6650");
        final String outputTopic = params.get("outputTopic", "persistent://test/ns1/river_test1");
        final int sinkP = params.getInt("sinkP", 1);
        final String offset = params.get("offset", "groupid");
        final StreamExecutionEnvironment env = getEnv();
        FlinkKafkaConsumer<byte[]> consumer = new FlinkKafkaConsumer<>(topic, new KafkaDeserializationSchemaWrapper<>(new ByteArraySchema()), kafkaProperties(slb, groupid));
        if ("earliest".equals(offset)) {
            consumer.setStartFromEarliest();
        }
        if ("latest".equals(offset)) {
            consumer.setStartFromLatest();
        }
        DataStream<byte[]> stream = env.addSource(consumer);
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setAuthentication(new AuthenticationDisabled());
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
        producerConf.setTopicName(outputTopic);
        producerConf.setBlockIfQueueFull(true);
        stream.addSink(new FlinkPulsarProducer<>(
                clientConf,
                producerConf,
                new ByteArraySchema(),
                null,
                null
        )).setParallelism(sinkP);
        env.execute("kafka2pulsar");
    }
}



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
107 0
|
5月前
|
存储 SQL API
Flink教程(23)- Flink高级特性(Streaming File Sink)
Flink教程(23)- Flink高级特性(Streaming File Sink)
123 0
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0
|
3月前
|
流计算
在Flink中,如果一个任务的输入依赖于前面两个任务的输出,可以使用`connector`来连接这三个任务
【1月更文挑战第19天】【1月更文挑战第93篇】在Flink中,如果一个任务的输入依赖于前面两个任务的输出,可以使用`connector`来连接这三个任务
25 2
|
3月前
|
关系型数据库 Apache DataX
BDCC - 数据集成领域的主流中间件_ Apache SeaTunnel vs Flink CDC vs DataX vs Apache Sqoop vs Apache Flume
BDCC - 数据集成领域的主流中间件_ Apache SeaTunnel vs Flink CDC vs DataX vs Apache Sqoop vs Apache Flume
186 0
|
4月前
|
流计算
Flink 写es
Flink 写es
38 0
|
4月前
|
消息中间件 Apache 流计算
Apache Flink的RabbitMQ connector使用的是`org.apache.flink:flink-sql-connector-rabbitmq`库
Apache Flink的RabbitMQ connector使用的是`org.apache.flink:flink-sql-connector-rabbitmq`库
70 2
|
消息中间件 SQL 存储
Flink1.9新特性解读:通过Flink SQL查询Pulsar
Flink1.9新特性解读:通过Flink SQL查询Pulsar
377 0
Flink1.9新特性解读:通过Flink SQL查询Pulsar
|
消息中间件 NoSQL 关系型数据库
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
Flink常用Sink(elasticsearch(es)Sink、RedisSink、KafkaSink、MysqlSink、FileSink)
|
消息中间件 缓存 资源调度
Flink 1.14.0 全新的 Kafka Connector
Flink 1.14.0 全新的 Kafka Connector