PostgreSQL Fine-Grained Table,Column,Row Level Audit

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介:
通过配置用户级或数据库级的参数可以实现用户以及数据库级别的审计, 但是这样的粒度可能还是太粗糙了.
如果需要更细致的审计, 例如针对某些表的操作审计, 某些用户对某些表的审计, 或者仅仅当某个列的值发生变化时才被审计(记录到LOG或表里面, 本文的例子是将审计信息输出到LOG, 使用raise).
这样的需求可以通过触发器来实现.
接下来以PostgreSQL 9.2为例进行讲解.
# 基础的参数配置
log_destination = 'csvlog'
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_file_mode = 0600
log_truncate_on_rotation = on
log_rotation_age = 1d
log_rotation_size = 10MB
log_connections = on
log_error_verbosity = verbose
log_timezone = 'PRC'
log_statement = 'none'
log_min_duration_statement = -1

# 创建测试表 : 
digoal=> create table user_account_kb(id int, info text, balance numeric, crt_time timestamp, mod_time timestamp);
CREATE TABLE

# 插入测试数据 : 
digoal=> insert into user_account_kb select generate_series(1,10),'test',trunc(100*random()),now(),null;
INSERT 0 10
digoal=> select * from user_account_kb ;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
  1 | test |      66 | 2013-03-20 10:08:15.969523 | 
  2 | test |      50 | 2013-03-20 10:08:15.969523 | 
  3 | test |      95 | 2013-03-20 10:08:15.969523 | 
  4 | test |      90 | 2013-03-20 10:08:15.969523 | 
  5 | test |      50 | 2013-03-20 10:08:15.969523 | 
  6 | test |      12 | 2013-03-20 10:08:15.969523 | 
  7 | test |      39 | 2013-03-20 10:08:15.969523 | 
  8 | test |      42 | 2013-03-20 10:08:15.969523 | 
  9 | test |       6 | 2013-03-20 10:08:15.969523 | 
 10 | test |      11 | 2013-03-20 10:08:15.969523 | 
(10 rows)

【审计场景1】
1. 审计某个表的insert, update, delete, truncate语句.
使用after for each statement触发器.
# 创建触发器函数
digoal=> create or replace function trace_statement() returns trigger as $$
declare
  v_user name;
  v_db name;
  v_query text;
begin
  select current_user, current_database(), current_query() into v_user, v_db, v_query;
  raise warning 'user:%, db:%, query:%', v_user, v_db, v_query;
  return null;
end;
$$ language plpgsql;

# 创建触发器
digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement execute procedure trace_statement();
CREATE TRIGGER

# 测试插入
digoal=> insert into user_account_kb values(11,'test',100,now(),null);
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);
INSERT 0 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
 11 | test |     100 | 2013-03-20 10:18:02.495836 | 
(1 row)

# 测试更新
digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time 
----+------+---------+----------------------------+----------
 11 | new  |     100 | 2013-03-20 10:18:02.495836 | 
(1 row)

# 测试删除
digoal=> delete from user_account_kb where id=11;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id=11;
DELETE 1

# 测试truncate
digoal=> begin;
BEGIN
digoal=> truncate user_account_kb ;
WARNING:  user:digoal, db:digoal, query:truncate user_account_kb ;
TRUNCATE TABLE
digoal=> rollback;
ROLLBACK

# 注意回滚的操作不会被记录. 即使log_statement = 'ddl', 所以rollback没有被记录下来.
# 这是个弊端. 需要注意. 希望未来的PostgreSQL版本加以改进. 现在的解决办法是修正触发器的触发点, 小结部分会提到.
# 以上操作的日志输出如下.
2013-03-20 10:18:02.496 CST,"digoal","digoal",4521,"[local]",51491867.11a9,9,"INSERT",2013-03-20 10:01:11 CST,1/229,3355,WARNING,01000,"user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:42.980 CST,"digoal","digoal",4521,"[local]",51491867.11a9,10,"UPDATE",2013-03-20 10:01:11 CST,1/233,3356,WARNING,01000,"user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:53.612 CST,"digoal","digoal",4521,"[local]",51491867.11a9,11,"DELETE",2013-03-20 10:01:11 CST,1/236,3357,WARNING,01000,"user:digoal, db:digoal, query:delete from user_account_kb where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:20:18.361 CST,"digoal","digoal",4521,"[local]",51491867.11a9,12,"TRUNCATE TABLE",2013-03-20 10:01:11 CST,1/237,3358,WARNING,01000,"user:digoal, db:digoal, query:truncate user_account_kb ;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"

【审计场景2】
2. 按用户审计某个表的insert, update, delete, truncate语句.
使用after for each statement  when (current_user='') 触发器.
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这次带上when条件
digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement when (current_user='digoal') execute procedure trace_statement();
CREATE TRIGGER

# 测试digoal用户的操作
digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 0

# 测试其他用户的操作, 不被审计
digoal=> \c digoal postgres
You are now connected to database "digoal" as user "postgres".
digoal=# update digoal.user_account_kb set info='new' where id=11;
UPDATE 0

【审计场景3】
3. 按条件审计某个表的insert, update, delete语句.
使用after for each row  when (new.balance <> old.balance) 触发器.
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 新建触发器函数
digoal=> create or replace function trace_row() returns trigger as $$                                                            
declare
  v_user name;
  v_db name;
  v_query text;
