flink-pulsar-connector使用
1:添加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-pulsar_2.11</artifactId> <version>1.14.3</version> </dependency>
2:source使用demo
PulsarSource<String> pulsarSource = PulsarSource.builder() .setServiceUrl(serviceUrl) .setAdminUrl(adminUrl) .setStartCursor(StartCursor.earliest()) .setTopics("my-topic") .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) .setSubscriptionName("my-subscription") .setSubscriptionType(SubscriptionType.Exclusive) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
如果使用构造类构造 Pulsar 数据源,一定要提供下面几个属性:
- Pulsar 数据消费的地址,使用
setServiceUrl(String)
方法提供 - Pulsar HTTP 管理地址,使用
setAdminUrl(String)
方法提供 - Pulsar 订阅名称,使用
setSubscriptionName(String)
方法提供 - 需要消费的 topic 或者是 topic 下面的分区,详见指定消费的 Topic 或者 Topic 分区
- 解码 Pulsar 消息的反序列化器,详见反序列化器
3:指定消费的 Topic 或者 Topic 分区
Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
PulsarSource.builder().setTopics("some-topic1", "some-topic2") // 从 topic "topic-a" 的 0 和 1 分区上消费 PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:
PulsarSource.builder().setTopicPattern("topic-*")
Pulsar topic 有 persistent
、non-persistent
两种类型,使用正则表达式消费数据的时候,连接器会尝试从正则表达式里面解析出消息的类型。例如:PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")
会解析出 non-persistent
这个 topic 类型。如果用户使用 topic 名称简写的方式,连接器会使用默认的消息类型 persistent
。
如果想用正则去消费 persistent
和 non-persistent
类型的 topic,需要使用 RegexSubscriptionMode
定义 topic 类型,例如:setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)
。
4:反序列化器
反序列化器用于解析 Pulsar 消息,连接器使用 PulsarDeserializationSchema
来定义反序列化器。用户可以在 builder 类中使用 setDeserializationSchema(PulsarDeserializationSchema)
方法配置反序列化器,它会解析 Pulsar 的 Message<byte[]>
实例。
如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 PulsarDeserializationSchema
。Pulsar 连接器里面提供了 3 种预定义好的反序列化器。
使用 Pulsar 的 Schema 解析消息。
// 基础数据类型 PulsarDeserializationSchema.pulsarSchema(Schema) // 结构类型 (JSON, Protobuf, Avro, etc.) PulsarDeserializationSchema.pulsarSchema(Schema, Class) // 键值对类型 PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
使用 Flink 的 DeserializationSchema 解析消息。
PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
使用 Flink 的 TypeInformation 解析消息。
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)
5:Pulsar 订阅
订阅是命名好的配置规则,指导消息如何投递给消费者。连接器需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
- exclusive(独占)
- shared(共享)
- failover(灾备)
- key_shared(key 共享)
当前 Pulsar 连接器里面,独占
和 灾备
的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未消费的数据交给其他的 reader 来消费数据。
默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(SubscriptionType.Shared
)。
默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(SubscriptionType.Shared
)。
// 名为 "my-shared" 的共享订阅 PulsarSource.builder().setSubscriptionName("my-shared") // 名为 "my-exclusive" 的独占订阅 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
如果想在 Pulsar 连接器里面使用 key 共享
订阅,需要提供 RangeGenerator
实例。RangeGenerator
会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。
Pulsar 连接器也提供了一个名为 UniformRangeGenerator
的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
基于 flink 数据源的并行度将 hash 范围均分。
6:起始位置消费
连接器使用 setStartCursor(StartCursor)
方法给定开始消费的位置。内置的消费位置有:
- 从 topic 里面最早的一条消息开始消费。
StartCursor.earliest()
- 从 topic 里面最新的一条消息开始消费。
StartCursor.latest()
- 从给定的消息开始消费。
StartCursor.fromMessageId(MessageId)
- 与前者不同的是,给定的消息可以跳过,再进行消费。
StartCursor.fromMessageId(MessageId, boolean)
- 从给定的消息时间开始消费。
StartCursor.fromMessageTime(long)
每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。Pulsar 称这个序列号为MessageId
,用户可以使用DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)
创建它。
7:Boundedness
Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连接器使用流的方式消费数据。除非任务失败或者被取消,否则连接器将持续消费数据。用户可以使用 setBoundedStopCursor(StopCursor)
给定停止消费的位置,这种情况下连接器会使用批的方式进行消费。当所有 topic 分区都消费到了停止位置,Flink 任务就会结束。
使用流的方式一样可以给定停止位置,使用 setUnboundedStopCursor(StopCursor)
方法即可。
内置的停止位置如下:
- 永不停止。
StopCursor.never()
- 停止于 Pulsar 启动时 topic 里面最新的那条数据。
StopCursor.latest()
- 停止于某条消息,结果里不包含此消息。
StopCursor.atMessageId(MessageId)
- 停止于某条消息之后,结果里包含此消息。
StopCursor.afterMessageId(MessageId)
- 停止于某个给定的消息时间戳。
StopCursor.atEventTime(long)
8:动态分区发现
为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS
设置一个正整数即可启用。
为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS 设置一个正整数即可启用。
默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。
9:event_time and watermark
默认情况下,连接器使用 Pulsar 的 Message<byte[]>
里面的时间作为解析结果的时间戳。用户可以使用 WatermarkStrategy
来自行解析出想要的消息时间,并向下游传递对应的水位线。
env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
如何定义 WatermarkStrategy:
生成 Watermark | Apache Flink
10: pulsar sink demo
PulsarSerializationSchema<Person> pulsarSerialization = new PulsarSerializationSchemaWrapper.Builder<>(JsonSer.of(Person.class)) .usePojoMode(Person. class, RecordSchemaType.JSON) .setTopicExtractor(person -> null) .build(); FlinkPulsarSink<Person> sink = new FlinkPulsarSink( serviceUrl, adminUrl, Optional.of(topic), // mandatory target topic or use `Optional.empty()` if sink to different topics for each record props, pulsarSerialization ); stream.addSink(sink);
11:读取kafka写入pulsar
1:序列化与反序列化
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; public class ByteArraySchema extends AbstractDeserializationSchema<byte[]> implements SerializationSchema<byte[]> { @Override public byte[] deserialize(byte[] bytes) { return bytes; } @Override public byte[] serialize(byte[] bytes) { return bytes; } }
2:主类
import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import java.util.Properties; public class KafkaToPulsarCommon { public static StreamExecutionEnvironment getEnv() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 关闭日志 env.getConfig().disableSysoutLogging(); //确保一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(60))); // 设置checkpoint时间 env.enableCheckpointing(300000, CheckpointingMode.EXACTLY_ONCE); // 指定checkpoint执行的超时时间 env.getCheckpointConfig().setCheckpointTimeout(Time.seconds(300).toMilliseconds()); // checkpoint完成之后最小等多久可以触发另一个checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Time.seconds(30).toMilliseconds()); // cancel后保留checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); return env; } public static Properties kafkaProperties(String broker, String groupid) { Properties props = new Properties(); props.setProperty("bootstrap.servers", broker); props.setProperty("flink.partition-discovery.interval-millis", String.valueOf(5 * 60 * 1000)); props.setProperty("group.id", groupid); return props; } public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final String topic = params.get("topic", "push_single_task"); final String slb = params.get("clusterAddr", "localhost:9092"); final String groupid = params.get("groupid", "kafka2pulsar_test"); final String serviceUrl = params.get("serviceUrl", "pulsar://localhost:6650"); final String outputTopic = params.get("outputTopic", "persistent://test/ns1/river_test1"); final int sinkP = params.getInt("sinkP", 1); final String offset = params.get("offset", "groupid"); final StreamExecutionEnvironment env = getEnv(); FlinkKafkaConsumer<byte[]> consumer = new FlinkKafkaConsumer<>(topic, new KafkaDeserializationSchemaWrapper<>(new ByteArraySchema()), kafkaProperties(slb, groupid)); if ("earliest".equals(offset)) { consumer.setStartFromEarliest(); } if ("latest".equals(offset)) { consumer.setStartFromLatest(); } DataStream<byte[]> stream = env.addSource(consumer); ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(serviceUrl); clientConf.setAuthentication(new AuthenticationDisabled()); ProducerConfigurationData producerConf = new ProducerConfigurationData(); producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); producerConf.setTopicName(outputTopic); producerConf.setBlockIfQueueFull(true); stream.addSink(new FlinkPulsarProducer<>( clientConf, producerConf, new ByteArraySchema(), null, null )).setParallelism(sinkP); env.execute("kafka2pulsar"); } }