SQL Server Service Broker创建单个数据库会话(消息队列)

简介:
2018-01-08 18:27 by pursuer.chen, 287 阅读, 1 评论, 收藏, 编辑

概述  

SQL Server Service Broker 用来创建用于交换消息的会话。消息在目标和发起方这两个端点之间进行交换。消息用于传输数据和触发消息收到时的处理过程。目标和发起方既可以在同一数据库引擎实例的同一数据库或不同数据库中,也可以在不同数据库引擎实例的同一数据库或不同数据库中。

 

每个 Service Broker 会话都有两个端点:会话发起方和目标。您将执行下列任务:

  • 为目标创建一个服务和队列,并为发起方创建一个服务和队列。
  • 创建请求消息类型和答复消息类型。
  • 创建约定,指定请求消息从发起方传递到目标并且答复消息从目标传递到发起方。

然后执行一个简单会话:

  • 启动会话。
  • 从发起方向目标发送一个请求。
  • 在目标处接收请求并将答复发送到发起方。
  • 在发起方处接收答复。
  • 结束会话。

对于其两端在同一 数据库引擎 实例中的会话,其消息不通过网络传输。数据库引擎 安全性和权限将限制对授权主体的访问。此方案不需要网络加密。

一、创建会话对象

1.启用Service Broker

复制代码
----创建数据库
IF NOT EXISTS(SELECT * FROM SYS.DATABASES WHERE name='Dsend')
BEGIN
     CREATE DATABASE Dsend;

END

USE master;
GO
---开启数据库BROKER
ALTER DATABASE Dsend  SET ENABLE_BROKER;
GO

SELECT is_broker_enabled FROM SYS.DATABASES WHERE NAME='Dsend'

USE Dsend;
GO
复制代码

2.创建消息类型

由于经常在多个数据库引擎实例间引用 Service Broker 对象,因而大多数 Service Broker 对象的名称都是 URI 格式的。这有助于确保它们在多台计算机上是唯一的。这两种消息类型都指定 Service Broker 将只验证消息是否是格式正确的 XML 文档,并且指定 Service Broker 将不按照特定架构验证 XML。

