MSSQL · 实现分析 · SQL Server实现审计日志的方案探索

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云解析 DNS,旗舰版 1个月
简介:

摘要

这篇文章介绍四种实现MSSQL Server审计日志功能的方法探索,即解析数据库事务日志、SQL Profiler、SQL Audit以及Extended Event。详细介绍了这四种方法的具体实现,以及通过优缺点的对比和总结,最终得出结论,使用Extended Event实现审计日志是最好的选择,为产品化选型提供参考。

审计日志需求分析

对于关系型数据库来而言,在生产环境SQL Server数据库实例中,审计日志是一个非常重要且必须的强需求功能,主要体现在以下几个方面。

  • 安全审计
  • 问题排查
  • 性能调优

安全审计

在一些存取敏感信息的产品环境数据库SQL Server实例中(比如:财务系统、设计到国家安全层面的数据库系统),对数据操作要求十分谨慎,安全要求等级十分严密,需要对每一条数据操作语句进行审计,以便做到每次数据变动或查看均可追溯。在这个场景中,对敏感信息操作的审计是基于数据安全性的要求。

问题排查

在日常生产系统管理维护过程中,我们经常会遇到类似的场景和疑问:能否找到是谁在哪个时间点执行了什么语句把数据XXX给删除(更新)了呢?笔者在从事DBA行业的几年工作经历过程中,无数次被问及到类似的问题。要解决这个场景中的问题,审计日志功能是不二选择。

性能调优

利用审计日志对数据库系统进行性能调优是审计日志非常重要的功能和用途。比如,以下是几个审计日志典型的应用场景:

  • 找出某段时间内哪些语句导致了系统性能消耗严重(比如:CPU、IOPS等)
  • 找出某段时间内的TOP CPU SQL语句
  • 找出某段时间内的TOP IO SQL语句
  • 找出某段时间内的TOP Time Cost SQL语句
  • 找出某段时间内哪个用户使用的数据库系统资源最多
  • 找出某段时间内哪个应用使用的数据库系统资源最多
  • ……

实现审计日志的方法

基于以上对审计日志的需求分析,我们了解到审计日志的功能是关系型数据至关重要的强需求,让我们来看看SQL Server数据库系统有哪些实现审计日志功能的方法和具体实现,以及这些方法的优缺点对比。

数据库日志分析

在SQL Server数据库事务日志中,记录了每个事务的数据变更操作的详细信息,包含谁在哪个时间点做了什么操作。所以,我们可以基于SQL Server数据库事务日志的分析,来获取数据变更的详细审计日志信息。使用这个方法来实现审计日志功能的,有一家叫着ApexSQL的公司产品做的很不错,产品ApexSQL Log就是通过数据库事务日志来实现审计日志功能的产品,详情参见:ApexSQL Log。附一张来自ApexSQL官网的截图:

01.png

但是,由于SQL Server本身是微软的闭源产品,对于事务日志格式外界很难知道,所以这个方法的实现门槛很高,实现难度极大。加之,有可能不同版本的SQL Server事务日志格式存在差异,必须要对每个版本的事务日志解析做相应的适配,导致维护成本极高,产品功能延续性存在极大风险和挑战。

SQL Profiler

SQL Profiler是微软从SQL Server 2000开始引入的数据库引擎跟踪工具,具有使用界面操作的接口、使用SQL语句创建接口以及使用SMO编程创建接口。使用SQL Profiler,可以实现非常多的功能,比如:

  • 图形化监控数据库引擎执行的SQL语句(也可以将执行语句保存到表中)
  • 查看执行语句实时的执行计划
  • 数据库引擎错误信息排查
  • 数据库性能分析
  • 阻塞,锁等待、锁升级及死锁跟踪
  • 后台收集查询语句信息
  • ……

所以,从功能完整性角度来说,我们完全可以使用SQL Profiler来实现就数据库实例级别的审计日志的功能。那么接下来让我们看看如何使用SQL Profiler实现审计日志的功能。

图形化创建

开始 => 运行 => 键入“Profiler” => 回车,打开Profiler工具后,点击“New Trace” => Server Name => Authentication => Connect,如下图所示:

02.png

然后,选择General => Save to table => 选择要保留到的实例名、数据库名、架构名和表名 => OK

03.png

接下来选择要跟踪的事件,Events Selection => SQL:StmtCompleted => Column Filters => LoginName => Not Like %sa% => OK => Run

04.png

使用SQL语句创建

使用图形化界面创建SQL Profiler实现审计日志功能,简单易用,很容易上手。但是,过程繁琐、效率不高,难于自动化。这个时候,就需要使用SQL语句来创建SQL Profiler功能,实现一键创建的方法了。

use master
GO

set nocount on

