【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

简介: 【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

问题描述

当使用SDK连接到Azure Event Hub时,最常规的方式为使用连接字符串。这种做法参考官网文档就可成功完成代码:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

只是,如果使用Azure AD认证方式进行访问,代码需要如何修改呢? 如何来使用AAD的 TokenCredential呢?

 

分析问题

在使用Connection String的时候,EventProcessorClientBuilder使用connectionString方法配置连接字符串。

 

如果使用Azure AD认证,则需要先根据AAD中注册应用获取到Client ID, Tenant ID, Client Secret,然后把这些内容设置为系统环境变量

  1. AZURE_TENANT_ID :对应AAD 注册应用页面的 Tenant ID
  2. AZURE_CLIENT_ID :对应AAD 注册应用页面的 Application (Client) ID
  3. AZURE_CLIENT_SECRET :对应AAD 注册应用的 Certificates & secrets 中创建的Client Secrets

然后使用 credential 初始化 EventProcessorClientBuilder 对象

注意点:

1) DefaultAzureCredentialBuilder 需要指定 Authority Host为 Azure China

2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名

 

操作实现

第一步:为AAD注册应用赋予操作Event Hub Data的权限

Azure 提供了以下 Azure 内置角色,用于通过 Azure AD 和 OAuth 授予对事件中心数据的访问权限:

  1. Azure 事件中心数据所有者 (Azure Event Hubs Data Owner): 使用此角色可以授予对事件中心资源的完全访问权限。
  2. Azure 事件中心数据发送者 (Azure Event Hubs Data Sender) : 使用此角色可以授予对事件中心资源的发送访问权限。
  3. Azure 事件中心数据接收者 (Azure Event Hubs Data Receiver): 使用此角色可以授予对事件中心资源的使用/接收访问权限。

本实例中,只需要接收数据,所以只赋予了 Azure Event Hubs Data Receiver权限。

 

第二步:在Java 项目中添加SDK依赖  

添加在pom.xml文件中 dependencies 部分的内容为:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,最好都是用最新版本,避免出现运行时出现类型冲突或找不到

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>spdemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>spdemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.12.2</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
      <version>1.5.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.5</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.example.App</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

 

第三步:添加完整代码

package com.example;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.AzureAuthorityHosts;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;
import java.io.IOException;
import java.sql.Date;
import java.time.Instant;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
 * Hello world!
 *
*/
public class App {
    // private static final String connectionString ="<connectionString>";
    // private static final String eventHubName = "<eventHubName>";
    // private static final String storageConnectionString = "<storageConnectionString>";
    // private static final String storageContainerName = "<storageContainerName>";
    public static void main(String[] args) throws IOException {
        System.out.println("Hello World!");
        // String connectionString ="<connectionString>"; 
        // The fully qualified namespace for the Event Hubs instance. This is likely to
        // be similar to:
        // {your-namespace}.servicebus.windows.net
        // String fullyQualifiedNamespace ="<your event hub namespace>.servicebus.chinacloudapi.cn";
        // String eventHubName = "<eventHubName>";
        String storageConnectionString = System.getenv("storageConnectionString");
        String storageContainerName = System.getenv("storageContainerName");
        String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");
        String eventHubName = System.getenv("eventHubName");
        TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA)
                .build();
        // Create a blob container client that you use later to build an event processor
        // client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();
        
    
        // EventHubProducerClient
        // EventHubProducerClient client = new EventHubClientBuilder()
        // .credential(fullyQualifiedNamespace, eventHubName, credential)
        // .buildProducerClient();
        Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
        initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
 
        // EventProcessorClientBuilder
        // Create a builder object that you will use later to build an event processor
        // client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                // .connectionString(connectionString, eventHubName)
                .credential(fullyQualifiedNamespace, eventHubName, credential)
                .initialPartitionEventPosition(initialPartitionEventPosition)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
        // EventPosition.f
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
        System.out.println("Starting event processor");
        eventProcessorClient.start();
        System.out.println("Press enter to stop.");
        System.in.read();
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
        System.out.println("Exiting process");
    }
    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
                partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
        // Every 10 events received, it will update the checkpoint stored in Azure Blob
        // Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
    };
}

 

 

 

