【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)

问题描述

Azure Event Hub 在标准版以上就默认启用的Kafka终结点,所以可以通过Apache Kafka协议连接到Event Hub进行消息的生产和消费。通过示例代码下载到本地运行后,发现没有 Kafka Producer 的详细日志输出。当查看SDK源码中,发现使用的是 org.slf4j.Logger 输出日志,如:

 

但是,当运行 Producer 代码后,得到的输出取没有包含连接的详细信息,对出现连接问题的Debug没有任何帮助。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Test Data #0 from thread #18
org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol

那么如何来输出更加详细的日志呢?

 

问题解决

根据日志显示, SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 明确指出是因为没有加载到 org.slf4j.impl.StaticLoggerBinder 类,因为在程序执行的过程中,必须提供实际的日志记录实现,否则SLF4J讲忽略所有日志信息,SLF4J API 通过 SLF4J 绑定与实际的日志记录实现进行通信Log4j。所以需要在pom.xml中引入 org.slf4j 的相关依赖。

 

在pom.xml中加入

<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
  </dependency>
  <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
  </dependency>
  <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>

然后,添加上log4j的配置文件,在resources文件夹下添加名为 log4j.properties文件,内容为:

# Root logger option
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n

修改后的文件内容如截图所示:

 

 

修改完成,运行得到完整日志

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/.m2/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-01-11 20:03:12 INFO  ProducerConfig - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [testeventxxxxxx.servicebus.chinacloudapi.cn:9093]
        buffer.memory = 33554432
        client.id = KafkaExampleProducer
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.LongSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        security.protocol = SASL_SSL
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-01-11 20:03:16 INFO  AbstractLogin - Successfully logged in.
2022-01-11 20:03:17 INFO  AppInfoParser - Kafka version : 1.0.0
2022-01-11 20:03:17 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
2022-01-11 20:03:21 INFO  TestProducer - test java logs  : info
2022-01-11 20:03:21 ERROR TestProducer - test java logs  : error
2022-01-11 20:03:21 WARN  TestProducer - test java logs  : warn
Test Data #0 from thread #18
2022-01-11 20:03:22 ERROR NetworkClient - [Producer clientId=KafkaExampleProducer] Connection to node -1 failed authentication due to: Invalid SASL mechanism response, server may be expecting a different protocol
org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol
org.apache.kafka.clients.producer.KafkaProducer@47dec663

 

 

参考资料

Slf4j Configuration File Examplehttps://examples.javacodegeeks.com/enterprise-java/slf4j/slf4j-configuration-file-example/

将 Apache Flink 与适用于 Apache Kafka 的 Azure 事件中心配合使用: https://docs.azure.cn/zh-cn/event-hubs/event-hubs-kafka-flink-tutorial

在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版) : https://www.cnblogs.com/lulight/p/14375190.html

 

相关文章
|
26天前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
329 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
10月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
690 33
The Past, Present and Future of Apache Flink
|
9月前
|
存储 监控 安全
实时记录和查看Apache 日志
Apache 是一个开源、跨平台的 Web 服务器,保护其平台需监控活动和事件。Apache 日志分为访问日志和错误日志,分别记录用户请求和服务器错误信息。EventLog Analyzer 是一款强大的日志查看工具,提供集中收集、分析、实时警报和安全监控功能,帮助管理员识别趋势、检测威胁并确保合规性。通过直观的仪表板和自动化响应,它简化了大规模日志管理,增强了 Apache 服务器的安全性和性能。
174 5
|
10月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
262 9
|
4月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
7月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
823 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
6月前
|
监控 安全 BI
优化 Apache 日志记录的 5 个最佳实践
Apache 日志记录对于维护系统运行状况和网络安全至关重要,其核心包括访问日志与错误日志的管理。通过制定合理的日志策略,如选择合适的日志格式、利用条件日志减少冗余、优化日志级别、使用取证模块提升安全性及实施日志轮换,可有效提高日志可用性并降低系统负担。此外,借助 Eventlog Analyzer 等专业工具,能够实现日志的高效收集、可视化分析与威胁检测,从而精准定位安全隐患、评估服务器性能,并满足合规需求,为强化网络安全提供有力支持。
141 0
优化 Apache 日志记录的 5 个最佳实践
|
9月前
|
消息中间件 网络协议 Java
【Azure Event Hub】Kafka消息发送失败(Timeout Exception)
Azure closes inbound Transmission Control Protocol (TCP) idle > 240,000 ms, which can result in sending on dead connections (shown as expired batches because of send timeout).
221 75
|
9月前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
385 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践

推荐镜像

更多