declare 
	@trace_folder nvarchar(256)
	,@trace_file nvarchar(256) 
	,@max_files_size bigint
	
	,@stop_time datetime
	,@file_count int

	,@int_filter_cpu int
	,@int_filter_duration bigint
	,@int_filter_spid int
	,@set_trace_status int
;

select @trace_folder=N'C:\Temp\perfmon'
	
	,@max_files_size = 50			--max file size for each trace file
	,@file_count = 10				--max file count
	
	,@stop_time = '6/13/2017 10:50'	--null: stop trace manully; specify time (stop at the specify time)
	,@int_filter_cpu = NULL				-- >= @int_filter_cpu ms will be traced. or else, skipped.
										--NULL: ignore this filter
	,@int_filter_duration = NULL		--execution duration filter: millisecond
										--NULL: ignore this filter
	--,@int_filter_spid = 151			--integer: specify a spid to trace
										--				
										
	,@set_trace_status = 1	--0: Stops the specified trace.; 
							--1: Starts the specified trace.;
							--2: Closes the specified trace and deletes its definition from the server.;
;

/*

select * from sys.traces

*/
--private variables
declare
	@trace_id int
	,@do int
	,@loop int
	,@trace_event_id int
	,@trace_column_id int
	,@return_code tinyint
	,@return_decription varchar(200)
	,@field_separator char(1)

;	
select @field_separator = ','			--trace columns list separator
;

IF right(ltrim(rtrim(@trace_folder)), 1 ) <> '\'
BEGIN
	SELECT 
		@trace_folder = ltrim(rtrim(@trace_folder)) + N'\' 
	;
	exec sys.xp_create_subdir @trace_folder
END
;

