开发者社区> 问答> 正文

请问kafka connector ddl能不能像flinkKafkaConsumer.setSta

kafka connector ddl能不能像flinkKafkaConsumer.setStartFromTimestamp(xx)一样从指定timestamp开始消费,我看文档里只提到了earliest-offset,latest-offset,group-offsets,specific-offsets

CREATE TABLE MyUserTable (   ... ) WITH (   'connector.type' = 'kafka',      

  'connector.version' = '0.11',     -- required: valid connector versions are                                     -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string   'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string   'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group   'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset",                                                    -- "latest-offset", "group-offsets",                                                    -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets   'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions                                          -- into Kafka's partitions valid are "fixed"                                          -- (each Flink partition ends up in at most one Kafka partition),                                          -- "round-robin" (a Flink partition is distributed to                                          -- Kafka partitions round-robin)                                          -- "custom" (use a custom FlinkKafkaPartitioner subclass)   -- optional: used in case of sink partitioner custom   'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',     'format.type' = '...',                 -- required: Kafka connector requires to specify a format,   ...                                    -- the supported formats are 'csv', 'json' and 'avro'.                                          -- Please refer to Table Formats section for more details. ) *来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:12:55 495 0
1 条回答
写回答
取消 提交回答
  • 目前版本不支持,我看1.11版本支持,其实可以自己修改支持

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:40:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关实验场景

更多