动态输出(ToB海量日志转换业务) - 阿里云HybridDB for PostgreSQL最佳实践

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 标签 PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表 背景 有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

标签

PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表


背景

有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

pic

比如这个业务:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

一、需求

1、可以根据ToB的用户的定义,输出不同的格式。

2、每个ToB的用户,写入到一个文件或多个文件。

3、一个文件不能出现两个用户的内容。

其他需求见:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

二、架构设计

1、采用OSS存储实时载入的海量公共日志。

2、采用HybridDB for PostgreSQLRDS PostgreSQL的OSS外部表接口,直接并行读取OSS的文件。

3、为了防止ToB ID的数据倾斜问题,引入一个时间字段进行截断,采用 "ToB ID+截断时间" 两个字段进行重分布。

为了实现自动生成截断格式,需要起一个任务,自动计算截断格式。并将格式写入一张表。

4、通过HDB PG的窗口函数,按 "ToB ID+截断时间" 强制重分布。

5、通过UDF,将公共日志的格式,按ToB ID对应的UDF转换为对应的格式。

为了实现动态的格式需求,采用UDF,并将ToB ID对应的UDF写入一张表。

6、将转换后的数据,写入OSS。自动按ToB ID切换,绝对保证每个ToB的用户,写入到一个文件或多个文件。一个文件不出现两个用户的内容。

以上功能是阿里云HybridDB for PostgreSQL或RDS PostgreSQL的独有功能。

三、DEMO与性能

这里介绍如何动态的转换数据,其他内容请详见案例:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

1、创建公共日志OSS外部表 - 数据来源

根据domain字段的值,动态格式输出,每个domain写一个或多个文件。不同的domain绝对不能出现在同一个文件中。

