【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?

简介: 【Azure 事件中心】从Azure Event Hub中消费数据,如何查看当前消费客户端消费数据的Offset和SequenceNumber呢(消息偏移量和序列号)?

问题描述

当通过Azure Event Hub SDK消费Event Hub中的消息时,必须指定一个Storage Account(存储账号)用于保存 Checkpoint (检查点)。

 

比如在C#代码中,需要指定Storage Account Connection String 和一个Blob Container的名称。其他语言代码也是一样,都需要指定Storage Account。

private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
    private const string blobContainerName = "<BLOB CONTAINER NAME>";
...... 
  static async Task Main()
    {
        // Read from the default consumer group: $Default
        string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
        // Create a blob container client that the event processor will use 
        storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
        // Create an event processor client to process events in the event hub
        processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
        // Register handlers for processing events and handling errors
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;
        // Start the processing
        await processor.StartProcessingAsync();
        // Wait for 30 seconds for the events to be processed
        await Task.Delay(TimeSpan.FromSeconds(30));
        // Stop the processing
        await processor.StopProcessingAsync();
    }

 

那么,在Storage Account中如何查看Offest 和 Sequence Number的值呢?

 

解答如下

 

如上图,在Azure门户页面中,进入当前使用的Storage Account页面:

  1. 在指定的Container中,SDK会自动创建一个以Event Hub Namespace主机域名为名的Folder
  2. 然后是Event Hub -> 消费组(默认为 $default) --> Checkpint --> 分区号(从0开始,1,2 ...)
  3. 在以分区号为名称的Blob的元数据(Metadata)记录了sequencenumber 和 offset的值。

 

PS: 当删除或修改这个值后,如果重启Event Hub的消费端,这会导致数据从新的OFFSET开始获取。

 

参考资料

Checkpoint: https://docs.azure.cn/zh-cn/event-hubs/event-processor-balance-partition-load#checkpointing

向 Azure 事件中心发送事件及从 Azure 事件中心接收事件 - .NET (Azure.Messaging.EventHubs): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-dotnet-standard-getstarted-send#receive-events

相关文章
|
3月前
|
消息中间件 存储 Kafka
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
|
3月前
|
监控 Java 开发工具
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
【事件中心 Azure Event Hub】Event Hub Java SDK的消费端出现不消费某一个分区中数据的情况,出现IdleTimerExpired错误消息记录
|
3月前
|
存储 Java 开发工具
【Azure 事件中心】Event Hubs如何获取其中存放的历史消息
【Azure 事件中心】Event Hubs如何获取其中存放的历史消息
|
3月前
|
存储
【Azure 事件中心】Event Hubs中存在非常多的错误数据,是否能提前删除这些数据呢?
【Azure 事件中心】Event Hubs中存在非常多的错误数据,是否能提前删除这些数据呢?
|
3月前
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
|
3月前
|
网络安全
【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?
【Azure 事件中心】如何查看Event Hub的生产者或者是消费者端的IP地址呢?
|
3月前
|
存储
【Azure 事件中心】EventHub 中同一条消息不停的推送给消费端问题记录
【Azure 事件中心】EventHub 中同一条消息不停的推送给消费端问题记录
|
3月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
|
3月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
3月前
|
API
【Azure 服务总线】查看Service Bus中消息多次发送的日志信息,消息是否被重复消费
【Azure 服务总线】查看Service Bus中消息多次发送的日志信息,消息是否被重复消费