Kafka通过Hologres Connector写入Hologres

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 本文将会介绍如何通过Hologres connector将Kafka的数据写入Hologres

Hologres有着非常庞大的生态家族,支持多种异构数据源的离线、实时写入:

  • 对于开源大数据:Hologres支持当下最流行的大数据开源组件,其中包括Flink、Blink和Spark等,通过内置的Hologres Connector实现高并发实时写入。
  • 对于数据库类数据:Hologres与DataWorks数据集成(DataX和StreamX)深度集成,支持通过Hologres WriterHologres Reader,实现方便高效地将多种数据库数据离线、实时、整库同步至Hologres中,满足各类企业数据同步迁移的需求。

image.png

Kafka 是一款高吞吐量、高可扩展性的分布式消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等场景。

可以通过DataWorks数据集成或者Flink将kafka数据实时同步至Hologres。Hologres也实现了开源Connector,将Kafka数据直接写入Hologres。


本文将会介绍如何使用Hologres connector将Kafka写入Hologres,本文相关内容可以移步Hologres 开源connector

准备工作

  • 需要Hologres 0.9及以上版本
  • 需要Kafka-0.10.0.0及以上版本
  • 建议使用Kafka-1.0.0及以上版本
  • kafka-0.10.0.0, kafka-0.10.1.0 不支持whole_message_info=true

从中央仓库获取jar

可以在项目pom文件中通过如下方式引入依赖,其中必须加上,防止发生依赖冲突。

<dependency><groupId>com.alibaba.hologres</groupId><artifactId>hologres-connector-kafka</artifactId><version>1.0.0</version><classifier>jar-with-dependencies</classifier></dependency>

自行编译

build jar

mvn-plhologres-connector-kafkacleanpackage-DskipTests

加载jar包(以下两种方式可选,此处以放在工作目录为例)

  • KAFKA_HOME:
  • 将hologres-connector-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar放在Kafka安装目录的libs中。即 $KAFKA_HOME/libs
  • 工作目录
  • 将hologres-connector-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar放入你的工作目录,以下示例皆以/user/xxx/workspace为例
  • connect-standalone.properties中设置jar包路径,plugin.path=/user/xxx/workspace
  • kafka-1.0.0版本之前,不支持设置工作目录,只能将jar文件放入$KAFKA_HOME/libs

配置文件说明

