【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

简介: 【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

完整的示例代码:

import confluent_kafka
topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"
# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})
print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)
    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)
        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)
        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))
consumer.close()

 

参考文档


confluent-kafka-python : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py

相关文章
|
22天前
|
人工智能 API 开发工具
【AI大模型】使用Python调用DeepSeek的API,原来SDK是调用这个,绝对的一分钟上手和使用
本文详细介绍了如何使用Python调用DeepSeek的API,从申请API-Key到实现代码层对话,手把手教你快速上手。DeepSeek作为领先的AI大模型,提供免费体验机会,帮助开发者探索其语言生成能力。通过简单示例代码与自定义界面开发,展示了API的实际应用,让对接过程在一分钟内轻松完成,为项目开发带来更多可能。
|
1月前
|
API 开发工具 Python
|
2月前
|
API 开发工具 Python
【Azure Developer】编写Python SDK代码实现从China Azure中VM Disk中创建磁盘快照Snapshot
本文介绍如何使用Python SDK为中国区微软云(China Azure)中的虚拟机磁盘创建快照。通过Azure Python SDK的Snapshot Class,指定`location`和`creation_data`参数,使用`Copy`选项从现有磁盘创建快照。代码示例展示了如何配置Default Azure Credential,并设置特定于中国区Azure的`base_url`和`credential_scopes`。参考资料包括官方文档和相关API说明。
|
2月前
|
安全 网络协议 搜索推荐
【荐】免费一年SSL证书申请方法全攻略
锁图 申请免费一年SSL证书的优势包括:提升网站安全性,避免中间人攻击;增强用户信任感;改善SEO排名;降低安全成本。申请流程如下: 1. 访问JoySSL官网选择免费证书套餐。 2. 填写注册信息并输入注册码230922。 3. 验证域名所有权,通常通过电子邮件或DNS设置中的TXT记录。 4. 下载并安装证书到服务器。 5. 检查浏览器地址栏是否显示安全锁图标及“https”。 大部分免费SSL证书有效期为一年,到期后需重新申请或升级付费版。通过上述步骤,您可轻松为网站启用免费SSL证书,保障数据安全并提升用户体验。
|
1月前
|
存储 XML 开发工具
【Azure Storage Account】利用App Service作为反向代理, 并使用.NET Storage Account SDK实现上传/下载操作
本文介绍了如何在Azure上使用App Service作为反向代理,以自定义域名访问Storage Account。主要内容包括: 1. **设置反向代理**:通过配置`applicationhost.xdt`和`web.config`文件,启用IIS代理功能并设置重写规则。 2. **验证访问**:测试原生URL和自定义域名的访问效果,确保两者均可正常访问Storage Account。 3. **.NET SDK连接**:使用共享访问签名(SAS URL)初始化BlobServiceClient对象,实现通过自定义域名访问存储服务。
|
4月前
|
网络协议 应用服务中间件 网络安全
IP申请SSL证书的条件和方法
为IP地址申请SSL证书与域名证书流程不同,主要因SSL基于域名验证。部分CA允许为公有或私有IP地址申请证书,需满足拥有IP所有权、支持单IP或自签名证书、IP可公开访问及符合CA政策等条件。申请步骤包括访问CA官网、选择证书类型、提交申请、验证所有权并安装证书。替代方案是使用自签名证书,适合内部网络或开发环境。
|
4月前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
5月前
|
Java 开发工具 Windows
【Azure App Service】在App Service中调用Stroage SDK上传文件时遇见 System.OutOfMemoryException
System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
|
6月前
|
JavaScript 前端开发 开发工具
【Azure Developer】使用JavaScript通过SDK进行monitor-query的client认证报错问题
AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Check with your subscription administrator, this may happen if there are no active subscriptions for the tenant.
|
2月前
|
前端开发 Java Shell
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex
225 20
【08】flutter完成屏幕适配-重建Android,增加GetX路由,屏幕适配,基础导航栏-多版本SDK以及gradle造成的关于fvm的使用(flutter version manage)-卓伊凡换人优雅草Alex-开发完整的社交APP-前端客户端开发+数据联调|以优雅草商业项目为例做开发-flutter开发-全流程-商业应用级实战开发-优雅草Alex

热门文章

最新文章