【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

简介: 【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

完整的示例代码:

import confluent_kafka
topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"
# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})
print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)
    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)
        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)
        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))
consumer.close()

 

参考文档


confluent-kafka-python : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py

相关文章
|
7月前
|
搜索推荐 API 开发工具
百宝箱开放平台 ✖️ Python SDK
百宝箱提供Python SDK,支持开发者集成其开放能力。需先发布应用,安装Python 3.6+环境后,通过pip安装tboxsdk,即可调用对话型、生成型智能体及文件上传等功能。
940 87
百宝箱开放平台 ✖️  Python SDK
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
343 5
|
9月前
|
JavaScript 前端开发 机器人
【Azure Bot Service】在中国区Azure上部署机器人的 Python 版配置
本文介绍了在中国区Azure上使用Python SDK配置Azure Bot Service时遇到的问题及解决方案,涵盖参数设置与适配器配置,适用于希望在Azure中国区部署Python机器人的开发者。
247 8
|
API 开发工具 网络架构
【Azure Service Bus】使用Python SDK创建Service Bus Namespace资源(中国区)
本文介绍了如何使用Python SDK创建Azure Service Bus Namespace资源。首先,通过Microsoft Entra ID注册应用获取Client ID、Client Secret和Tenant ID,完成中国区Azure认证。接着,初始化ServiceBusManagementClient对象,并调用`begin_create_or_update`方法创建资源。
292 29
|
12月前
|
安全 网络安全 Python
应对requests 2.28.x版本中SSL证书验证失败的错误处理方法。
在你安全地解决了这一切之后,你的请求就能够在HTTPS的加持下,犹如骑士横扫战场,高枕无忧地传递信息了。记住,网络的世界里,安全永远是你的头号任务。遵循正道,才能让你的代码在数据的草原上,驰骋千里。
560 16
|
存储 监控 API
【Azure App Service】分享使用Python Code获取App Service的服务器日志记录管理配置信息
本文介绍了如何通过Python代码获取App Service中“Web服务器日志记录”的配置状态。借助`azure-mgmt-web` SDK,可通过初始化`WebSiteManagementClient`对象、调用`get_configuration`方法来查看`http_logging_enabled`的值,从而判断日志记录是否启用及存储方式(关闭、存储或文件系统)。示例代码详细展示了实现步骤,并附有执行结果与官方文档参考链接,帮助开发者快速定位和解决问题。
355 22
|
API 开发工具 Python
|
人工智能 API 开发工具
【AI大模型】使用Python调用DeepSeek的API,原来SDK是调用这个,绝对的一分钟上手和使用
本文详细介绍了如何使用Python调用DeepSeek的API,从申请API-Key到实现代码层对话,手把手教你快速上手。DeepSeek作为领先的AI大模型,提供免费体验机会,帮助开发者探索其语言生成能力。通过简单示例代码与自定义界面开发,展示了API的实际应用,让对接过程在一分钟内轻松完成,为项目开发带来更多可能。
|
8月前
|
数据采集 机器学习/深度学习 人工智能
Python:现代编程的首选语言
Python:现代编程的首选语言
1346 102
|
8月前
|
数据采集 机器学习/深度学习 算法框架/工具
Python:现代编程的瑞士军刀
Python:现代编程的瑞士军刀
475 104

热门文章

最新文章

推荐镜像

更多