配置文件在本项目的hologres-connector-kafka/src/main/resources/*路径中

  • holo-sink.properties是hologres-connector-kafka的配置文件, 用于配置connector相关的参数,参数详情见下方的参数说明
  • connect-standalone.properties是kafka原有的配置文件,原路径在kafka的安装目录的$KAFKA_HOME/libexec/config中,本项目将其复制一份方便使用

消费模式

  • 本 connector 目前含有如下三种消费模式,使用方式见下方示例
  • json: 接受只包含键值对的json{key1:value1,key2:value2},其中key名称与hologres表字段名对应。
  • struct_json: 接受特定的json格式输入(包含schema以及payload的定义),可以指定表schema,包括表的字段名、类型等,存入相应的hologres表中。
  • string: 接受key:value格式的输入,将key、value当作字符串存入特定的hologres表中,注意kafka的key,value通过tab键分割。


使用示例1:json模式

  • json 模式消费的是只包含键值对的json{key1:value1,key2:value2},其中key名称与hologres表字段名对应。

1.将jar包、配置文件复制到工作目录,本示例以单节点模式standalone为例:

cp hologres-connector-kafka/target/hologres-connector-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/connect-standalone.properties /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/holo-sink.properties /user/xxx/workspace/

2.修改配置文件

修改holo-sink.properties

input_format=json
connection.jdbcUrl=jdbc:postgresql://hostname:port/database_name
connection.username=your_username
connection.password=your_password
table=test_json

修改connect-standalone.properties

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=falseplugin.path=/user/xxx/workspace

3.创建Hologres表

createtable test_json(  id int PRIMARY KEY,  name text,  weight DOUBLEPRECISION,  thick FLOAT,  deci_0 numeric(15,3),  time_0 timestamptz,  kafkaTopic text,  kafkaPartition int,  kafkaOffset bigint,  kafkaTimestamp timestamptz
);

4.开启producer并通过connector消费

4.1 创建名称为kafka_sink_test的kafka topic:

kafka-topics.sh --zookeeper localhost:2181 --delete--topic kafka_sink_test
kafka-topics.sh --create--topic kafka_sink_test --partitions3--replication-factor1--zookeeper127.0.0.1:2181


4.2 开启producer并输入数据

  • 启动producer
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_sink_test


  • 复制下列json到producer terminal
  • 其中 timestamptz类型可以是时间戳(long)或者字符串
{"id":1,"name":"abcd","weight":123.456,"thick":12.34,"deci_0":456.789,"time_0":"2021-05-21 16:00:45"}
{"id":2,"name":"efgh","weight":123.456,"thick":12.34,"deci_0":456.789,"time_0":1567688965261}

4.3 开启connector-cunstomer进行消费

connect-standalone.sh connect-standalone.properties holo-sink.properties

4.4 写入hologres成功

数据如下图所示

id | name | weight  | thick | deci_0  |           time_0           |   kafkatopic    | kafkapartition | kafkaoffset |       kafkatimestamp       
----+------+---------+-------+---------+----------------------------+-----------------+----------------+-------------+----------------------------
  2 | efgh | 123.456 | 12.34 | 456.789 | 2019-09-05 21:09:25.261+08 | kafka_sink_test |              0 |           0 | 2021-05-25 17:26:36.818+08
  1 | abcd | 123.456 | 12.34 | 456.789 | 2021-05-21 16:00:45+08     | kafka_sink_test |              2 |           2 | 2021-05-25 17:26:35.335+08


使用示例2:struct_json模式

  • struct_json 模式消费的是特殊格式的json(包含schema以及payload的定义),可以指定表的字段名、字段类型等,存入相应的hologres表中

1.将jar包、配置文件复制到工作目录,本示例以单节点模式standalone为例:

cp hologres-connector-kafka/target/hologres-connector-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/connect-standalone.properties /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/holo-sink.properties /user/xxx/workspace/

2.修改配置文件

修改holo-sink.properties

input_format=struct_json
connection.jdbcUrl=jdbc:postgresql://hostname:port/database_name
connection.username=your_username
connection.password=your_password
table=test_struct_json

修改connect-standalone.properties

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=trueplugin.path=/user/xxx/workspace

3.创建Hologres表

createtable test_struct_json(  id int PRIMARY KEY,  name text,  weight DOUBLEPRECISION,  thick FLOAT,  deci_0 numeric(15,3),  time_0 timestamptz,  deci_1 numeric(15,3),  time_1 timestamptz,  kafkaTopic text,  kafkaPartition int,  kafkaOffset bigint,  kafkaTimestamp timestamptz
);

4.开启producer并通过connector消费

4.1 创建名称为kafka_sink_test的kafka topic:

kafka-topics.sh --zookeeper localhost:2181 --delete--topic kafka_sink_test
kafka-topics.sh --create--topic kafka_sink_test --partitions3--replication-factor1--zookeeper127.0.0.1:2181

4.2 开启producer并输入数据

  • 启动producer
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_sink_test
  • 复制下列json到producer terminal
{
"schema": {
"type": "struct",
"fields": [
      {
"type": "int32",
"optional": false,
"field": "id"      },
      {
"type": "string",
"optional": false,
"field": "name"      },
      {
"type": "double",
"optional": false,
"field": "weight"      },
      {
"type": "float",
"optional": false,
"field": "thick"      },
      {
"name": "org.apache.kafka.connect.data.Decimal",
"type": "bytes",
"optional": false,
"parameters": {
"scale": "3",
"connect.decimal.precision": "15"        },
"field": "deci_0"      },
      {
"name": "org.apache.kafka.connect.data.Timestamp",
"type": "int64",
"optional": false,
"field": "time_0"      },
      {
"name": "Decimal",
"type": "string",
"optional": false,
"field": "deci_1"      },
      {
"name": "Timestamp",
"type": "string",
"optional": false,
"field": "time_1"      }
    ],
"optional": false,
"name": "test_struct_json"  },
"payload": {
"id": 1,
"name": "abcd",
"weight": 123.456,
"thick": 12.34,
"deci_0": "W42A7",
"time_0": 1567688965261,
"deci_1": "999.8888",
"time_1": "2021-05-20 11:09:25"  }
}
  • 以上是消费时被压缩为一行的json,格式化的完整json如下:
  • 可以看到,Decimal、Date、Timestamp三种类型需要通过fields.name声明,详见 4.5类型映射
{
"schema": {
"type": "struct",
"fields": [
      {
"type": "int32",
"optional": false,
"field": "id"      },
      {
"type": "string",
"optional": false,
"field": "name"      },
      {
"type": "double",
"optional": false,
"field": "weight"      },
      {
"type": "float",
"optional": false,
"field": "thick"      },
      {
"name": "org.apache.kafka.connect.data.Decimal",
"type": "bytes",
"optional": false,
"parameters": {
"scale": "3",
"connect.decimal.precision": "15"        },
"field": "deci_0"      },
      {
"name": "org.apache.kafka.connect.data.Timestamp",
"type": "int64",
"optional": false,
"field": "time_0"      },
      {
"name": "Decimal",
"type": "string",
"optional": false,
"field": "deci_1"      },
      {
"name": "Timestamp",
"type": "string",
"optional": false,
"field": "time_1"      }
    ],
"optional": false,
"name": "test_struct_json"  },
"payload": {
"id": 1,
"name": "abcd",
"weight": 123.456,
"thick": 12.34,
"deci_0": "W42A7",
"time_0": 1567688965261,
"deci_1": "999.8888",
"time_1": "2021-05-20 11:09:25"  }
}

4.3 开启connector-cunstomer进行消费

connect-standalone.sh connect-standalone.properties holo-sink.properties

4.4 写入hologres成功,数据如下图所示

id | name | weight  |      thick       |  deci_0  |           time_0           | deci_1  |         time_1         |   kafkatopic    | kafkapartition | kafkaoffset |       kafkatimestamp       
----+------+---------+------------------+----------+----------------------------+---------+------------------------+-----------------+----------------+-------------+----------------------------
  1 | abcd | 123.456 | 12.3400001525879 | 6000.000 | 2019-09-05 21:09:25.261+08 | 999.889 | 2021-05-20 11:09:25+08 | kafka_sink_test |              0 |           0 | 2021-05-24 13:40:41.264+08

4.5 类型映射

  • 仅针对input_format=struct_json,定义了相应字段的类型,可以进行类型映射
kafka holo
int8 INT
int16 INT
int32 INT
int64 BIGINT
boolean BOOL
float FLOAT
double DOUBLE PRECISION
string TEXT
Decimal NUMERIC(38,18)
Date DATE
Timestamp TIMESTAMPTZ

注1: Decimal、Date、Timestamp三种类型需要通过fields.name声明,声明方式分为两种,区别如下表。 前三行为org.apache.kafka.connect.data类型, 对输入格式要求较为严格,适合消费其他组件生成的struct_json数据时使用; 而直接消费文件等数据建议使用后三行的类型,即通过string写入,具有较高的可读性。

name 需要的数据类型 对应的holo数据类型
org.apache.kafka.connect.data.Decimal bytes NUMERIC(38,18)
org.apache.kafka.connect.data.Date long DATE
org.apache.kafka.connect.data.Timestamp long TIMESTAMPTZ
Decimal string NUMERIC(38,18)
Date string DATE
Timestamp string TIMESTAMPTZ


使用示例3:string模式

  • string模式消费的是key:value格式的输入,将key、value当作字符串存入特定的hologres表中

1.将jar包、配置文件复制到工作目录,本示例以单节点模式standalone为例:

cp hologres-connector-kafka/target/hologres-connector-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/connect-standalone.properties /user/xxx/workspace/
cp hologres-connector-kafka/src/main/resources/holo-sink.properties /user/xxx/workspace/

2.修改配置文件

修改holo-sink.properties

input_format=string
connection.jdbcUrl=jdbc:postgresql://hostname:port/database_name
connection.username=your_username
connection.password=your_password
table=test_string

修改connect-standalone.properties

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=trueplugin.path=/user/xxx/workspace

3.创建Hologres表

createtable test_string(  key text,  value text,  kafkaTopic text,  kafkaPartition int,  kafkaOffset bigint,  kafkaTimestamp timestamptz
);

4.开启producer并通过connector消费

4.1 创建名称为kafka_sink_test的kafka topic:

kafka-topics.sh --zookeeper localhost:2181 --delete--topic kafka_sink_test
kafka-topics.sh --create--topic kafka_sink_test --partitions3--replication-factor1--zookeeper127.0.0.1:2181

4.2 开启producer并输入数据

  • 启动producer
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_sink_test --property parse.key=true
  • 复制下列数据到producer terminal,注意kafka消息的key:value通过tab键分割。
key1  value1
key2  value2
key3  value3

4.3 开启connector-cunstomer进行消费

connect-standalone.sh connect-standalone.properties holo-sink.properties

4.4 写入hologres成功,数据如下图所示

key  | value  |   kafkatopic    | kafkapartition | kafkaoffset |       kafkatimestamp       
------+--------+-----------------+----------------+-------------+----------------------------
 key3 | value3 | kafka_sink_test |              0 |           2 | 2021-05-21 14:55:43.037+08
 key2 | value2 | kafka_sink_test |              0 |           1 | 2021-05-21 14:55:39.301+08
 key1 | value1 | kafka_sink_test |              0 |           0 | 2021-05-21 14:55:35.352+08


使用示例4:脏数据处理

  • 基于使用示例1: json 模式进行展示

1.开启producer并通过connector消费

1.1 创建名称为kafka_sink_test的kafka topic:

kafka-topics.sh --zookeeper localhost:2181 --delete--topic kafka_sink_test
kafka-topics.sh --create--topic kafka_sink_test --partitions3--replication-factor1--zookeeper127.0.0.1:2181

1.2 开启producer并输入数据

  • 启动producer
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_sink_test
  • 复制下列json到producer terminal
  • 其中 timestamptz类型可以是时间戳(long)或者字符串
{"id":1,"name":"abcd","weight":123.456,"thick":12.34,"deci_0":456.789,"time_0":"2021-05-21 16:00:45"}
{"id":2,"wrong_name":"efgh","weight":123.456,"thick":12.34,"deci_0":456.789,"time_0":"2021-05-21 16:00:45"}

1.3 开启connector-cunstomer进行消费

connect-standalone.sh connect-standalone.properties holo-sink.properties

1.4 写入hologres抛出异常

com.alibaba.hologres.kafka.exception.KafkaHoloException: 
If you want to skip this dirty data, please add < dirty_date_strategy=SKIP_ONCE > and < dirty_date_to_skip_once=kafka_sink_test,0,0 > in holo-sink.properties; or add < dirty_date_strategy=SKIP > to skip all dirty data(not recommended).

2.配置脏数据处理策略之后重试

2.1 配置脏数据处理策略

dirty_date_strategy=SKIP_ONCE
dirty_date_to_skip_once=kafka_sink_test,0,0

2.1 脏数据SKIP日志

[2021-09-02 20:00:56,586] WARN Skip(once) Dirty Data: SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='kafka_sink_test', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={time_0=2021-05-21 16:00:45, wrong_name=efgh, weight=123.456, deci_0=456.789, id=2, thick=12.34}, valueSchema=null, timestamp=1630583824222, headers=ConnectHeaders(headers=)} (com.alibaba.hologres.kafka.sink.HoloSinkWriter:85)

2.3 其他数据写入成功

id | name | weight  | thick | deci_0  |         time_0         |   kafkatopic    | kafkapartition | kafkaoffset |       kafkatimestamp       
----+------+---------+-------+---------+------------------------+-----------------+----------------+-------------+----------------------------
  1 | abcd | 123.456 | 12.34 | 456.789 | 2021-05-21 16:00:45+08 | kafka_sink_test |              1 |           0 | 2021-09-02 19:57:00.005+08


Kafka的两种connect模式

  • kafka有两种connect模式,上述示例使用的是单节点模式,还有一种是分布式模式,其与单节点模式类似,相应的配置文件参数也相同

单节点模式

connect-standalone.sh connect-standalone.properties holo-sink.properties

分布式模式

分布式模式的holo-sink.json文件与单节点模式的holo-sink.properties参数相同,部分版本需要将value.converter等参数也在json中声明,

connect-distributed.properties文件与单节点模式的connect-standalone.properties参数相同,详情请参考下方参数说明

connect-distributed.sh connect-distributed.properties
curl-s-X POST -H'Content-Type: application/json'--data @holo-sink.json http://localhost:8083/connectors


参数说明

参数在配置文件holo-sink.properties 以及 holo-sink.json中指定

参数名 默认值 是否必填 说明
name 此次运行的connector的名称
connector.class 必须为com.alibaba.hologres.kafka.HoloSinkConnector
tasks.max 创建的最大任务数
driver.class 必须为org.postgresql.Driver
topics connector消费的topic名称
input_format json 不同的消费模式,具体见使用示例
whole_message_info true 是否需要在holo表中写入message的所有相关信息,为true则可能需要设置相关字段名称,详见下方注释[1]
connection.jdbcUrl Hologres实时数据API的jdbcUrl,包含数据库名称
connection.username 阿里云账号的AccessKey ID
connection.password 阿里云账号的Accesskey SECRET
table Hologres用于接收数据的表名称
connection.writeMode INSERT_OR_REPLACE 当INSERT目标表为有主键的表时采用不同策略:
INSERT_OR_IGNORE 当主键冲突时,不写入
INSERT_OR_UPDATE 当主键冲突时,更新相应列
INSERT_OR_REPLACE 当主键冲突时,更新所有列
connection.writeBatchSize 512 每个写入线程的最大批次大小,
在经过WriteMode合并后的Put数量达到writeBatchSize时进行一次批量提交
connection.writeBatchByteSize 2097152(2 * 1024 * 1024) 每个写入线程的最大批次bytes大小,单位为Byte,默认2MB,
在经过WriteMode合并后的Put数据字节数达到writeBatchByteSize时进行一次批量提交
connection.rewriteSqlMaxBatchSize 1024 单条sql进行INSERT/DELETE操作的最大批次大小,
比如写入操作,所攒的批会通过 writeBatchSize/rewriteSqlMaxBatchSize 条INSERT语句完成插入
connection.writeMaxIntervalMs 10000 距离上次提交超过writeMaxIntervalMs会触发一次批量提交
connection.writeFailStrategy TYR_ONE_BY_ONE 当发生写失败时的重试策略:
TYR_ONE_BY_ONE 当某一批次提交失败时,会将批次内的记录逐条提交(保序),其中某单条提交失败的记录将会跟随异常被抛出
NONE 直接抛出异常
connection.writeThreadSize 1 写入并发线程数(每个并发占用1个数据库连接)
connection.dynamicPartition false 若为true,写入分区表父表时,当分区不存在时自动创建分区
connection.retryCount 3 当连接故障时,写入和查询的重试次数
connection.retrySleepInitMs 1000 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs
connection.retrySleepStepMs 10000 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs
connection.connectionMaxIdleMs 60000 写入线程和点查线程数据库连接的最大Idle时间,超过连接将被释放
initial_timestamp -1 从某个时间点开始消费kafka数据写入hologres
dirty_date_strategy EXCEPTION 脏数据处理策略,只对空数据或者schema错误的脏数据有效,不能处理格式错误的数据(input_format错误,乱码等)
EXCEPTION: 脏数据抛出异常
SKIP: 跳过脏数据,打印warn日志
SKIP_ONCE: 跳过特定的一条脏数据,详见dirty_date_to_skip_once参数
dirty_date_to_skip_once null,-1,-1 在dirty_date_strategy=SKIP_ONCE 时生效,由三个部分组成,分别是需要跳过的脏数据对应的topic,partition,offset,通过,隔开
metrics_report_interval 60 metrics report间隔,单位为s
设置为 -1 表示不开启

说明:写入message相关信息,可以设置相关字段的名称

参数名 默认值 是否必填 说明
message_topic kafkatopic message的topic在hologres中字段名
message_partition kafkapartition message的分区在hologres中字段名
message_offset kafkaoffset message的offset在hologres中字段名
message_timestamp kafkatimestamp message生成时的时间戳在hologres中的字段名


了解Hologres:https://www.aliyun.com/product/bigdata/hologram

合集.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
相关文章
|
7月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
228 2
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 Kafka Serverless
ffc的kafka触发器和kafka的connector的主要区别
c的kafka触发器和kafka的connector的主要区别
69 1
|
消息中间件 SQL JSON
flink kafka connector源码解读(超详细)
flink kafka connector源码解读(超详细)
359 0
|
消息中间件 存储 SQL
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
本文主要介绍如何使用Flink 读取kafka Catalog 源表数据写入下游hologres存储,并演示&#39;json.infer-schema.flatten-nested-columns.enable参数,递归的展开JSON中的嵌套列的效果。
使用Flink 读取kafka Catalog 源表数据展开json写入下游hologres表示例操作
|
消息中间件 存储 缓存
Flink 1.14.0 全新的 Kafka Connector
Apache Kafka Connector# Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka Topic 读取数据和向 Kafka Topic 写入数据,并保证恰好一次次语义。 Dependency# Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化
Flink 1.14.0 全新的 Kafka Connector
|
消息中间件 Kafka 流计算
Flink Kafka Connector 与 Exactly Once 剖析
Flink Kafka Connector 是 Flink 内置的 Kafka 连接器,它包含了从 Kafka Topic 读入数据的 Flink Kafka Consumer 以及向 Kafka Topic 写出数据的 Flink Kafka Producer,除此之外 Flink Kafa Connector 基于 Flink Checkpoint 机制提供了完善的容错能力。
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。

热门文章

最新文章

相关产品

  • 实时数仓 Hologres