解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +
")"; 你好测试代码如下
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {
//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
tableResult.print();
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
bsEnv.execute("aa");
}
输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单
1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档
有kafka 中json 数据的样例不? 有没有任务管理器有没有异常日志信息? 因为“数据”是一个复杂的结构,不是容易的字符串结构。所以1.11至今,这个功能还不支持。 1.12中已经支持读取复杂结构为字符串类型了。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。