create external table demo_source (    
UnixTime text,  -- 这个字段为unixtime,需要使用to_timestamp进行格式输出    
domain  text,    
c1 text,    
c2 text,    
c3 text,    
c4 text,    
c5 text,    
c6 int,    
c7 text,    
c8 text,    
c9 text    
)     
location('oss://xxxxx     
        dir=xxxx/xxx/xxx/xxx id=xxx    
        key=xxx bucket=xxx') FORMAT 'csv' (QUOTE '''' ESCAPE ''''   DELIMITER ',')    
LOG ERRORS SEGMENT REJECT LIMIT 10000    
;    

2、写入一批测试数据,根据对应的CSV格式,将文件写入OSS对应的dir/bucket中。

3、创建公共日志OSS外部表 - 动态格式输出

create WRITABLE external table demo_output (    
rn int8,   -- 输出到OSS时,忽略该字段  
domain_and_key  text,   -- 输出到OSS时,忽略该字段  
Data  text    
)     
location('oss://xxxxx    
        dir=xxxx/xxx/xxx/xxx id=xxx id=xxx    
        key=xxx bucket=xxx distributed_column=domain_and_key') FORMAT 'csv'     
DISTRIBUTED BY (domain_and_key)    
;    

将按domain_and_key字段值的不同,自动将Data字段的数据写入对应的OSS文件。实现数据的重规整。(domain_and_key字段不写入OSS文件。)

这是阿里云HybridDB for PG的定制功能。

4、创建domain的时间格式化表。

create table domain_tsfmt(    
  domain text,    
  tsfmt text    
);    

根据domain的count(*),即数据量的多少,决定使用的时间截断格式,(yyyy, yyyymm, yyyymmdd, yyyymmddhh24, yyyymmddhh24miss)

越大的domain,需要越长(yyyymmhh24miss)的截断。

业务方可以来维护这个表,例如一天生成一次。

对于很小的domain,不需要写入这张表,可以采用统一格式,例如0000。

insert into domain_tsfmt values ('domain_a', 'yyyymmhh24');    
insert into domain_tsfmt values ('domain_b', 'yyyymmhh24mi');    

5、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table domain_udf(domain text, udf name);      

6、创建UDF,需要定制格式的ToB ID,创建对应的UDF

PostgreSQL 例子

建议用format。

create or replace function f1(demo_source) returns text as $$      
  select format('domain: %L , c2: %L , c4: %L', $1.domain, $1.c2, $1.c4);      
$$ language sql strict;      
      
create or replace function f2(demo_source) returns text as $$      
declare      
  res text := format('%L , %L, %L', upper($1.c2), $1.c4, $1.c3);      
begin      
  return res;      
end;      
$$ language plpgsql strict;      

HybridDB for PostgreSQL 例子

create or replace function f1(demo_source) returns text as $$      
  select 'domain: '||$1.domain||' , c2: '||$1.c2||' , c4: '||$1.c4;      
$$ language sql strict;      
      
create or replace function f2(demo_source) returns text as $$      
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c3;      
$$ language sql strict;     
    
create or replace function f3(demo_source) returns text as $$      
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c9;      
$$ language sql strict;     

7、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(domain_source, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'('||quote_literal($1)||')';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      
    
-- 由于hdb版本太老,不支持format,不支持record和text互转,不支持quote_literal(record)。    
-- 调整如下    
    
create or replace function ff(domain_source, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      

8、写入UDF映射,例如1-100的domain,使用F1进行转换,0的ID使用F2进行转换。

insert into domain_udf select 'domain_'||generate_series(1,100), 'f1';      
insert into domain_udf values ('domain_0', 'f2');      

不在这里的DOMAIN,采用默认UDF转换格式,例如f3。

9、根据 "domain + 时间截断" 重分发,根据UDF动态转换查询如下:

select domain_and_key, data from   
(  
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from    
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次  
(    
  select     
    t1.domain,   
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,     
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from domain_source t1     
    left join domain_tsfmt t2 using (domain)     
    left join domain_udf t3 using (domain)    
) t    
) t  
;    

阿里云HDB FOR PG将自动根据OSS目标外部表中定义的distributed_column参数,当VALUE发生变化时,自动写新文件,从而实现不同的DOMAIN VALUE,写到不同的文件中。

10、将规整后的数据输出到OSS

insert into domain_output    
select domain_and_key, data from   
(  
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from    
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次  
(    
  select     
    t1.domain,   
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,     
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from domain_source t1     
    left join domain_tsfmt t2 using (domain)     
    left join domain_udf t3 using (domain)    
) t    
) t  
;    
  
执行计划  
只需要重分布一次  
  
                                                               QUERY PLAN                                                                  
-----------------------------------------------------------------------------------------------------------------------------------------  
 Insert (slice0; segments: 3)  (rows=333334 width=64)  
   ->  Subquery Scan t  (cost=375072.68..395072.68 rows=333334 width=64)  
         ->  Window  (cost=375072.68..385072.68 rows=333334 width=96)  
               Partition By: domain_and_key  
               Order By: domain_and_key      
	       -- 有按目标列排序,所以OSS切换文件没有问题  
	       -- 注意同一个domain的不同时间的数据,可能会切成多个文件  
	       -- 如果想让同一个domain的数据,切到一个文件中,那么请使用domain, key作为窗口分组,同时OSS外表的实现上需要修改一下,忽略两个列。  
               ->  Sort  (cost=375072.68..377572.68 rows=333334 width=96)  
                     Sort Key: domain_and_key  
                     ->  Redistribute Motion 3:3  (slice3; segments: 3)  (cost=12.84..86770.34 rows=333334 width=96)  
		     -- 重分布一次。采用窗口函数,强制了重分布  
                           Hash Key: domain_and_key  
                           ->  Subquery Scan t  (cost=12.84..66770.34 rows=333334 width=96)  
                                 ->  Hash Left Join  (cost=12.84..54270.34 rows=333334 width=237)  
                                       Hash Cond: t1.domain = t2.domain  
                                       ->  Hash Left Join  (cost=11.75..24261.75 rows=333334 width=192)  
                                             Hash Cond: t1.domain = t3.domain  
                                             ->  External Scan on domain_source t1  (cost=0.00..11000.00 rows=333334 width=96)  
                                             ->  Hash  (cost=8.00..8.00 rows=100 width=105)  
                                                   ->  Broadcast Motion 3:3  (slice1; segments: 3)  (cost=0.00..8.00 rows=100 width=105)  
                                                         ->  Seq Scan on domain_udf t3  (cost=0.00..4.00 rows=34 width=105)  
                                       ->  Hash  (cost=1.05..1.05 rows=1 width=61)  
                                             ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..1.05 rows=1 width=61)  
                                                   ->  Seq Scan on domain_tsfmt t2  (cost=0.00..1.01 rows=1 width=61)  
(21 rows)  

简化DEMO (不依赖OSS,仅仅演示)

1、创建公共日志表

create table t1 (tid int, c1 text, c2 text, c3 int, c4 timestamp, c5 numeric);      

2、写入一批测试数据

insert into t1 select random()*100, md5(random()::text), 'test', random()*10000, clock_timestamp(), random() from generate_series(1,1000000);      

3、创建目标表

create table t1_output (rn int8, fmt text, data text);    

4、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table t2(tid int, udf name);      

5、创建UDF,需要定制格式的ToB ID,创建对应的UDF

create or replace function f1(t1) returns text as $$      
  select 'tid: '||$1.tid||' , c2: '||$1.c2||' , c4: '||$1.c4;      
$$ language sql strict;      
      
create or replace function f2(t1) returns text as $$       
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c4||' , '||$1.c3;      
$$ language sql strict;      
    
create or replace function f3(t1) returns text as $$       
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c3||' , '||$1.c3;      
$$ language sql strict;      

默认采用f3()函数。

6、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(t1, name) returns text as $$      
declare      
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';      
  res text;      
begin      
  execute sql into res;      
  return res;      
end;      
$$ language plpgsql strict;      

7、写入UDF映射,例如1-10的ID,使用F1进行转换,0的ID使用F2进行转换。

insert into t2 select generate_series(1,10), 'f1';      
insert into t2 values (0, 'f2');      

8、创建格式表

create table t1_fmt (tid int, fmt text);    
    
insert into t1_fmt values (1, 'yyyymm');    
insert into t1_fmt values (2, 'yyyy');    

默认采样'0000'的格式。

9、动态转换查询如下:

select tid_key, data from  
(  
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from    
(    
  select     
    t1.tid,     
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,   
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from t1     
    left join t1_fmt t2 using (tid)     
    left join t2 t3 using (tid)    
) t    
) t  
;    

10、将规整后的数据输出到目标表

insert into t1_output    
select tid_key, data from  
(  
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from    
(    
  select     
    t1.tid,     
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,   
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data  
  from t1     
    left join t1_fmt t2 using (tid)     
    left join t2 t3 using (tid)    
) t    
) t    
;    
    
INSERT 0 1000000    
  
  
postgres=# select * from t1_output  limit 100;    
   fmt   |                        data                           
---------+-----------------------------------------------------  
 870000  | 87 , TEST , 7108 , 7108  
 870000  | 87 , TEST , 787 , 787  
 870000  | 87 , TEST , 6748 , 6748  
 870000  | 87 , TEST , 6385 , 6385  
 870000  | 87 , TEST , 5278 , 5278  
 870000  | 87 , TEST , 8132 , 8132  
 870000  | 87 , TEST , 7513 , 7513  
 870000  | 87 , TEST , 2025 , 2025  
 870000  | 87 , TEST , 7322 , 7322  
 870000  | 87 , TEST , 2019 , 2019  
 870000  | 87 , TEST , 6416 , 6416  
 870000  | 87 , TEST , 9959 , 9959  
 870000  | 87 , TEST , 7876 , 7876  
 870000  | 87 , TEST , 5022 , 5022  
.....  
  
写OSS外部表时,根据fmt列的VALUE进行识别,当遇到不同的VALUE就切换文件名,不同FMT VALUE的数据写入不同的文件。  

四、技术点

这里只谈本文涉及的技术点。

1、UDF

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

2、动态调用

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

五、云端产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

阿里云 OSS

六、类似场景、案例

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

七、小结

一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

八、参考

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
1月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的服务器日志文件
本文介绍了PostgreSQL数据库的物理存储结构,重点讨论了服务器日志文件。通过`pg_ctl`命令启动PostgreSQL实例时,使用`-l`参数指定日志文件位置,记录数据库启动、运行及关闭过程中的关键信息。附有相关视频讲解和日志文件示例。
|
18天前
|
存储 监控 安全
网络安全视角:从地域到账号的阿里云日志审计实践
日志审计的必要性在于其能够帮助企业和组织落实法律要求,打破信息孤岛和应对安全威胁。选择 SLS 下日志审计应用,一方面是选择国家网络安全专用认证的日志分析产品,另一方面可以快速帮助大型公司统一管理多组地域、多个账号的日志数据。除了在日志服务中存储、查看和分析日志外,还可通过报表分析和告警配置,主动发现潜在的安全威胁,增强云上资产安全。
|
1月前
|
存储 数据采集 监控
阿里云DTS踩坑经验分享系列|SLS同步至ClickHouse集群
作为强大的日志服务引擎,SLS 积累了用户海量的数据。为了实现数据的自由流通,DTS 开发了以 SLS 为源的数据同步插件。目前,该插件已经支持将数据从 SLS 同步到 ClickHouse。通过这条高效的同步链路,客户不仅能够利用 SLS 卓越的数据采集和处理能力,还能够充分发挥 ClickHouse 在数据分析和查询性能方面的优势,帮助企业显著提高数据查询速度,同时有效降低存储成本,从而在数据驱动决策和资源优化配置上取得更大成效。
141 9
|
1月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的WAL预写日志文件
PostgreSQL数据库的物理存储结构包含多种文件,其中WAL(预写日志)用于确保数据完整性和高效恢复。WAL机制允许在不频繁刷新数据至磁盘的情况下,通过先写日志再改数据的方式,减少I/O操作,提高性能。每个WAL文件默认大小为16MB,位于pg_wal目录下,支持手动和自动切换。WAL不仅有助于数据恢复,还能显著降低I/O成本。
|
1月前
|
存储 SQL 关系型数据库
【赵渝强老师】PostgreSQL的运行日志文件
PostgreSQL的物理存储结构包括数据文件、日志文件等。运行日志默认未开启,需配置`postgresql.conf`文件中的相关参数如`log_destination`、`log_directory`等,以记录数据库状态、错误信息等。示例配置中启用了CSV格式日志,便于管理和分析。通过创建表操作,可查看生成的日志文件,了解具体日志内容。
|
3月前
|
XML JSON 监控
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
85 1
|
3月前
|
SQL 存储 人工智能
阿里云日志服务的傻瓜式极易预测模型
预测服务有助于提前规划,减少资源消耗和成本。阿里云日志服务的AI预测服务简化了数学建模,仅需SQL操作即可预测未来指标,具备高准确性,并能处理远期预测。此外,通过ScheduledSQL功能,可将预测任务自动化,定时执行并保存结果。
103 3
|
5月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
4月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
61 2
|
3月前
|
监控 网络协议 CDN
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版