开发者社区> 问答> 正文

代码本地ide 能正常执行, 有正常输出,打包成fat-jar包后,提交到yarn-session

代码本地ide 能正常执行, 有正常输出,

打包成fat-jar包后,提交到yarn-session 上执行 报: Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.

请教下是什么原因?

lib目录下文件为: flink-dist_2.11-1.9.1.jar                 flink-sql-connector-kafka-0.10_2.11-1.9.0.jar  flink-sql-connector-kafka_2.11-1.9.0.jar  log4j-1.2.17.jar flink-json-1.9.0-sql-jar.jar flink-sql-connector-kafka-0.11_2.11-1.9.0.jar  flink-table_2.11-1.9.1.jar                slf4j-log4j12-1.7.15.jar flink-shaded-hadoop-2-uber-2.6.5-7.0.jar  flink-sql-connector-kafka-0.9_2.11-1.9.0.jar   flink-table-blink_2.11-1.9.1.jar

代码:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object StreamingTable2 extends App{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
  env.setParallelism(2)

  val sourceDDL1 =
    """create table kafka_json_source(
                            `timestamp` BIGINT,
                            id int,
                            name varchar
                          ) with (
                            'connector.type' = 'kafka',
                            'connector.version' = '0.11',
                            'connector.topic' = 'hbtest2',
                            'connector.startup-mode' = 'earliest-offset',
                            'connector.properties.0.key' = 'bootstrap.servers',
                            'connector.properties.0.value' = '192.168.1.160:19092',
                            'connector.properties.1.key' = 'group.id',
                            'connector.properties.1.value' = 'groupId1',
                            'connector.properties.2.key' = 'zookeeper.connect',
                            'connector.properties.2.value' = '192.168.1.160:2181',
                            'update-mode' = 'append',
                            'format.type' = 'json',
                            'format.derive-schema' = 'true'
                          )
    """

  tEnv.sqlUpdate(sourceDDL1)
  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
  env.execute("table-example2")
}
```*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-08 10:16:47 898 0
1 条回答
写回答
取消 提交回答
  • 看着像是包冲突了,你lib包下有4个flink_kafka的jar包,去掉不用的看看呢*来自志愿者整理的flink邮件归档

    2021-12-08 10:46:17
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
D2前端工程下一站 IDE ——上坡&吭头 立即下载
D2前端工程下一站 IDE 立即下载
基于Docker on Yarn系统的微服务实践 立即下载