【Azure Event Hub】Schema Registry 在China Azure门户上不能创建的替代方案

简介: 【Azure Event Hub】Schema Registry 在China Azure门户上不能创建的替代方案


创建Event Hub服务后,标准版的定价层功能中有Schema Registry的功能,但是根据官方文档,在门户中确无法创建。



什么是 Azure 架构注册表?

Azure 架构注册表是事件中心的一项功能,它为事件驱动的应用程序和以消息为中心的应用程序的架构提供一个中心存储库。



使用 Apache Avro 等架构驱动的序列化框架,将序列化元数据移到共享架构还有助于降低每条消息的开销。 这是因为每条消息不需要像 JSON 等标记格式那样包含元数据 (类型信息和字段名称)。



由于在UI上无法创建Scheme Registry功能, 可以通过REST API 或者是Azure CLI命令:


PUT https://management.chinacloudapi.cn/subscriptions/<>/resourceGroups/<>/providers/Microsoft.EventHub/namespaces/<>/schemagroups/<>?api-version=2021-11-01
  "properties": {
    "groupProperties": {},
    "schemaCompatibility": "Forward",
    "schemaType": "Avro"



# Azure Cli

az eventhubs namespace schema-registry create --name
                                              [--schema-compatibility {Backward, Forward, None}]
                                              [--schema-type {Avro, Unknown}]




# 使用示例

在使用事件中心 .NET SDK (AMQP) 流式处理事件时使用 Avro 架构验证 : https://learn.microsoft.com/zh-cn/azure/event-hubs/schema-registry-dotnet-send-receive-quickstart


using Azure.Data.SchemaRegistry;
using Azure.Identity;
using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Data.SchemaRegistry.example;
// connection string to the Event Hubs namespace
// name of the event hub
const string eventHubName = "EVENTHUBNAME";
// Schema Registry endpoint 
const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
// name of the consumer group   
const string schemaGroup = "SCHEMAGROUPNAME";
// The Event Hubs client types are safe to cache and use as a singleton for the lifetime
// of the application, which is best practice when events are being published or read regularly.
EventHubProducerClient producerClient;
// Create a producer client that you can use to send events to an event hub
producerClient = new EventHubProducerClient(connectionString, eventHubName);
// Create a schema registry client that you can use to serialize and validate data.  
var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
// Create an Avro object serializer using the Schema Registry client object. 
var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
// Create a new order object using the generated type/class 'Order'. 
var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
// Create a batch of events 
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
// Add the event data to the event batch. 
// Send the batch of events to the event hub. 
await producerClient.SendAsync(eventBatch);
Console.WriteLine("A batch of 1 order has been published.");



using Azure.Data.SchemaRegistry;
using Azure.Identity;
using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Microsoft.Azure.Data.SchemaRegistry.example;
// connection string to the Event Hubs namespace
// name of the event hub
const string eventHubName = "EVENTHUBNAME";
// Schema Registry endpoint 
const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
// name of the consumer group   
const string schemaGroup = "SCHEMAGROUPNAME";
// connection string for the Azure Storage account
const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
// name of the blob container that will be userd as a checkpoint store
const string blobContainerName = "BLOBCONTAINERNAME";
// Create a blob container client that the event processor will use 
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
// Create an event processor client to process events in the event hub
EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 30 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(30));
// Stop the processing
await processor.StopProcessingAsync();
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    // Deserialized data in the received event using the schema 
    Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    // Print the received event
    Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
       await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    // Write details about the error to the console window
    Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
    return Task.CompletedTask;





Java 开发工具
【Azure Developer】Azure Graph SDK获取用户列表的问题: SDK中GraphServiceClient如何指向中国区的Endpoint:https://microsoftgraph.chinacloudapi.cn/v1.0
【Azure Developer】Azure Graph SDK获取用户列表的问题: SDK中GraphServiceClient如何指向中国区的Endpoint:https://microsoftgraph.chinacloudapi.cn/v1.0
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub的解决之法
An exception occurred while retrieving properties for Event Hub: logicapp. Error Message: 'ClientSecretCredential authentication failed: AADSTS90002: Tenant 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' not found. Check to make sure you have the correct tenant ID and are signing into the correct cloud. Che
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
JavaScript 网络协议 API
【Azure API 管理】Azure APIM服务集成在内部虚拟网络后,在内部环境中打开APIM门户使用APIs中的TEST功能失败
【Azure API 管理】Azure APIM服务集成在内部虚拟网络后,在内部环境中打开APIM门户使用APIs中的TEST功能失败
【Azure Logic App】使用Event Hub 连接器配置 Active Directory OAuth 认证无法成功连接到中国区Event Hub
在尝试使用Azure Logic App创建由Event Hub触发的工作流时,配置了Active Directory OAuth认证但仍遇到认证失败的问题。错误信息提示找不到指定的租户ID。尽管已设置了正确的Azure中国环境Authority,认证请求似乎仍指向全球Azure环境。这可能是Logic App服务本身的局限导致。作为替代方案,可采用Connection String或Managed Identity方式进行认证,两者均可正常工作。此外,通过Azure Function App复现此问题,进一步验证这是服务层面而非配置问题。相关文档和教程可在Azure官方文档中找到。
存储 API 数据安全/隐私保护
【Azure Developer】开发模式下使用AAD账号访问Azure Blob的相关参考
【Azure Developer】开发模式下使用AAD账号访问Azure Blob的相关参考
存储 API C#
【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 - C# Example Code
【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 - C# Example Code
【Azure Developer】使用Azure Resource Graph的查询语法的示例
【Azure Developer】使用Azure Resource Graph的查询语法的示例
存储 API 开发工具
【Azure API 管理】讨论APIM是否适合直接存储文件到Azure Storage Account呢?
【Azure API 管理】讨论APIM是否适合直接存储文件到Azure Storage Account呢?
【Azure Developer】使用 Azure VM 上的用户分配托管标识访问 Azure Key Vault 中国区代码示例
【Azure Developer】使用 Azure VM 上的用户分配托管标识访问 Azure Key Vault 中国区代码示例