解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +
")"; 你好测试代码如下
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {
//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
tableResult.print();
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
bsEnv.execute("aa");
}
输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单
1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
从你的描述来看,你正在尝试使用Apache Flink读取Kafka中由Canal解析的MySQL binlog数据,并且已经设置了一个表kafkaTable
来接收这些数据。但是,你在查询这个表时发现没有数据输出。
有几个可能的问题点和解决方向:
SQL语法错误:首先,你的建表语句中的字段名似乎有误,中文的“数据”和“表”作为字段名在大多数数据库系统中是不被推荐的,尤其是如果配置文件或系统不支持中文字符的话,这可能导致解析错误。建议将字段名改为英文,例如data VARCHAR, table_name VARCHAR
。
连接器配置:确认你的Kafka连接器配置是否正确无误,特别是topic
名称、bootstrap.servers
地址、以及group.id
是否与Kafka集群实际配置相匹配。此外,确保binlog数据确实被发送到了指定的Kafka topic中。
数据格式问题:你提到数据格式为Canal解析的MySQL binlog,而配置中使用的格式是'format' = 'json'
。需要确认Canal输出到Kafka的数据确实是JSON格式的。如果不是,你需要调整Flink作业中的格式配置以匹配实际的数据格式。
Flink版本兼容性:你提到了Flink 1.11版本和Blink Planner。虽然Blink Planner在Flink 1.11中可用,但确保所有组件(包括Flink自身、连接器等)都兼容该版本。同时,旧版Planner(也称为Batch Table Planner)在某些场景下可能与特定的连接器或功能更兼容,因此如果看到的例子多用OldPlanner,可以考虑切换计划器进行测试。
数据流处理模式:检查你的Flink作业是否正确地设置了执行模式(如流处理模式),并且确保作业成功启动并消费了Kafka中的数据。可以通过Flink的Web UI监控作业状态和数据流入情况。
SQL查询问题:最后,确保你的查询语句正确无误。尽管看起来简单,但有时候小错误也会导致无法获取数据,比如表名、字段名的大小写敏感问题等。
建议步骤: - 调整表定义中的字段名为英文。 - 确认Kafka topic中的数据格式与Flink配置匹配。 - 检查并验证所有配置项,包括Kafka连接器的配置。 - 使用Flink Web UI监控作业运行状态,查看是否有错误日志。 - 如果可能,尝试使用旧版Planner对比测试,看是否为规划器兼容性问题。
希望这些建议能帮助你解决问题。如果还有具体错误信息或进一步的细节,提供更多信息可能会有助于更精确地定位问题所在。