开发者社区> 问答> 正文

解析kafka的mysql binlog问题

解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +

")"; 你好测试代码如下

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {

//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);

tableResult.print();

Table table = bsTableEnv.sqlQuery("select * from kafkaTable");

bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

bsEnv.execute("aa");

}

输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单

1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 10:03:09 792 0
1 条回答
写回答
取消 提交回答
  • 有kafka 中json 数据的样例不? 有没有任务管理器有没有异常日志信息? 因为“数据”是一个复杂的结构,不是容易的字符串结构。所以1.11至今,这个功能还不支持。 1.12中已经支持读取复杂结构为字符串类型了。*来自志愿者整理的flink邮件归档

    2021-12-07 12:14:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
One Box: 解读事务与分析一体化数据库 HybridDB for MySQL 立即下载
One Box:解读事务与分析一体化数据库HybridDB for MySQL 立即下载
如何支撑HTAP场景-HybridDB for MySQL系统架构和技术演进 立即下载

相关镜像