数据审计是一个跟踪表内容随时间变化的系统,在现在安全合规方面数据审计是必须要的功能之一。PostgreSQL作为一个强大现代的开源关系数据库,也有一个相关插件PGAudit可以提供审计功能。
关于PGAudit插件以后有机会可以详细介绍,本文我们介绍一个简单SQL语句实现的数据集审计功能。
概述
最终实现效果为:
创建一个示例表:
createextensionsupa_auditcascade;
createtablepublic.account(
idintprimarykey,
nametextnotnull
);
启用审计:
selectaudit.enable_tracking('public.account'::regclass);
增改删操作:
insertintopublic.account(id, name)
values (1, 'Chongchong');
updatepublic.accountsetname='CC'whereid=1;
deletefrompublic.accountwhereid=1;
清空表:
truncatetablepublic.account;
查看审计日志:
select*fromaudit.record_history
请注意,record_id和old_record_id在更新行时保持不变,这样就可以轻松查询单行的历史记录。
要关闭审计追踪,只需执行:
selectaudit.disable_tracking('public.account'::regclass);
实现
首先创建一个名为audit schema为审计用:
createschemaifnotexistsaudit;
记录存储
接下来,需要一个表来跟踪插入、更新和删除。
传统上,使用audit schema并附加了一些元数据列,如提交的时间戳。
该解决方案存在一些维护挑战:
- 对表启用审计需要数据库迁移
- 当源表的模式改变时,审计表的模式也必须改变
为此使用PostgreSQL的无模式JSONB数据类型将每条记录的数据存储在单个列中的。这种方法的另一个好处是允许将多个表的审计历史存储在一个审计表中。
createtableaudit.record_version(
idbigserialprimarykey,
record_iduuid,
old_record_iduuid,
opvarchar(8) notnull,
tstimestamptznotnulldefaultnow(),
table_oidoidnotnull,
table_schemanamenotnull,
table_namenamenotnull,
recordjsonb,
old_recordjsonb
);
查询和索引
查询性能很重要,如果不能快速查询日志,则该审计日志没有多大实际意义。为了提高查询的性能,需要对最常用的查询涉及字段创建索引。
时间范围内查询
对于时间范围,需要一个索引ts。 由于审计表仅用于插入记录,其中ts列插入操作时间,其值ts自然是升序排列。PostgreSQL的内置BRIN索引可以利用值和物理位置之间的相关性来生成一个索引,该索引在规模上比默认值(BTREE索引)小数百倍,并且查找时间更快。
createindexrecord_version_ts
onaudit.record_version
usingbrin(ts);
对于表查询,包含了一个 table_oid跟踪PostgreSQL内部数字表标识符的列。可以为该列添加索引而不是table_schema和 able_name列,最小化索引大小并提供更好的性能。
createindexrecord_version_table_oid
onaudit.record_version
usingbtree(table_oid);
记录唯一标识
将每一行的数据存储为的缺点之一jsonb是基于列值的过滤变得非常低效。如果想快速查找一行的历史记录,需要为每一行提取和索引一个唯一标识符。
对于全局唯一标识符,使用以下结构:
[table_oid, primary_key_value_1, primary_key_value_2, ...]
并将该数组散列为UUID v5以获得有效的可索引UUID类型,以识别对数据更改具有鲁棒性的行。
使用一个实用函数来查找记录的主键列名:
createorreplacefunctionaudit.primary_key_columns(entity_oidoid)
returnstext[]
stable
securitydefiner
languagesql
as$$
--Looksupthenamesofatable's primary key columns
select
coalesce(
array_agg(pa.attname::textorderbypa.attnum),
array[]::text[]
) column_names
from
pg_indexpi
joinpg_attributepa
onpi.indrelid=pa.attrelid
andpa.attnum=any(pi.indkey)
where
indrelid=$1
andindisprimary
$$;
另一个为table_oid和主键,将结果转换为记录的UUID。
createorreplacefunctionaudit.to_record_id(
entity_oidoid,
pkey_colstext[],
recjsonb
)
returnsuuid
stable
languagesql
as$$
select
case
whenrecisnullthennull
--ifnoprimarykeyexists, usearandomuuid
whenpkey_cols=array[]::text[] thenuuid_generate_v4()
else (
select
uuid_generate_v5(
'fd62bc3d-8d6e-43c2-919c-802ba3762271',
(
jsonb_build_array(to_jsonb($1))
||jsonb_agg($3->>key_)
)::text
)
from
unnest($2) x(key_)
)
end
$$;
最后,索引record_id和old_record_id包含这些用于快速查询的唯一标识符的列。
createindexrecord_version_record_id
onaudit.record_version(record_id)
whererecord_idisnotnull;
createindexrecord_version_old_record_id
onaudit.record_version(record_id)
whereold_record_idisnotnull;
触发器
为了让审计功能真正起作用,需要在最终用户不对其事务进行任何更改的情况下插入记录给审计表。为此,设置一个触发器在数据更改时触发,为每个插入/更新/删除的行为触发一次触发器。
createorreplacefunctionaudit.insert_update_delete_trigger()
returnstrigger
securitydefiner
languageplpgsql
as$$
declare
pkey_colstext[] =audit.primary_key_columns(TG_RELID);
record_jsonbjsonb=to_jsonb(new);
record_iduuid=audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);
old_record_jsonbjsonb=to_jsonb(old);
old_record_iduuid=audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);
begin
insertintoaudit.record_version(
record_id,
old_record_id,
op,
table_oid,
table_schema,
table_name,
record,
old_record
)
select
record_id,
old_record_id,
TG_OP,
TG_RELID,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
record_jsonb,
old_record_jsonb;
returncoalesce(new, old);
end;
$$;
API
将公开的用于对表启用审计的API:
selectaudit.enable_tracking('<schema>.<table>'::regclass);
禁用跟踪:
selectaudit.disable_tracking('<schema>.<table>'::regclass);
这些函数根据请求由表注册审计触发器:
createorreplacefunctionaudit.enable_tracking(regclass)
returnsvoid
volatile
securitydefiner
languageplpgsql
as$$
declare
statement_rowtext=format('
createtriggeraudit_i_u_d
beforeinsertorupdateordelete
on%I
foreachrow
executeprocedureaudit.insert_update_delete_trigger();',
$1
);
pkey_colstext[] =audit.primary_key_columns($1);
begin
ifpkey_cols=array[]::text[] then
raiseexception'Table % can not be audited because it has no primary key', $1;
endif;
ifnotexists(select1frompg_triggerwheretgrelid=$1andtgname='audit_i_u_d') then
executestatement_row;
endif;
end;
$$;
createorreplacefunctionaudit.disable_tracking(regclass)
returnsvoid
volatile
securitydefiner
languageplpgsql
as$$
declare
statement_rowtext=format(
'drop trigger if exists audit_i_u_d on %I;',
$1
);
begin
executestatement_row;
end;
$$;
性能开销
开启审计表后会降低插入、更新和删除的吞吐量。但是在吞吐量低于每秒1000次写入的情况下,其开销通常可以忽略不计。对于写入频率较高的表,建议使用pgAudit。
总结
通过简单纯sql语句就实现了Postgresql数据库的安全审计,总体上算起来实现才150行sql语句。大家可以自己手动尝试一下,主要是搞清楚其原理,如果生产环境中有需求还是建议用pgAudit。
文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/275210.html<





