【Azure Event Hub】Schema Registry 在China Azure门户上不能创建的替代方案

简介: 【Azure Event Hub】Schema Registry 在China Azure门户上不能创建的替代方案

问题描述

创建Event Hub服务后,标准版的定价层功能中有Schema Registry的功能,但是根据官方文档,在门户中确无法创建。

 

问题解答

什么是 Azure 架构注册表?

Azure 架构注册表是事件中心的一项功能,它为事件驱动的应用程序和以消息为中心的应用程序的架构提供一个中心存储库。

它使生成者和使用者应用程序可以灵活地交换数据,而无需管理和共享架构

它还为可重用架构提供了一个简单的治理框架,并通过分组构造(架构组)定义了架构之间的关系。

使用 Apache Avro 等架构驱动的序列化框架,将序列化元数据移到共享架构还有助于降低每条消息的开销。 这是因为每条消息不需要像 JSON 等标记格式那样包含元数据 (类型信息和字段名称)。

将架构与事件一起存储,并将其存储在事件基础结构中,可确保序列化或反序列化所需的元数据始终是可访问的,并且架构始终置于正确的位置。

 

由于在UI上无法创建Scheme Registry功能, 可以通过REST API 或者是Azure CLI命令:

# REST API

PUT https://management.chinacloudapi.cn/subscriptions/<>/resourceGroups/<>/providers/Microsoft.EventHub/namespaces/<>/schemagroups/<>?api-version=2021-11-01
{
  "properties": {
    "groupProperties": {},
    "schemaCompatibility": "Forward",
    "schemaType": "Avro"
  }
}

参考资料:https://learn.microsoft.com/en-us/rest/api/eventhub/schema-registry/create-or-update?view=rest-eventhub-2021-11-01&tabs=HTTP

 

# Azure Cli

az eventhubs namespace schema-registry create --name
                                              --namespace-name
                                              --resource-group
                                              [--group-properties]
                                              [--schema-compatibility {Backward, Forward, None}]
                                              [--schema-type {Avro, Unknown}]

 

参考资料:https://learn.microsoft.com/en-us/cli/azure/eventhubs/namespace/schema-registry?view=azure-cli-latest

 

# 使用示例

在使用事件中心 .NET SDK (AMQP) 流式处理事件时使用 Avro 架构验证 : https://learn.microsoft.com/zh-cn/azure/event-hubs/schema-registry-dotnet-send-receive-quickstart

发送消息

using Azure.Data.SchemaRegistry;
using Azure.Identity;
using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Data.SchemaRegistry.example;
// connection string to the Event Hubs namespace
const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
// name of the event hub
const string eventHubName = "EVENTHUBNAME";
// Schema Registry endpoint 
const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
// name of the consumer group   
const string schemaGroup = "SCHEMAGROUPNAME";
// The Event Hubs client types are safe to cache and use as a singleton for the lifetime
// of the application, which is best practice when events are being published or read regularly.
EventHubProducerClient producerClient;
// Create a producer client that you can use to send events to an event hub
producerClient = new EventHubProducerClient(connectionString, eventHubName);
// Create a schema registry client that you can use to serialize and validate data.  
var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
// Create an Avro object serializer using the Schema Registry client object. 
var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
// Create a new order object using the generated type/class 'Order'. 
var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
// Create a batch of events 
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add the event data to the event batch. 
eventBatch.TryAdd(eventData);
// Send the batch of events to the event hub. 
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 1 order has been published.");

 

消费消息

using Azure.Data.SchemaRegistry;
using Azure.Identity;
using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Microsoft.Azure.Data.SchemaRegistry.example;
// connection string to the Event Hubs namespace
const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
// name of the event hub
const string eventHubName = "EVENTHUBNAME";
// Schema Registry endpoint 
const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
// name of the consumer group   
const string schemaGroup = "SCHEMAGROUPNAME";
// connection string for the Azure Storage account
const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
// name of the blob container that will be userd as a checkpoint store
const string blobContainerName = "BLOBCONTAINERNAME";
// Create a blob container client that the event processor will use 
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
// Create an event processor client to process events in the event hub
EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 30 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(30));
// Stop the processing
await processor.StopProcessingAsync();
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    // Deserialized data in the received event using the schema 
    Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    // Print the received event
    Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
       await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    }
    static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
    // Write details about the error to the console window
    Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
    Console.WriteLine(eventArgs.Exception.Message);
    return Task.CompletedTask;
}

 

 

 

【END】

相关文章
|
2月前
|
Java
【Azure Developer】com.azure:azure-identity jar包版本从1.2.0 升级到1.12.2 版本之后报错
在将 `com.azure:azure-identity` 的 jar 包版本从 1.2.0 升级到 1.12.2 后出现错误,错误信息表明存在方法未找到的问题。分析显示这是由于依赖管理错误导致,需要调整 classpath 确保使用兼容版本的依赖包。当前项目中 msal4j-1.11.0.jar 与 azure-identity-1.12.2 不兼容。
|
3月前
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub的解决之法
An exception occurred while retrieving properties for Event Hub: logicapp. Error Message: 'ClientSecretCredential authentication failed: AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Che
|
4月前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
|
4月前
|
JavaScript 网络协议 API
【Azure API 管理】Azure APIM服务集成在内部虚拟网络后,在内部环境中打开APIM门户使用APIs中的TEST功能失败
【Azure API 管理】Azure APIM服务集成在内部虚拟网络后,在内部环境中打开APIM门户使用APIs中的TEST功能失败
|
4月前
|
开发者
【Azure 应用服务】如果发现当前使用的订阅无法在China North 3 区中创建App Service服务,如何来解决这个问题呢?
【Azure 应用服务】如果发现当前使用的订阅无法在China North 3 区中创建App Service服务,如何来解决这个问题呢?
|
4月前
|
C++
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
在尝试使用Azure Logic App创建由Event Hub触发的工作流时,配置了Active Directory OAuth认证但仍遇到认证失败的问题。错误信息提示找不到指定的租户ID。尽管已设置了正确的Azure中国环境Authority,认证请求似乎仍指向全球Azure环境。这可能是Logic App服务本身的局限导致。作为替代方案,可采用Connection String或Managed Identity方式进行认证,两者均可正常工作。此外,通过Azure Function App复现此问题,进一步验证这是服务层面而非配置问题。相关文档和教程可在Azure官方文档中找到。
|
4月前
|
存储 API C#
【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 - C# Example Code
【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 - C# Example Code
|
4月前
|
存储 API 开发工具
【Azure API 管理】讨论APIM是否适合直接存储文件到Azure Storage Account呢?
【Azure API 管理】讨论APIM是否适合直接存储文件到Azure Storage Account呢?
|
4月前
|
开发工具
【Azure 环境】使用Microsoft Graph PS SDK 登录到中国区Azure, 命令Connect-MgGraph -Environment China xxxxxxxxx 遇见登录错误
【Azure 环境】使用Microsoft Graph PS SDK 登录到中国区Azure, 命令Connect-MgGraph -Environment China xxxxxxxxx 遇见登录错误
|
4月前
|
开发工具
【Azure Developer】在Azure Storage Account的两个Blob可以同步吗?可以跨订阅拷贝吗?
【Azure Developer】在Azure Storage Account的两个Blob可以同步吗?可以跨订阅拷贝吗?