开发者社区> 问答> 正文

Sql Client读取Kafka报错

lib 下的Jar flink-csv-1.11.3.jar flink-table-blink_2.11-1.11.3.jar flink-dist_2.11-1.11.3.jar flink-table_2.11-1.11.3.jar flink-jdbc_2.11-1.11.3.jar log4j-1.2-api-2.12.1.jar flink-json-1.11.3.jar log4j-api-2.12.1.jar flink-shaded-zookeeper-3.4.14.jar log4j-core-2.12.1.jar flink-sql-connector-elasticsearch6_2.11-1.11.3.jar log4j-slf4j-impl-2.12.1.jar flink-sql-connector-kafka_2.11-1.11.3.jar mysql-connector-java-5.1.48.jar

flink bin/sql-client.sh embedded

CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通过计算列产生一个处理时间列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列

) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'data_test', -- kafka topic 'startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format' = 'json' -- 数据源格式为 json );*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-08 11:35:43 602 0
1 条回答
写回答
取消 提交回答
  • 下载个 flink-sql-connector-kafka 这个jar 放在lib下试下*来自志愿者整理的flink邮件归档

    2021-12-08 16:13:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载