Flink CDC on yarn时,代码里使用了kafka producer,如果用模式提交,会?

Flink CDC on yarn时,代码里使用了kafka producer,如果用application模式提交,会遇到StringDeserializer not found的问题,如果用per-job模式提交就正常,这是啥情况?

展开
收起
真的很搞笑 2023-06-11 15:27:11 80 分享 版权
阿里云 AI 助理回答

在使用Flink CDC on YARN时,如果通过Application模式提交作业时遇到StringDeserializer not found的问题,而通过Per-Job模式提交时正常,这通常与Flink的类加载机制和依赖管理方式有关。以下是详细分析和解决方案:


1. 问题原因分析

1.1 Application模式与Per-Job模式的区别

  • Application模式:每次提交一个Flink Application时,YARN会为该Application启动一个独立的Flink集群。在这种模式下,用户代码(包括Kafka Producer相关的依赖)会被打包到Flink集群中运行。
  • Per-Job模式:每次提交作业时,Flink会启动一个独立的JobManager和TaskManager,但客户端仍然负责部分依赖的加载和分发。

由于Application模式下,Flink集群是动态创建的,所有依赖必须显式地包含在用户代码的JAR包中,否则可能会导致类加载失败。

1.2 StringDeserializer not found的具体原因

  • Kafka Producer依赖的StringDeserializer类属于Kafka客户端库的一部分。如果在Application模式下提交作业时,Kafka客户端库未正确打包到用户JAR中,或者未被正确加载,就会出现StringDeserializer not found的错误。
  • 在Per-Job模式下,客户端可能已经包含了Kafka客户端库,因此不会出现此类问题。

2. 解决方案

2.1 确保Kafka客户端依赖被打包到用户JAR中

在Application模式下,所有依赖必须被打包到用户JAR中。可以通过以下方式解决:

  1. 使用FAT JAR

    • 使用Maven或Gradle构建工具,将Kafka客户端依赖打包到用户JAR中。
    • 示例Maven配置:
      <build>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-shade-plugin</artifactId>
               <version>3.2.4</version>
               <executions>
                   <execution>
                       <phase>package</phase>
                       <goals>
                           <goal>shade</goal>
                       </goals>
                       <configuration>
                           <filters>
                               <filter>
                                   <artifact>*:*</artifact>
                                   <excludes>
                                       <exclude>META-INF/*.SF</exclude>
                                       <exclude>META-INF/*.DSA</exclude>
                                       <exclude>META-INF/*.RSA</exclude>
                                   </excludes>
                               </filter>
                           </filters>
                       </configuration>
                   </execution>
               </executions>
           </plugin>
       </plugins>
      </build>
      
    • 构建完成后,生成的FAT JAR会包含所有依赖。
  2. 手动添加依赖

    • 如果不使用FAT JAR,可以将Kafka客户端库(如kafka-clients-x.x.x.jar)手动上传到Flink集群的lib目录中,并重启集群。

2.2 检查类加载冲突

  • 如果Kafka客户端库已经正确打包,但仍出现StringDeserializer not found的问题,可能是类加载冲突导致的。
  • 解决方法:
    • 确保Flink和Kafka客户端库的版本兼容。
    • flink-conf.yaml中设置以下参数以调整类加载策略:
    classloader.resolve-order: parent-first
    

2.3 验证依赖是否正确加载

  • 提交作业后,可以通过以下命令检查Flink集群的日志,确认Kafka客户端库是否被正确加载:
    yarn logs -applicationId <application_XXXX_YY>
    
  • 如果日志中显示ClassNotFoundExceptionNoClassDefFoundError,说明依赖未正确加载。

3. 注意事项

  • 依赖冲突:Flink和Kafka客户端库可能存在依赖冲突,建议使用maven-shade-plugin对冲突的依赖进行重命名(Shading)。
  • 版本兼容性:确保使用的Kafka客户端版本与Flink版本兼容。例如,Flink 1.15及以上版本推荐使用Kafka 2.8及以上版本的客户端库。
  • 调试建议:在本地运行Flink作业时,可以使用--classloader parent-first参数模拟YARN环境下的类加载行为,提前发现问题。

4. 总结

通过上述分析可知,StringDeserializer not found的问题主要是由于Application模式下依赖未正确打包或加载导致的。通过构建FAT JAR、调整类加载策略以及验证依赖加载情况,可以有效解决该问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理