复制代码
CREATE MESSAGE TYPE
       [//Dsend/test/RequestMessage]
       VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE
       [//Dsend/test/ReplyMessage]
       VALIDATION = WELL_FORMED_XML;
复制代码

创建请求消息和答复消息,并且消息的格式为XML格式。

注意:Service Broker 验证传入消息。如果消息包含的消息正文与指定的验证类型不符,则 Service Broker 将放弃此无效消息,并向发送此消息的服务返回一条错误消息。会话双方必须定义相同的消息类型名称。为便于排除故障,尽管 Service Broker 不要求会话双方使用相同的验证,但通常会话双方还是会为消息类型指定相同的验证。消息类型不能是临时对象。允许使用以 # 开头的消息类型名称,但它们是永久对象。

3.创建约定

约定用于定义在 Service Broker 会话中所使用的消息类型,还用于确定会话的哪一端可以发送该类型的消息。每个会话都要遵循一个约定。当会话开始时,启动服务为会话指定约定。目标服务指定该目标服务将接受其会话的约定。 

复制代码
/*
SENT BY INITIATOR    ----指示只有会话的发起方才能发送指定消息类型的消息。启动会话的服务称为会话的“发起方”
SENT BY TARGET       ----指示只有会话的目标才能发送指定消息类型的消息。接受由另一个服务启动的会话的服务称为会话的目标。
SENT BY ANY          ----指示发起方和目标都可以发送此类型的消息。
*/

CREATE CONTRACT [//Dsend/test/RequestContract]
      ([//Dsend/test/RequestMessage]    
       SENT BY INITIATOR,             ---约定只有发起方才能使用//Dsend/test/RequestMessage消息类型
       [//Dsend/test/ReplyMessage]
       SENT BY TARGET                 ---约定只有答复方才能使用//Dsend/test/ReplyMessage消息类型
      );
复制代码

4.创建队列

队列可以存储消息。当一条针对某项服务的消息到达时,Service Broker 会将该消息放入与该服务关联的队列中。

创建发起方和答复方的队列。

CREATE QUEUE RequestQueue WITH STATUS=ON;
CREATE QUEUE ReplyQueue WITH STATUS=ON;

注意:

1.队列可以通过SELECT 语句查询,但是不能使用INSERT、UPDATE、DELETE 或 TRUNCATE 语句来操作。只能使用在 Service Broker 会话中运行的语句(如 SEND、RECEIVE 和 END CONVERSATION)来修改队列的内容。

2.队列可能不是临时对象。因此,以 # 开头的队列名称无效。

3.通过以不可用状态创建队列,可以先准备好服务的基础结构,然后再允许在队列中接收消息。

4.如果队列中没有消息,则 Service Broker 不会停止激活存储过程。如果队列中在短时间内没有可用消息,应退出激活存储过程。

5.在 Service Broker 启动存储过程时将检查激活存储过程的权限,而不是在创建队列时检查。CREATE QUEUE 语句不验证 EXECUTE AS 子句中指定的用户是否有权限执行 PROCEDURE NAME 子句中指定的存储过程。

6.队列不可用时,Service Broker 将在数据库的传输队列中保存使用该队列的服务的消息。sys.transmission_queue 目录视图提供传输队列的视图。

 

例:创建具有多个参数的队列

以下示例在 DEFAULT 文件组中创建一个队列。该队列不可用。消息被保留在队列中,直到消息所属的会话结束。通过 ALTER QUEUE 启用队列后,该队列将启动存储过程 2008R2.dbo.expense_procedure 来处理消息。此存储过程以运行 CREATE QUEUE 语句的用户的身份执行。该队列最多启动存储过程的 10 个实例。

复制代码
CREATE QUEUE ExpenseQueue
    WITH STATUS = OFF,
      RETENTION = ON,
      ACTIVATION (
          PROCEDURE_NAME = AdventureWorks2008R2.dbo.expense_procedure,
          MAX_QUEUE_READERS = 10,
          EXECUTE AS SELF )
    ON [DEFAULT] ;
复制代码

5.创建服务

Service Broker 使用服务的名称路由消息、将消息传递到数据库中的正确队列,以及强制执行会话的约定。一个服务可以同时绑定多个约束。

复制代码
--1.创建要用于发起方的队列和服务。由于未指定约定名称,因而其他服务不可将此服务用作目标服务,此服务只能启动会话。
CREATE SERVICE
       [//Dsend/test/RequestService]
       ON QUEUE RequestQueue     
GO
---2.创建接答复服务
CREATE SERVICE
       [//Dsend/test/ReplyService]
       ON QUEUE ReplyQueue
       ([//Dsend/test/RequestContract]
       )
       ;
GO
复制代码

注意:服务公开与其关联的约定提供的功能,以便其他服务可使用该功能。CREATE SERVICE 语句指定针对此服务的约定。一个服务只能是使用该服务指定的约定会话的目标。未指定约定的服务不会向其他服务公开任何功能。

从此服务启动的会话可使用任何约定。如果服务仅启动会话,则创建服务时可不指定约定。Service Broker 从远程服务接受新会话时,目标服务的名称决定了 Broker 在会话中放入消息的队列。

 

例:创建具有多个约定的服务

CREATE SERVICE [//Adventure-Works.com/Expenses] ON QUEUE ExpenseQueue
    ([//Adventure-Works.com/Expenses/ExpenseSubmission],
     [//Adventure-Works.com/Expenses/ExpenseProcessing]) ;

5.开启回话

复制代码
DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
DECLARE @RequestMsg NVARCHAR(100);

BEGIN TRANSACTION;

BEGIN DIALOG @InitDlgHandle
     FROM SERVICE
      [//Dsend/test/RequestService] ---指定的服务是用于答复消息的返回地址
     TO SERVICE
      N'//Dsend/test/ReplyService'  ---指定的服务是消息发送到的地址。
     ON CONTRACT
      [//Dsend/test/RequestContract]
     WITH
         ENCRYPTION = OFF;

SELECT @InitDlgHandle;

SELECT @RequestMsg =
       N'<RequestMsg>3</RequestMsg>';

SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE 
     [//Dsend/test/RequestMessage]
     (@RequestMsg);

SELECT @RequestMsg AS SentRequestMsg;

COMMIT TRANSACTION;
GO
复制代码

 

6.接收方接收消息并返回消息给发送方

复制代码
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg NVARCHAR(100);
DECLARE @RecvReqMsgName sysname;
DECLARE @Message NVARCHAR(100)

BEGIN TRANSACTION;

WAITFOR
( RECEIVE TOP(1)
    @RecvReqDlgHandle = conversation_handle,
    @RecvReqMsg = message_body,
    @RecvReqMsgName = message_type_name
  FROM ReplyQueue
), TIMEOUT 1000;

SELECT @RecvReqMsg AS ReceivedRequestMsg;
SELECT @RecvReqMsgName;


BEGIN

     -----返回接收消息确认结果到发起方

     DECLARE @ReplyMsg NVARCHAR(100);
     SELECT @ReplyMsg =
     N'<ReplyMsg>Reply Message</ReplyMsg>';

     SEND ON CONVERSATION @RecvReqDlgHandle
          MESSAGE TYPE 
          [//Dsend/test/ReplyMessage]
          (@ReplyMsg);

     SELECT @RecvReqDlgHandle;
      
     ---正常的流程不是在这里结束会话
     END CONVERSATION @RecvReqDlgHandle;
END

IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
         END CONVERSATION @RecvReqDlgHandle;
    END

COMMIT TRANSACTION;
GO
复制代码

例:接收数据同时插入表变量

DECLARE @conversation_group_id UNIQUEIDENTIFIER ;

DECLARE @procTable TABLE(
     service_instance_id UNIQUEIDENTIFIER,
     handle UNIQUEIDENTIFIER,
     message_sequence_number BIGINT,
     service_name NVARCHAR(512),
     service_contract_name NVARCHAR(256),
     message_type_name NVARCHAR(256),
     validation NCHAR,
     message_body VARBINARY(MAX)) ;

SET @conversation_group_id = <retrieve conversation group ID from database> ;

RECEIVE TOP (1)
    conversation_group_id,
    conversation_handle,
    message_sequence_number,
    service_name,
    service_contract_name,
    message_type_name,
    validation,
    message_body
FROM ExpenseQueue
INTO @procTable
WHERE conversation_group_id = @conversation_group_id ;
View Code

 

7.发送方收到消息终止会话

复制代码
DECLARE @RecvReplyMsg NVARCHAR(100);
DECLARE @RecvReplyDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsgName sysname;

BEGIN TRANSACTION;

WAITFOR
( RECEIVE TOP(1)
    @RecvReplyDlgHandle = conversation_handle,
    @RecvReplyMsg = message_body,
    @RecvReqMsgName=message_type_name
  FROM RequestQueue
), TIMEOUT 1000;

IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
         END CONVERSATION @RecvReqDlgHandle;
    END

SELECT @RecvReplyMsg AS ReceivedReplyMsg;

COMMIT TRANSACTION;

GO
复制代码

三、查询

复制代码
SELECT state_desc,*
FROM sys.conversation_endpoints

SELECT message_type_name, CAST(message_body as xml) message,*
FROM dbo.RequestQueue

SELECT message_type_name, CAST(message_body as xml) message,*
FROM dbo.ReplyQueue

SELECT CAST(message_body as xml) message,* FROM sys.transmission_queue

--END CONVERSATION '236AF2C5-57F4-E711-A9E6-005056C00008';
复制代码

四、删除会话对象

复制代码
IF EXISTS (SELECT * FROM sys.services
           WHERE name =
           N'//AWDB/1DBSample/TargetService')
     DROP SERVICE
     [//AWDB/1DBSample/TargetService];

IF EXISTS (SELECT * FROM sys.service_queues
           WHERE name = N'TargetQueue1DB')
     DROP QUEUE TargetQueue1DB;

-- Drop the intitator queue and service if they already exist.
IF EXISTS (SELECT * FROM sys.services
           WHERE name =
           N'//AWDB/1DBSample/InitiatorService')
     DROP SERVICE
     [//AWDB/1DBSample/InitiatorService];

IF EXISTS (SELECT * FROM sys.service_queues
           WHERE name = N'InitiatorQueue1DB')
     DROP QUEUE InitiatorQueue1DB;

IF EXISTS (SELECT * FROM sys.service_contracts
           WHERE name =
           N'//AWDB/1DBSample/SampleContract')
     DROP CONTRACT
     [//AWDB/1DBSample/SampleContract];

IF EXISTS (SELECT * FROM sys.service_message_types
           WHERE name =
           N'//AWDB/1DBSample/RequestMessage')
     DROP MESSAGE TYPE
     [//AWDB/1DBSample/RequestMessage];

IF EXISTS (SELECT * FROM sys.service_message_types
           WHERE name =
           N'//AWDB/1DBSample/ReplyMessage')
     DROP MESSAGE TYPE
     [//AWDB/1DBSample/ReplyMessage];
GO
复制代码

 

参考:http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html

参考:https://docs.microsoft.com/zh-cn/sql/t-sql/statements/create-message-type-transact-sql

总结

使用单数据库会话可以处理一般的队列发送和读写消息的场景,其中sys.conversation_endpoints系统视图需要重点关注。

 

 





本文转自pursuer.chen(陈敏华)博客园博客,原文链接:http://www.cnblogs.com/chenmh/p/8204765.html,如需转载请自行联系原作者

相关实践学习
使用SQL语句管理索引
本次实验主要介绍如何在RDS-SQLServer数据库中,使用SQL语句管理索引。
SQL Server on Linux入门教程
SQL Server数据库一直只提供Windows下的版本。2016年微软宣布推出可运行在Linux系统下的SQL Server数据库,该版本目前还是早期预览版本。本课程主要介绍SQLServer On Linux的基本知识。 相关的阿里云产品:云数据库RDS&nbsp;SQL Server版 RDS SQL Server不仅拥有高可用架构和任意时间点的数据恢复功能,强力支撑各种企业应用,同时也包含了微软的License费用,减少额外支出。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/sqlserver
目录
相关文章
|
2月前
|
SQL 存储 关系型数据库
第二篇:关系型数据库的核心概念与 SQL 基础
本篇内容深入浅出地讲解了关系型数据库的核心概念与SQL基础,适合有一定计算机基础的学习者。文章涵盖数据库的基本操作(CRUD)、数据类型、表的创建与管理等内容,并通过实例解析SELECT、INSERT、UPDATE、DELETE等语句的用法。此外,还推荐了多种学习资源与实践建议,帮助读者巩固知识。学完后,你将掌握基础数据库操作,为后续高级学习铺平道路。
131 1
|
3月前
|
SQL 数据库 数据安全/隐私保护
数据库数据恢复——sql server数据库被加密的数据恢复案例
SQL server数据库数据故障: SQL server数据库被加密,无法使用。 数据库MDF、LDF、log日志文件名字被篡改。 数据库备份被加密,文件名字被篡改。
|
27天前
|
SQL 人工智能 关系型数据库
GitHub 热门!MindsDB 破解 AI + 数据库瓶颈,究竟有什么惊艳亮点?只需 SQL 即可实现智能预测
MindsDB 是一款将 AI 能力直接注入数据库的开源工具,支持 MySQL、PostgreSQL 等多种数据库连接,通过 SQL 即可完成模型训练与预测。它提供 AutoML 引擎、LLM 集成、联邦查询等功能,简化 MLOps 流程,实现数据到智能的无缝衔接。项目在 GitHub 上已获 32.4k 星,社区活跃,适用于客户流失预警、推荐系统、情感分析等场景。开发者无需深入模型细节,即可快速构建智能解决方案。项目地址:https://github.com/mindsdb/mindsdb。
131 0
|
3月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
3月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
4月前
|
SQL 数据库连接 Linux
数据库编程:在PHP环境下使用SQL Server的方法。
看看你吧,就像一个调皮的小丑鱼在一片广阔的数据库海洋中游弋,一路上吞下大小数据如同海中的珍珠。不管有多少难关,只要记住这个流程,剩下的就只是探索未知的乐趣,沉浸在这个充满挑战的数据库海洋中。
104 16
|
4月前
|
SQL 关系型数据库 MySQL
如何优化SQL查询以提高数据库性能?
这篇文章以生动的比喻介绍了优化SQL查询的重要性及方法。它首先将未优化的SQL查询比作在自助餐厅贪多嚼不烂的行为,强调了只获取必要数据的必要性。接着,文章详细讲解了四种优化策略:**精简选择**(避免使用`SELECT *`)、**专业筛选**(利用`WHERE`缩小范围)、**高效联接**(索引和限制数据量)以及**使用索引**(加速搜索)。此外,还探讨了如何避免N+1查询问题、使用分页限制结果、理解执行计划以及定期维护数据库健康。通过这些技巧,可以显著提升数据库性能,让查询更高效流畅。
|
3月前
|
SQL IDE 关系型数据库
JetBrains DataGrip 2025.1 发布 - 数据库和 SQL 跨平台 IDE
JetBrains DataGrip 2025.1 (macOS, Linux, Windows) - 数据库和 SQL 跨平台 IDE
207 0
|
5月前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
5月前
|
SQL 存储 关系型数据库
【SQL技术】不同数据库引擎 SQL 优化方案剖析
不同数据库系统(MySQL、PostgreSQL、Doris、Hive)的SQL优化策略。存储引擎特点、SQL执行流程及常见操作(如条件查询、排序、聚合函数)的优化方法。针对各数据库,索引使用、分区裁剪、谓词下推等技术,并提供了具体的SQL示例。通用的SQL调优技巧,如避免使用`COUNT(DISTINCT)`、减少小文件问题、慎重使用`SELECT *`等。通过合理选择和应用这些优化策略,可以显著提升数据库查询性能和系统稳定性。
165 9