1. 通过配置项 taskmanager.memory.task.off-heap.size 增大 TM 的 direct memory 大小
2. 如果确认是 Kafka 证书问题,则需要修复 kafka 配置,如下
SSL 根证书下载(
https://help.aliyun.com/document_detail/68325.html#section-dge-h2n-pf3) 上传到附加依赖中,添加以下 with 参数
'properties.ssl.endpoint.identification.algorithm' = '',
'properties.security.protocol' = 'SASL_SSL',
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=""***"" password=""***"";'
注意:
● 如果 properties.sasl.mechanism 是 SCRAM-SHA-256 的话,properties.sasl.jaas.config 用org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule 这个
● 如果 properties.sasl.mechanism 是 PLAINTEXT 的话,properties.sasl.jaas.config 用 org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule 这个