开发者社区> 问答> 正文

kafka-connect json格式适配问题?

kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储

Flink定义输出表结构:

CREATE TABLE print_table \

(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')

输出的数据格式示例:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}

但是kafka-connect-jdbc的json格式需要schema和payload,示例:

{

"schema": {

"type": "struct",

"fields": [

{

"type": "int64",

"optional": false,

"field": "id"

},

{

"type": "string",

"optional": true,

"field": "name"

}

],

"optional": true,

"name": "user"

},

"payload": {

"id": 1,

"name": "admin"

}

}

请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?

当前Flink处理sql:

INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' MINUTE)*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 10:13:13 895 0
1 条回答
写回答
取消 提交回答
  • 你需要在 DDL 和 query 上都补上 schema 和 payload:

    CREATE TABLE print_table \

    (schema STRING, payload ROW<total_count BIGINT, username STRING,

    update_time TIMESTAMP(6)>) \

    WITH (\

    'connector' = 'kafka', \

    'topic' = 'test_out', \

    'properties.bootstrap.servers' = '127.0.0.1:9092', \

    'sink.partitioner' = 'round-robin', \

    'format' = 'json')

    -- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload

    INSERT INTO output

    SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,

    update_time) as payload

    FROM ...

    Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到

    mysql 不是很方便么?*来自志愿者整理的flink邮件归档

    2021-12-07 11:35:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载