Citus 分布式 PostgreSQL 集群 - SQL Reference(摄取、修改数据 DML)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: Citus 分布式 PostgreSQL 集群 - SQL Reference(摄取、修改数据 DML)

 插入数据



要将数据插入分布式表,您可以使用标准 PostgreSQL INSERT 命令。例如,我们从 Github 存档数据集中随机选择两行。


  • INSERT


/*
CREATE TABLE github_events
(
  event_id bigint,
  event_type text,
  event_public boolean,
  repo_id bigint,
  payload jsonb,
  repo jsonb,
  actor jsonb,
  org jsonb,
  created_at timestamp
);
*/
INSERT INTO github_events VALUES (2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13');
INSERT INTO github_events VALUES (2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24');


向分布式表中插入行时,必须指定插入行的分布列。根据分布列,Citus 确定插入应该路由到的正确分片。然后,查询被转发到正确的分片,并在该分片的所有副本上执行远程插入命令。


有时将多个 insert 语句放在一个包含多行的单个 insert 中会很方便。它也比重复数据库查询更有效。例如,上一节中的示例可以像这样一次性加载:


INSERT INTO github_events VALUES
  (
    2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13'
  ), (
    2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24'
  );


“From Select”子句(分布式汇总)


Citus 还支持 INSERT ... SELECT 语句 —— 根据选择查询的结果插入行。这是一种方便的填充表的方法,并且还允许使用 ON CONFLICT 子句进行“更新插入(upserts)”,这是进行分布式汇总的最简单方法。


  • 分布式汇总



Citus 中,可以通过三种方式从 select 语句中插入。第一个是如果源表和目标表位于同一位置,并且 select/insert 语句都包含分布列。在这种情况下,Citus 可以将 INSERT ... SELECT 语句下推以在所有节点上并行执行。


SELECT 查询不需要协调器上的合并步骤时,可能会发生重新分区优化。它不适用于以下需要合并步骤的 SQL 功能:


  • ORDER BY
  • LIMIT
  • OFFSET
  • GROUP BY 当分布列不是 group 键的一部分时
  • 按源表中的非分布列分区时的 Window(窗口)函数
  • 非同位表之间的Join(连接)(即重新分区连接)


当源表和目标表没有在同一位置,并且无法应用重新分区优化时,Citus 使用第三种方式执行 INSERT ... SELECT。它从工作节点中选择结果,并将数据拉到协调节点。协调器将行重定向回适当的分片。因为所有数据都必须通过单个节点,所以这种方法效率不高。


如果对 Citus 使用哪种方法有疑问,请使用 EXPLAIN 命令,如 PostgreSQL 调优中所述。当目标表的分片数量非常大时,禁用重新分区可能是明智之举, 请参阅 citus.enable_repartitioned_insert_select (boolean)。


  • PostgreSQL 调优
  • citus.enable_repartitioned_insert_select (boolean)


COPY 命令(批量加载)


要从文件中批量加载数据,您可以直接使用 PostgreSQL\COPY 命令。

首先通过运行下载我们的示例 github_events 数据集:


wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz


然后,您可以使用 psql 复制数据(注意,此数据需要数据库具有 UTF8 编码):


\COPY github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)


注意:

没有跨分片的快照隔离的概念,这意味着与 COPY 并发运行的多分片 SELECT 可能会看到它在某些分片上提交,但在其他分片上没有。如果用户正在存储事件数据,他可能偶尔会观察到最近数据中的小间隙。如果这是一个问题,则由应用程序来处理(例如,从查询中排除最新数据,或使用一些锁)。

如果 COPY 未能打开分片放置的连接,那么它的行为方式与 INSERT 相同,即将放置标记为非活动,除非没有更多活动的放置。如果连接后发生任何其他故障,事务将回滚,因此不会更改元数据。


使用汇总缓存聚合


事件数据管道和实时仪表板等应用程序需要对大量数据进行亚秒级查询。使这些查询快速的一种方法是提前计算和保存聚合。这称为“汇总”数据,它避免了在运行时处理原始数据的成本。作为一个额外的好处,将时间序列数据汇总到每小时或每天的统计数据中也可以节省空间。当不再需要其全部详细信息并且聚合足够时,可能会删除旧数据。

例如,这是一个通过 url 跟踪页面浏览量的分布式表:


CREATE TABLE page_views (
  site_id int,
  url text,
  host_ip inet,
  view_time timestamp default now(),
  PRIMARY KEY (site_id, url)
);
SELECT create_distributed_table('page_views', 'site_id');


一旦表中填充了数据,我们就可以运行聚合查询来计算每个 URL 每天的页面浏览量,限制到给定的站点和年份。


-- how many views per url per day on site 5?
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE site_id = 5 AND
    view_time >= date '2016-01-01' AND view_time < date '2017-01-01'
  GROUP BY view_time::date, site_id, url;


上述设置有效,但有两个缺点。首先,当您重复执行聚合查询时,它必须遍历每个相关行并重新计算整个数据集的结果。如果您使用此查询来呈现仪表板,则将聚合结果保存在每日页面浏览量表中并查询该表会更快。其次,存储成本将随着数据量和可查询历史的长度成比例增长。在实践中,您可能希望在短时间内保留原始事件并查看较长时间窗口内的历史图表。


为了获得这些好处,我们可以创建一个 daily_page_views 表来存储每日统计信息。


CREATE TABLE daily_page_views (
  site_id int,
  day date,
  url text,
  view_count bigint,
  PRIMARY KEY (site_id, day, url)
);
SELECT create_distributed_table('daily_page_views', 'site_id');


在此示例中,我们在 site_id 列上同时分配了 page_viewsdaily_page_views。这确保了与特定站点相对应的数据将位于同一节点上。在每个节点上将两个表的行保持在一起可以最大限度地减少节点之间的网络流量并实现高度并行执行。


一旦我们创建了这个新的分布式表,我们就可以运行 INSERT INTO ... SELECT 将原始页面视图汇总到聚合表中。在下文中,我们每天汇总页面浏览量。Citus 用户通常在一天结束后等待一段时间来运行这样的查询,以容纳迟到的数据。


-- roll up yesterday's data
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01' AND view_time < date '2017-01-02'
  GROUP BY view_time::date, site_id, url;
-- now the results are available right out of the table
SELECT day, site_id, url, view_count
  FROM daily_page_views
  WHERE site_id = 5 AND
    day >= date '2016-01-01' AND day < date '2017-01-01';


上面的汇总查询汇总了前一天的数据并将其插入 daily_page_views。每天运行一次查询意味着不需要更新汇总表行,因为新一天的数据不会影响之前的行。

当处理迟到的数据或每天多次运行汇总查询时,情况会发生变化。如果任何新行与汇总表中已有的天数匹配,则匹配计数应增加。 PostgreSQL 可以使用 “ON CONFLICT” 来处理这种情况, 这是它进行 upserts 的技术。这是一个例子。


  • upserts


-- roll up from a given date onward,
-- updating daily page views when necessary
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01'
  GROUP BY view_time::date, site_id, url
  ON CONFLICT (day, url, site_id) DO UPDATE SET
    view_count = daily_page_views.view_count + EXCLUDED.view_count;


更新和删除


您可以使用标准 PostgreSQL UPDATE 和 DELETE 命令更新或删除分布式表中的行。


DELETE FROM github_events
WHERE repo_id IN (24509048, 24509049);
UPDATE github_events
SET event_public = TRUE
WHERE (org->>'id')::int = 5430905;


  • UPDATE
  • DELETE

更新/删除影响如上例中的多个分片时,Citus 默认使用单阶段提交协议。为了提高安全性,您可以通过设置启用两阶段提交


SET citus.multi_shard_commit_protocol = '2pc';


如果更新或删除仅影响单个分片,则它在单个工作节点内运行。在这种情况下,不需要启用 2PC。当按表的分布列更新或删除过滤器时,通常会发生这种情况:


-- since github_events is distributed by repo_id,
-- this will execute in a single worker node
DELETE FROM github_events
WHERE repo_id = 206084;


此外,在处理单个分片时,Citus 支持 SELECT ... FOR UPDATE。这是对象关系映射器 (ORM) 有时使用的一种技术,用于安全地:


  1. 加载行
  2. 在应用程序代码中进行计算
  3. 根据计算更新行

选择要更新的行会对它们设置写锁定,以防止其他进程导致“丢失更新(lost update)”异常。


BEGIN;
  -- select events for a repo, but
  -- lock them for writing
  SELECT *
  FROM github_events
  WHERE repo_id = 206084
  FOR UPDATE;
  -- calculate a desired value event_public using
  -- application logic that uses those rows...
  -- now make the update
  UPDATE github_events
  SET event_public = :our_new_value
  WHERE repo_id = 206084;
COMMIT;


仅哈希分布表和引用表支持此功能,并且仅那些具有 replication_factor1 的表支持。


  • replication_factor


最大化写入性能


在大型机器上,INSERTUPDATE/DELETE 语句都可以扩展到每秒约 50,000 个查询。但是,要达到这个速度,您将需要使用许多并行的、长期存在的连接并考虑如何处理锁定。有关更多信息,您可以查阅我们文档的横向扩展数据摄取部分。


  • 横向扩展数据摄取
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
16天前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
1月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
1月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
1月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
1月前
|
SQL 监控 数据处理
SQL数据库数据修改操作详解
数据库是现代信息系统的重要组成部分,其中SQL(StructuredQueryLanguage)是管理和处理数据库的重要工具之一。在日常的业务运营过程中,数据的准确性和及时性对企业来说至关重要,这就需要掌握如何在数据库中正确地进行数据修改操作。本文将详细介绍在SQL数据库中如何修改数据,帮助读者更好
200 4
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
112 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
9天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
43 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁

热门文章

最新文章