开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink1.18.0版本连接外部kafka本地执行测试代码报错

版本:Flink1.18.0
代码如下:```
public static void main(String[] args) {

    //1、创建TableEnvironment
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    //2、创建source  table
    Schema schema = Schema.newBuilder()
            .column("user", DataTypes.STRING())
            .column("url",DataTypes.STRING())
            .column("cTime", DataTypes.STRING())
            .build();
    tEnv.createTemporaryTable( "sourceTable", TableDescriptor.forConnector("kafka")
            .schema(schema)
            .option("topic","spring_test_kafka_topic")
            .option("properties.bootstrap.servers","xxxxx:9092")
            .option("properties.group.id","clicklog")
            .option("scan.startup.mode","earliest-offset")
            .option("format","json")
            .build()
    );

    //3、创建sink  table
    tEnv.createTemporaryTable("sinkTable",
            TableDescriptor.forConnector("print").schema(schema).build());

    //4、执行sql查询
    //Table resultTable = tEnv.sqlQuery(" select ,userurl,cTime from sourceTable ");
    Table resultTable = tEnv.from("sourceTable")
            .select($("user"), $("url"),$("cTime"));

    //5、输出
    resultTable.executeInsert("sinkTable");
}

```

操作流程:运行代码报错:image.png
image.png
请问是什么原因呢,请大佬指点

展开
收起
nhwop6pvkcwic 2023-12-29 11:31:21 143 0
2 条回答
写回答
取消 提交回答
  • 是引入的依赖包造成的,刚开始参考官方文档引入kafka依赖时,项目的pom文件引入的是flink-connector-kafka,而不是flink-sql-connector-kafka,修改pom文件如下即可,同时要将对应的jar包上传到flink集群各节点的lib包下并重启集群image.png

    此外,kafka上的主题名称不要使用下划线,否则会报类似如下的错误:image.png

    2024-01-03 17:41:12
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    该错误通常是由于您在连接外部Kafka时没有正确配置 connectors 所致。具体来说,这个错误是由于在创建 Kafka Connector 时,缺少必要的配置项导致的。另外,可能是由于字段名不正确导致的报错。在这种情况下,您需要检查您的代码,确保字段名在 MySQL CDC 表中定义正确。可以通过 scan.mode 参数设置 Paimon 源表的消费位点。如果您的问题与 Paimon 源表的消费位点相关,您可以尝试更改 scan.mode 参数以解决问题。https://help.aliyun.com/zh/flink/support/faq-about-upstream-and-downstream-storage
    image.png
    ---来自实时计算 Flink版产品文档

    2023-12-29 14:23:32
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    移动互联网测试到质量的转变 立即下载
    给ITer的技术实战进阶课-阿里CIO学院独家教材(四) 立即下载
    F2etest — 多浏览器兼容性测试整体解决方案 立即下载