kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储
Flink定义输出表结构:
CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')
输出的数据格式示例:
{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
但是kafka-connect-jdbc的json格式需要schema和payload,示例:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "name"
}
],
"optional": true,
"name": "user"
},
"payload": {
"id": 1,
"name": "admin"
}
}
请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?
当前Flink处理sql:
INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' MINUTE)*来自志愿者整理的flink邮件归档
你需要在 DDL 和 query 上都补上 schema 和 payload:
CREATE TABLE print_table \
(schema
STRING, payload
ROW<total_count BIGINT, username STRING,
update_time TIMESTAMP(6)>) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')
-- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload
INSERT INTO output
SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
update_time) as payload
FROM ...
Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到
mysql 不是很方便么?*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。