begin
select current_user, current_database(), current_query() into v_user, v_db, v_query;
case TG_OP
  when 'UPDATE' then
    raise warning 'user:%, db:%, query:%, newdata:%, olddata:%', v_user, v_db, v_query, NEW, OLD;
  when 'INSERT' then
    raise warning 'user:%, db:%, query:%, newdata:%', v_user, v_db, v_query, NEW;
  when 'DELETE' then
    raise warning 'user:%, db:%, query:%, olddata:%', v_user, v_db, v_query, OLD;
  else
    null;
end case;
return null;
end;
$$ language plpgsql;
CREATE FUNCTION

# 新建触发器
digoal=> create trigger tg1 after insert or update or delete on user_account_kb for each row execute procedure trace_row();
CREATE TRIGGER

# 测试插入
digoal=> insert into user_account_kb select * from user_account_kb limit 3;
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(1,test,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(2,test,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(3,test,95,"2013-03-20 10:08:15.969523",)
INSERT 0 3

# 测试更新
digoal=> update user_account_kb set info='new' where id<3;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
UPDATE 4

# 测试删除
digoal=> delete from user_account_kb where id<3;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
DELETE 4


【审计场景4】
# 基于列的判断审计
# 删除前面用到的触发器
digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这里用到when条件, 只有当balance变化时才审计
digoal=> create trigger tg1 after update on user_account_kb for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER

# 测试
digoal=> update user_account_kb set info='new' where id=4;
UPDATE 1
digoal=> update user_account_kb set info='new',balance=balance where id=4;
UPDATE 1

# balance变化才审计
digoal=> update user_account_kb set info='new',balance=balance-1 where id=4;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new',balance=balance-1 where id=4;, newdata:(4,new,89,"2013-03-20 10:08:15.969523",), olddata:(4,new,90,"2013-03-20 10:08:15.969523",)
UPDATE 1

【小结】
1. 前面提到ROLLBACK等事务相关的SQL不会被审计到, 所以当SQL执行失败时, LOG已经记录了, 但是没有记录回滚的动作, 所以信息是不完整的, 除非从XLOG/CLOG中取出对应的XID是提交还是回滚. 
为了使记录在LOG中的语句一定是提交的, 那么需要调整一下触发器的创建方法, 使得回滚的事务中所有的SQL都不被审计.
如下,
触发器只有在提交时才会触发, 回滚不触发. (使用constraint来创建触发器)
digoal=> create constraint trigger tg1 after update on user_account_kb DEFERRABLE INITIALLY deferred for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=1;
UPDATE 0
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> end;
WARNING:  user:digoal, db:digoal, query:end;, newdata:(4,new,90,"2013-03-20 10:08:15.969523",), olddata:(4,new,89,"2013-03-20 10:08:15.969523",)
COMMIT
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> rollback;
ROLLBACK

注意以上方法只有after ... for each row才能被用到.

When the CONSTRAINT option is specified, this command creates a constraint trigger. This is the same as a regular trigger except that the timing of the trigger firing can be adjusted using SET CONSTRAINTS. 

Constraint triggers must be AFTER ROW triggers. They can be fired either at the end of the statement causing the triggering event, or at the end of the containing transaction; in the latter case they are said to be deferred. 

A pending deferred-trigger firing can also be forced to happen immediately by using SET CONSTRAINTS. 

Constraint triggers are expected to raise an exception when the constraints they implement are violated.

【参考】 6.  http://www.postgresql.org/docs/9.2/static/sql-createtrigger.html
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
关系型数据库 数据库 PostgreSQL
PostgreSQL 内存表可选项 - unlogged table
标签 PostgreSQL , 内存表 , unlogged table 背景 内存表,通常被用于不需要持久化,变更频繁,访问RT低的场景。 目前社区版本PostgreSQL没有内存表的功能,postgrespro提供了两个插件可以实现类似内存表的功能。
3410 0
|
3月前
|
SQL 关系型数据库 数据库
PostgreSQL数据库报错 ERROR: multiple default values specified for column "" of table "" 如何解决?
PostgreSQL数据库报错 ERROR: multiple default values specified for column "" of table "" 如何解决?
345 59
|
2月前
|
SQL 关系型数据库 数据库
postgresql报:ERROR: column “i“ of relation “test“ does not exist LINE 1: UPDATE怎么解决?
解决“ERROR: column "i" of relation "test" does not exist”错误的关键在于核实列名的准确性,修正更新语句,确保列名的引用正确无误,并考虑到任何可能影响列名引用的表别名、大小写、特殊字符或动态SQL生成等因素。通过上述步骤,你应该能有效定位并解决问题,保证SQL语句的正确执行。
379 0
|
SQL 弹性计算 算法
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
标签 PostgreSQL , 分区表 , 在线转换 背景 非分区表,如何在线(不影响业务)转换为分区表? 方法1,pg_pathman分区插件 《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》 使用非堵塞式的迁移接口 partition_table_concurrently( relation REGCLASS,
2758 0
|
关系型数据库 数据库 PostgreSQL
PostgreSQL分区表(Table Partitioning)应用
一、简介   在数据库日渐庞大的今天,为了方便对数据库数据的管理,比如按时间,按地区去统计一些数据时,基数过于庞大,多有不便。
1813 0
|
SQL 算法 关系型数据库
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
PostgreSQL 普通表在线转换为分区表 - online exchange to partition table
2798 0
|
SQL 分布式计算 并行计算
PostgreSQL 并行计算解说 之13 - parallel OLAP : 中间结果 parallel with unlogged table
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan parallel
647 0
|
SQL 人工智能 分布式计算
PostgreSQL 并行计算解说 之20 - parallel partition table wise join
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan
451 0
|
SQL 分布式计算 并行计算
PostgreSQL 并行计算解说 之21 - parallel partition table wise agg
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan
290 0