select
	@trace_file = @trace_folder + REPLACE(@@SERVERNAME, N'\', N'')
;

IF @int_filter_spid IS NOT NULL
BEGIN
	select
		@trace_file = @trace_file + cast(@int_filter_spid as varchar)
	;
END

--select @trace_file

select top 1
	@trace_id = id
from sys.traces
where path like @trace_file + N'%' if @trace_id is not null
begin
	
	-- Start Trace (status 1 = start)
	EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status return
end

if OBJECT_ID('tempdb..#trace_event','u') is not null
	drop table #trace_event
create table #trace_event
(
	id int identity(1,1) not null primary key
	,trace_event_id int not null
	,trace_column_id int not null
	,event_name sysname null
	,trace_column_name sysname null
)

;with trace_event
as
(		--select * from sys.trace_events order by trace_event_id
	select 
		is_trace = 1 , event_name = 'SQL:StmtCompleted'
		,trace_column_list = 'NestLevel,ClientProcessID,EndTime,
DatabaseID,GroupID,ServerName,SPID,DatabaseName,NTUserName,IntegerData2,RequestID,EventClass,SessionLoginName,NTDomainName,TextData,XactSequence,
CPU,ApplicationName,Offset,LoginSid,TransactionID,IntegerData,Duration,SourceDatabaseID,LineNumber,ObjectID,Reads,RowCounts,Writes,IsSystem,ObjectName,
LoginName,ObjectType,StartTime,HostName,EventSequence,'
),
trace_column
as(
	select 
		*
		,trace_column_list_xml = 
								CAST(
										'<V><![CDATA[' 
													+ REPLACE(
														REPLACE(
																REPLACE(
																			trace_column_list,CHAR(10),']]></V><V><![CDATA['
																		),@field_separator,']]></V><V><![CDATA['
																),CHAR(13),']]></V><V><![CDATA['
															) 
										+ ']]></V>'
									as xml
								)
	from trace_event
	where is_trace = 1
)
,data
as(
	select 
		trace_column = T.C.value('(./text())[1]','sysname')
		,event_name
	from trace_column AS a
		CROSS APPLY trace_column_list_xml.nodes('./V') AS T(C)
)
INSERT INTO #trace_event select 
	trace_event_id = ev.trace_event_id
	,trace_column_id = col.trace_column_id
	,a.event_name
	,trace_column_name = a.trace_column
from data as a
	inner join sys.trace_columns as col
	on a.trace_column = col.name
	inner join sys.trace_events as ev
	on a.event_name = ev.name
where col.trace_column_id is not null
order by ev.trace_event_id
;

--select * from #trace_event

---private variables
select @trace_id = 0
	,@do = 1
	,@loop = @@ROWCOUNT
	,@trace_event_id = 0
	,@trace_column_id = 0
	,@return_code = 0
	,@return_decription = ''
;

--create trace
exec @return_code = sys.sp_trace_create @traceid = @trace_id OUTPUT 
										, @options = 2 
										, @tracefile = @trace_file
										, @maxfilesize = @max_files_size
										, @stoptime = @stop_time
										, @filecount = @file_count
;

select 
	trace_id = @trace_id
	,[current_time] = getdate()
	,[stop_time] = @stop_time
;

set
	@return_decription = case @return_code when 0 then 'No error.' when 1 then 'Unknown error.' when 10 then 'Invalid options. Returned when options specified are incompatible.' 
when 12 then 'File not created.' when 13 then 'Out of memory. Returned when there is not enough memory to perform the specified action.' when 14 then 'Invalid stop time. Returned 
when the stop time specified has already happened.' when 15 then 'Invalid parameters. Returned when the user supplied incompatible parameters.' else ''
							end
;

raiserror('Trace create with:
%s',10,1,@return_decription) with nowait

--loop set trace event & event column
while @do <= @loop
begin
	select top 1 @trace_event_id = trace_event_id
		,@trace_column_id = trace_column_id
	from #trace_event
	where id = @do
	;
	
	--set trace event
	exec sys.sp_trace_setevent @trace_id, @trace_event_id, @trace_column_id, 1
	raiserror('exec sys.sp_trace_setevent @trace_id, %d, %d, 1',10,1,@trace_event_id,@trace_column_id) with nowait
	
	set @do = @do + 1;
end

--CPU >= 500/ cpu columnid = 18
IF @int_filter_cpu IS NOT NULL
	EXEC sys.sp_trace_setfilter @trace_id, 18, 0, 4, @int_filter_cpu

--duration filter/ duration columnid=13
IF @int_filter_duration IS NOT NULL
	EXEC sys.sp_trace_setfilter @trace_id, 13, 0, 4, @int_filter_duration

--spid filter/ spid columnid=12
IF @int_filter_spid IS NOT NULL
	exec sys.sp_trace_setfilter @trace_id, 12, 0, 0, @int_filter_spid


--applicationName not like 'SQL Server Profiler%'
EXEC sys.sp_trace_setfilter @trace_id, 10, 0, 7, N'SQL Server Profiler%'

-- Start Trace (status 1 = start)
EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
GO


其中输入参数表达的含义解释如下:

@trace_folder:Trace文件存放的位置

@max_files_size:每一个Trace文件大小

@file_count:Trace滚动最多的文件数量

@stop_time:Trace停止的时间

@int_filter_cpu:CPU过滤阈值,CPU使用率超过这个值会被记录下来,单位毫秒

@int_filter_duration:执行时间过滤阈值,执行时间超过这个值会被记录,单位毫秒

@set_trace_status:Trace的状态:0停止;1启动;2删除

SMO编程创建

SQL Profiler除了使用图形化界面创建,使用系统存储过程创建两种方法以外,还可以使用SMO编程方法来创建。

SQL Audit

使用SQL Audit实现SQL Server审计日志功能需要以下三个步骤来完成:

  • 创建实例级别的Audit并启动
  • 创建数据库级别的Audit Specification
  • 读取审计日志文件

创建实例级别Audit

使用Create Server Audit语句创建实例级别的Audit,方法如下:

USE [master]
GO CREATE SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
TO FILE 
( FILEPATH = N'C:\Temp\Audit'
 ,MAXSIZE = 10 MB
 ,MAX_ROLLOVER_FILES = 10
 ,RESERVE_DISK_SPACE = OFF
)
WITH
( QUEUE_DELAY = 1000
 ,ON_FAILURE = CONTINUE
)
GO 

启动实例级别的Audit,代码如下

USE [master]
GO ALTER SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] WITH(STATE=ON)
;
GO

创建数据库级别Audit Specification

实例级别Audit创建完毕后,接下来是对需要审计的数据库建立对于的Audit Specification,方法如下:

USE [testdb]
GO CREATE DATABASE AUDIT SPECIFICATION [Audit_Spec_for_TestDB]
FOR SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] 
ADD (SELECT, INSERT, UPDATE, DELETE, EXECUTE ON DATABASE::[testdb] BY [public])
WITH (STATE = ON);
GO

由于SQL Audit Specification是基于数据库级别的,所以存在以下场景的维护性复杂度增加:

  • 用户需要审计实例中某些或者所有数据库,必须在每个需要审计的数据库下创建对象
  • 用户实例有新数据库创建,并需要审计日志功能时,必须在新的数据库下创建对象


读取审计日志文件

最后,我们需要将审计日志文件中存放的内容读取出来,使用SQL Server提供的系统函数sys.fn_get_audit_file,方法如下:

DECLARE 
	@AuditFilePath sysname
; SELECT 
 @AuditFilePath = audit_file_path
FROM sys.dm_server_audit_status
WHERE name = 'Audit_Svr_User_Defined_for_Testing' SELECT statement,* 
FROM sys.fn_get_audit_file(@AuditFilePath,default,default)
; 

