开发者社区 问答 正文

解析kafka的mysql binlog问题

解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +

")"; 你好测试代码如下

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {

//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);

tableResult.print();

Table table = bsTableEnv.sqlQuery("select * from kafkaTable");

bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

bsEnv.execute("aa");

}

输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单

1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 10:01:03 502 分享 版权
阿里云 AI 助理回答

从你的描述来看,你正在尝试使用Apache Flink读取Kafka中由Canal解析的MySQL binlog数据,并且已经设置了一个表kafkaTable来接收这些数据。但是,你在查询这个表时发现没有数据输出。

有几个可能的问题点和解决方向:

  1. SQL语法错误:首先,你的建表语句中的字段名似乎有误,中文的“数据”和“表”作为字段名在大多数数据库系统中是不被推荐的,尤其是如果配置文件或系统不支持中文字符的话,这可能导致解析错误。建议将字段名改为英文,例如data VARCHAR, table_name VARCHAR

  2. 连接器配置:确认你的Kafka连接器配置是否正确无误,特别是topic名称、bootstrap.servers地址、以及group.id是否与Kafka集群实际配置相匹配。此外,确保binlog数据确实被发送到了指定的Kafka topic中。

  3. 数据格式问题:你提到数据格式为Canal解析的MySQL binlog,而配置中使用的格式是'format' = 'json'。需要确认Canal输出到Kafka的数据确实是JSON格式的。如果不是,你需要调整Flink作业中的格式配置以匹配实际的数据格式。

  4. Flink版本兼容性:你提到了Flink 1.11版本和Blink Planner。虽然Blink Planner在Flink 1.11中可用,但确保所有组件(包括Flink自身、连接器等)都兼容该版本。同时,旧版Planner(也称为Batch Table Planner)在某些场景下可能与特定的连接器或功能更兼容,因此如果看到的例子多用OldPlanner,可以考虑切换计划器进行测试。

  5. 数据流处理模式:检查你的Flink作业是否正确地设置了执行模式(如流处理模式),并且确保作业成功启动并消费了Kafka中的数据。可以通过Flink的Web UI监控作业状态和数据流入情况。

  6. SQL查询问题:最后,确保你的查询语句正确无误。尽管看起来简单,但有时候小错误也会导致无法获取数据,比如表名、字段名的大小写敏感问题等。

建议步骤: - 调整表定义中的字段名为英文。 - 确认Kafka topic中的数据格式与Flink配置匹配。 - 检查并验证所有配置项,包括Kafka连接器的配置。 - 使用Flink Web UI监控作业运行状态,查看是否有错误日志。 - 如果可能,尝试使用旧版Planner对比测试,看是否为规划器兼容性问题。

希望这些建议能帮助你解决问题。如果还有具体错误信息或进一步的细节,提供更多信息可能会有助于更精确地定位问题所在。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答