【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

简介: 【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述

使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢?

 

问题解决

执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,需要使用Event Hub Namespace级别的连接字符串(Connection String).

 

在Event Hub中,获取方式为: (1: Shared access policies ---> 2: RootManageSharedAccessKey or ..----> 3: Connection String )

 

完整的示例代码:

import confluent_kafka
topics = ["<Your_topic_name>"]
broker = "<Eventhub-namespace-name>.servicebus.chinacloudapi.cn:9093"
group_name = "<Consumer-group-name>"
sasl_password = "<Connection-string>"
# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': broker,
                                     'security.protocol': 'SASL_SSL',
                                     'sasl.mechanism': 'PLAIN',
                                     'sasl.username': '$ConnectionString',
                                     'sasl.password': sasl_password,
                                     'group.id': group_name})
print("%-50s  %9s  %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
for topic in topics:
    # Get the topic's partitions
    metadata = consumer.list_topics(topic, timeout=10)
    if metadata.topics[topic].error is not None:
        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
    # Construct TopicPartition list of partitions to query
    partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]
    # Query committed offsets for this group and the given partitions
    committed = consumer.committed(partitions, timeout=10)
    for partition in committed:
        # Get the partitions low and high watermark offsets.
        (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)
        if partition.offset == confluent_kafka.OFFSET_INVALID:
            offset = "-"
        else:
            offset = "%d" % (partition.offset)
        if hi < 0:
            lag = "no hwmark"  # Unlikely
        elif partition.offset < 0:
            # No committed offset, show total message count as lag.
            # The actual message count may be lower due to compaction
            # and record deletions.
            lag = "%d" % (hi - lo)
        else:
            lag = "%d" % (hi - partition.offset)
        print("%-50s  %9s  %9s" % (
            "{} [{}]".format(partition.topic, partition.partition), offset, lag))
consumer.close()

 

参考文档


confluent-kafka-python : https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/list_offsets.py

相关文章
|
20天前
|
NoSQL 安全 网络安全
【Azure Redis】PHPRedis遇见SSL Connection Timeout问题
【Azure Redis】PHPRedis遇见SSL Connection Timeout问题
【Azure Redis】PHPRedis遇见SSL Connection Timeout问题
|
16天前
|
数据采集 开发工具 Python
海康威视工业相机SDK+Python+PyQt开发数据采集系统(支持软件触发、编码器触发)
该系统基于海康威视工业相机SDK,使用Python与PyQt开发,支持Gige与USB相机设备的搜索及双相机同时显示。系统提供软件触发与编码器触发模式,并可在数据采集过程中实时保存图像。此外,用户可以调节曝光时间和增益,并进行信息输入,这些信息将被保存至配置文件以便下次自动加载。参数调节与实时预览等功能进一步增强了系统的实用性。
40 1
|
20天前
|
存储 API 开发工具
【Azure Storage Blob】如何通过.NET Azure Storage Blobs SDK获取到Blob的MD5值呢?
【Azure Storage Blob】如何通过.NET Azure Storage Blobs SDK获取到Blob的MD5值呢?
|
20天前
|
API 开发工具 网络架构
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
【Azure Developer】使用Python SDK去Azure Container Instance服务的Execute命令的疑问解释
|
20天前
|
安全 网络安全 Windows
【Azure App Service】遇见az命令访问HTTPS App Service 时遇见SSL证书问题,暂时跳过证书检查的办法
【Azure App Service】遇见az命令访问HTTPS App Service 时遇见SSL证书问题,暂时跳过证书检查的办法
【Azure App Service】遇见az命令访问HTTPS App Service 时遇见SSL证书问题,暂时跳过证书检查的办法
|
20天前
|
安全 前端开发 网络安全
【Azure App Service】访问App Service应用报错 SSL: WRONG_VERSION_NUMBER
【Azure App Service】访问App Service应用报错 SSL: WRONG_VERSION_NUMBER
|
20天前
|
开发工具 iOS开发 容器
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
|
20天前
|
消息中间件 开发工具
【Azure Service Bus】Service Bus SDK 抛出 ERROR c.a.c.a.i.ActiveClientTokenManager - Error is transient. Rescheduling authorization task at interval 1079000 ms.
【Azure Service Bus】Service Bus SDK 抛出 ERROR c.a.c.a.i.ActiveClientTokenManager - Error is transient. Rescheduling authorization task at interval 1079000 ms.
|
20天前
|
存储 Linux 网络安全
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Linux/Linux Container)
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Linux/Linux Container)
|
20天前
|
网络安全 API 数据安全/隐私保护
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)