Extended Event

微软SQL Server产品长期的规划是逐渐使用Extended Event来替换SQL Profiler工具,因为Extended Event更加轻量级,性能消耗比SQL Profiler大幅降低,因此对用户系统性能影响也大幅减轻。在审计日志的应用场景中,只需要在实例级别创建一个Extended Event Session对象,然后启用即可。既满足了功能性的需求,又能够做到很好后期维护,不需要为某一个数据库创建相应对象,对实例的性能消耗大幅降低到5%左右。

创建Extended Event Session

使用Create Event Session On Server语句创建基于实例级别的Extended Event。语句如下:

USE master GO CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER ADD EVENT sqlserver.sql_statement_completed
( 
	ACTION 
	( 
		sqlserver.database_id,
		sqlserver.database_name,
		sqlserver.session_id, 
		sqlserver.username, 
		sqlserver.client_hostname,
		sqlserver.client_app_name,
		sqlserver.sql_text, 
		sqlserver.query_hash,
		sqlserver.query_plan_hash,
		sqlserver.plan_handle,
		sqlserver.tsql_stack,
		sqlserver.is_system,
		package0.collect_system_time
	) 
	WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM' AND sqlserver.username <> 'sa' AND (NOT sqlserver.like_i_sql_unicode_string(sqlserver.client_app_name, '%IntelliSense'))
		AND sqlserver.is_system = 0
		
)
ADD TARGET package0.asynchronous_file_target
( 
	SET 
		FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel', 
		MAX_FILE_SIZE = 10,
		MAX_ROLLOVER_FILES = 100
)
WITH (
	EVENT_RETENTION_MODE = NO_EVENT_LOSS,
	MAX_DISPATCH_LATENCY = 5 SECONDS
);
GO

启用Extended Event Session

Extended Event Session对象创建完毕后,需要启动这个session对象,方法如下:

USE master GO -- We need to enable event session to capture event and event data  ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
ON SERVER STATE = START;
GO

读取审计日志文件

Extend Event生成审计日志文件以后,我们可以使用sys.fn_xe_file_target_read_file系统函数来读取,然后分析event_data列所记录的详细信息。

USE master GO SELECT *
FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', null, null, null)

方案对比

根据前面章节“实现审计日志的方法”部分的介绍,我们从可靠性、对象级别、可维护性、开销和对数据库系统的影响五个方面来总结这四种技术的优缺点。

  • 可靠性:这四种实现审计日志的方法可靠性都有保障,如果使用数字化衡量可维护性,得满分100分
  • 对象级别:SQL Profiler和Extended Event是基于实例级别的技术方案;解析事务日志解析和SQL Audit方法是基于数据库级别的技术,一旦有数据库创建或者删除操作,需要做相应的适配,所以维护成本也相对高。基于数据库级别的方案得分为0,基于实例级别得分为100
  • 维护性:基于实例级别的实现方法可维护性(得分100)显然优于基于数据库级别(得分为0)的实现方式
  • 开销:SQL Profiler对数据库系统开销很大,大概20%左右(得分100 - 20 = 80),其他三种开销较小5%左右(得分100 - 5 = 95)
  • 影响:开销大的技术方案自然影响就大,反之亦然。得分与开销部分类似。

四种技术方案优缺点汇总如下表所示:

05.png

以下是对四种实现审计日志方法五个维度打分,得分统计汇总如下表所示:

06.png

将汇总得分做成雷达图,如下图所示:

07.png

从雷达图我们可以很清楚的看到,综合考虑可靠性、可维护性、系统开销和影响来看,使用Extended Event实现审计日志的方法是最优的选择。

最后总结

本期分享了SQL Server实现审计日志功能的四种技术方案和详细实现,并从可靠性、可维护性、对象级别、系统开销和影响五个维度分析了四种方案各自的优缺点,最后的结论是使用Extended Event实现审计日志方法是最优选择,以此来为我们的产品化做出正确的选择。

相关文章
|
21天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1585 14
|
22天前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
23 4
|
23天前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
38 2
|
2月前
|
缓存 监控 算法
分析慢日志文件来优化 PHP 脚本的性能
分析慢日志文件来优化 PHP 脚本的性能
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
|
3月前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
67 3
|
3月前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
|
2月前
|
SQL 安全 数据库
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
96 0
|
3月前
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息
|
3月前
|
算法 关系型数据库 程序员
第一周算法设计与分析:A : log2(N)
这篇文章介绍了解决算法问题"输入一个数N,输出log2N(向下取整)"的三种编程思路,包括使用对数函数和幂函数的转换方法,以及避免浮点数精度问题的整数逼近方法。