附录:自定义设置 Event Position,当程序运行时,指定从Event Hub中获取消息的 Sequence Number

使用EventPosition对象中的fromSequenceNumber方法,可以指定一个序列号,Consume端会根据这个号码获取之后的消息。其他的方法还有 fromOffset(指定游标) / fromEnqueuedTime(指定一个时间点,获取之后的消息) / earliest(从最早开始) / latest(从最后开始获取新的数据,旧数据不获取)

Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
    initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));


注意:

Map<String, EventPosition> 中的String 对象为Event Hub的分区ID,如果Event Hub有2个分区,则它的值分别时0,1.

EventPosition 设置的值,只在Storage Account所保存在CheckPoint Store中的值没有,或者小于此处设定的值时,才会起效果。否则,Consume 会根据从Checkpoint中获取的SequenceNumber为准。

 

 

 

参考资料

授予对 Azure 事件中心的访问权限 :https://docs.azure.cn/zh-cn/event-hubs/authorize-access-event-hubs

对使用 Azure Active Directory 访问事件中心资源的应用程序进行身份验证 : https://docs.azure.cn/zh-cn/event-hubs/authenticate-application

使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件 : https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

 

 

[END]

相关文章
|
3月前
|
Java 网络安全 开发工具
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
【Azure 事件中心】Event Hub 无法连接,出现 Did not observe any item or terminal signal within 60000ms in 'flatMapMany' 的错误消息
|
3月前
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
【Azure 事件中心】Azure Event Hub客户端遇见 Expired Heartbeat 错误
|
3月前
|
Linux API 网络架构
【Azure 事件中心】通过 az rest --method get 如何获得Event Hub Entity 级的统计指标
【Azure 事件中心】通过 az rest --method get 如何获得Event Hub Entity 级的统计指标
|
3月前
|
安全 Windows
【Azure 云服务】Azure Cloud Service中的错误事件 Error Event(Defrag/Perflib) 解答
【Azure 云服务】Azure Cloud Service中的错误事件 Error Event(Defrag/Perflib) 解答
|
2月前
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub的解决之法
An exception occurred while retrieving properties for Event Hub: logicapp. Error Message: 'ClientSecretCredential authentication failed: AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Che
|
3月前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
|
3月前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
在尝试使用Azure Logic App创建由Event Hub触发的工作流时,配置了Active Directory OAuth认证但仍遇到认证失败的问题。错误信息提示找不到指定的租户ID。尽管已设置了正确的Azure中国环境Authority,认证请求似乎仍指向全球Azure环境。这可能是Logic App服务本身的局限导致。作为替代方案,可采用Connection String或Managed Identity方式进行认证,两者均可正常工作。此外,通过Azure Function App复现此问题,进一步验证这是服务层面而非配置问题。相关文档和教程可在Azure官方文档中找到。
|
3月前
|
JSON 数据格式 Python
【Azure 应用服务】Azure Function Python函数中,如何获取Event Hub Trigger的消息Event所属于的PartitionID呢?
【Azure 应用服务】Azure Function Python函数中,如何获取Event Hub Trigger的消息Event所属于的PartitionID呢?
|
3月前
|
消息中间件 开发工具
【Azure 事件中心】Event Hub 消费端出现 Timeout Exception,errorContext中 LINK_CREDIT为0的解释
【Azure 事件中心】Event Hub 消费端出现 Timeout Exception,errorContext中 LINK_CREDIT为0的解释
|
3月前
|
消息中间件 存储 Kafka
【Azure 事件中心】适用Mirror Maker生产数据发送到Azure Event Hub出现发送一段时间后Timeout Exception: Expiring 18 record(s) for xxxxxxx: 79823 ms has passed since last append
【Azure 事件中心】适用Mirror Maker生产数据发送到Azure Event Hub出现发送一段时间后Timeout Exception: Expiring 18 record(s) for xxxxxxx: 79823 ms has passed since last append