「时序数据库」使用cassandra进行时间序列数据扫描

简介: 「时序数据库」使用cassandra进行时间序列数据扫描

如果你来自RDBMS的世界,在Cassandra有效地开始建模实体之前,需要一些时间来适应。帮助我解决问题的经验法则是让您的查询定义您需要的实体。

最近,我将应用程序的持久层从Oracle迁移到Cassandra。虽然迁移一些主要实体相当简单,但它解决了一些用例,比如支持范围扫描,这给我们带来了一组独特的挑战。

注意:范围扫描/查询基本上是基于时间戳范围的记录查找,在它之上有或没有任何过滤器。

在进入实际问题之前,让我们先了解一下这个应用程序,我在这里提到的应用程序处理客户在我们的站点上执行的所有事务的所有通信。我们平均每天要通过电子邮件、短信和应用程序通知等各种渠道向客户发送数千万条信息。

问题是什么?

我们希望确保在迁移到Cassandra时支持实体上的范围查询,原因有两个:

  • 当我们使用Oracle时,我们的团队已经在一些仪表板上使用了类似的读取,所以必须确保向后兼容性。

例如,在“Y”和“Z”时间之间,在“X”日期为一个特定事件发送了多少通信。

  • 此外,它还可以帮助您为将来可能要执行的任何向下一代持久性解决方案的迁移提供证据。

对于这个文章的范围,让我们假设我们有一个列族定义如下:

create table if not exists my_keyspace.message_log ( id uuid, message_id text, event text, event_type text, machine_ip text, created_date date, created_date_timestamp timestamp, primary key ((message_id), id) );

现在,这可以帮助您基于message_id查询消息的详细信息,因为上面的实体是根据message_id分区的。

但是,如果希望查找在特定时间范围内着陆的消息,就会出现问题。为了支持这一点,我们将引入一个列族,如下所示:

create table if not exists my_keyspace.message_log_dated (

id uuid,

message_id text,

event text,

created_date date,

created_date_timestamp timestamp,

primary key ((created_date), event, created_date_timestamp, message_id, id)

);

现在,这允许我们执行像下面这样的查询,以获得跨时间范围的数据:

select message_id from my_keyspace.message_log where created_date='2019–12–04'

and event='order-confirmation'

and created_date_timestamp > 1575417601000

and created_date_timestamp < 1575547200000

虽然这可能适用于旨在服务一致和可预测流量的系统,但对于容易受到客户行为驱动的意外激增的系统来说,这样的模型可能是一个绝对的噩梦。

由于上述模型是在created_date列上分区的,因此在给定日期上任何意外的流量激增都将意味着您的所有写操作都被定向到同一个分区,并且很可能在Cassandra集群上创建了一个热点。就集群性能而言,这基本上意味着三件事:

  • 您可能会冒集群中的某些节点的风险。
  • 您可能永远无法成功读取此数据,因为查找有超时的风险。
  • 这样就违反了保持集群处于健康状态的黄金法则,即不要让分区大小增长得太大。(建议将分区大小保持在100 MB左右,但没有确切的数字。)

我们做了什么?我们尝试引入一个叫做“一天之窗”(window of the day)的新因素,它只是一个时间插槽(统一),它将一天划分为相等的插槽,并将流量分散到多个分区,如下所示:

create table if not exists my_keyspace.message_log_slotted (

id uuid,

message_id text,

event text,

event_type text,

machine_ip text,

created_date date,

slot_id text,

created_date_timestamp timestamp,

primary key ((created_date, slot_id), event, created_date_timestamp, message_id, id)

);

现在,在某一天出现流量激增的情况下,我们仍然能够在写入数据的同时访问多个分区,并防止任何潜在的热点。

在本例中,读取一天的整个数据的查询是这样的(假设我们决定使用6小时的插槽)

select message_id from my_keyspace.message_log where created_date='2019–12–04'

and slot_id in ('slot_00_06', 'slot_06_12', 'slot_12_18', 'slot_18_24')

and created_date_timestamp > 1575417601000

and created_date_timestamp < 1575547200000

-- please note this may need some app side filtering to narrow down results to certain events in the entity.

虽然这种方法确实解决了问题,但它只是部分地解决了问题。如果流量激增被限制在一个特定的插槽内,该模型仍然容易出现与第一个模型相同的问题。

您可以决定将插槽大小减少到更少的小时数甚至分钟数,但事实是您的模型仍然是易受影响的。

试图使这种方法适合我们的生产用例的绝望尝试,它将需要一个槽的大小约5分钟,确保即使在最大峰值(当前)我们能够阻止任何热点和能够直接交通尽快下一个分区,同时保持当前分区大小可接受范围。这就意味着我们需要在12 * 24 = 288个分区中查找一天的数据。

那么我们该怎么办呢?

我们决定后退一步,重新评估我们的计划,而不是粗暴地按所有数据Cassandra建模指南行事。我们的结论是,我们需要一种可靠的方式向每个分区发送受控的写操作,一旦达到限制(每个分区希望维护的最大存储空间),我们的应用程序应该能够将写操作重定向到一个新的分区。

我们提出了一种bucketing策略,其中每个应用程序实例(VMs/computes)为它持久保存的记录维护它自己的内存计数器,并为所有这些写操作分配一个专用的bucket_id。每次进入流动状态时,应用程序都会生成一个新的随机唯一id:

  • 日期已经改变了。
  • 一个新的实体已经被引入。
  • 桶已达到其最大计数。(这意味着在这个特定分区中保存了足够多的记录,需要将写操作移动到一个新的分区。)

这还需要我们维护一个字典,以维护由所有应用程序实例生成的所有bucket_ids到它们为之创建的日期和实体的映射。我们可以将其持久化到同一个Cassandra密钥空间中。

在本例中,实体(message_log_date)中支持范围扫描的行如下所示

created_datebucket_ideventapp_instancecreated_tsmessage_id d1random_unique_id1order-confirmationvm1ts1m1 d1random_unique_id1ship-confirmationvm1ts2m2 d1random_unique_id2ship-confirmationvm2ts3m3 d1random_unique_id3order-confirmationvm3ts4m4 d1random_unique_id1delivery-confirmationvm1ts5m5

而用于维护日期和实体到该范围各自存储段之间的映射的字典将是这样的。

created_dateentity_namecreated_tsapp_instancebucket_id d1message_logts1vm1random_unique_id1 d1message_logts2vm2random_unique_id2 d1success_logts3vm1random_unique_id3 d1failure_logts4vm1random_unique_id4

现在,当你的bucket在某一天溢出时,你可能会看到下面这样的条目(注意,同一时间内为同一VM创建的第7行):

created_datebucket_ideventapp_instancecreated_tsmessage_id d1random_unique_id1order-confirmationvm1ts1m1 d1random_unique_id1ship-confirmationvm1ts2m2 d1random_unique_id2ship-confirmationvm2ts3m3 d1random_unique_id3order-confirmationvm3ts4m4 d1random_unique_id1delivery-confirmationvm1ts5m5 d1random_unique_id4delivery-confirmationvm1ts6m1

现在,让我们看看如何使用这个模型对给定的实体执行范围扫描,并使用一些实际的测试数据。

-- view bucket ids.

team@cqlsh> select * from my_keyspace.bucket_id_log where created_date='2019-12-01' and entity_name='MESSAGE_LOG_DATED';

created_date | entity_name | created_date_timestamp | id (just here to make pk unique) | bucket_id | created_by

--------------+-------------------+--------------------------+--------------------------------------+--------------------------------------+----------------

2019-12-01 | MESSAGE_LOG_DATED | 2019-12-01 00:00:06+0000 | dc5a7fb9-eeb2-425d-ae9c-4c14f9ab581c | 8751a640-13cd-11ea-bcc0-7f2f66afd78a | 10.65.99.183

2019-12-01 | MESSAGE_LOG_DATED | 2019-12-01 00:00:06+0000 | b61afafe-13a8-4b8d-9014-d249183760e1 | 8751a640-13cd-11ea-bcc0-7f2f66afd78b | 10.65.103.141

2019-12-01 | MESSAGE_LOG_DATED | 2019-12-01 00:00:05+0000 | 5a60bd48-cd97-4912-9cba-d543225e7bfe | 8751a640-13cd-11ea-bcc0-7f2f66afd78c | 10.117.233.105


-- Read bucket ids.

team@cqlsh> select bucket_id from my_keyspace.bucket_id_log where created_date='2019-12-01' and entity_name='MESSAGE_LOG_DATED';

bucket_id

--------------------------------------

8751a640-13cd-11ea-bcc0-7f2f66afd78a

8751a640-13cd-11ea-bcc0-7f2f66afd78b

8751a640-13cd-11ea-bcc0-7f2f66afd78c


-- Read message log for the time span.

team@cqlsh> select * from my_keyspace.message_log_dated where created_date='2019-12-01' and bucket_id in (8823ea60-13cd-11ea-b3c9-adfcfeffafbc, 881eba40-13cd-11ea-b0bc-dd0e281ebb5f, 87bbd9c0-13cd-11ea-81d9-81a480b11fcf, 8751a640-13cd-11ea-bcc0-7f2f66afd78c);


created_date | bucket_id | event | created_date_timestamp | message_id | id (part of pk, (surrogate)) | created_by

--------------+---------------------------------------+-----------------------+--------------------------+--------------------------------------+---------------------------------------+----------------

2019-12-01 | 8751a640-13cd-11ea-bcc0-7f2f66afd78a | order-confirmation | 2019-12-01 15:58:00+0000 | 13a07a49-8169-44d1-a7a9-ae8f33ba44c5 | 71c1f81e-cd6e-4ab8-b714-ebf48af32be0 | 10.247.194.101

2019-12-01 | 8751a640-13cd-11ea-bcc0-7f2f66afd78b | ship-confirmation | 2019-12-01 22:21:53+0000 | 3dfd0c23-1a62-45e8-a7b6-d4e14d65149a | 9ea23802-121e-48cc-91ac-711ab261b195 | 10.247.194.102

2019-12-01 | 8751a640-13cd-11ea-bcc0-7f2f66afd78c | delivery-confirmation | 2019-12-01 22:21:17+0000 | 4a24e822-624f-486e-b67c-f25be2f40110 | 9f8ce5ad-69a1-4e1a-8a4f-d31dedba7847 | 10.247.194.103

-- now we can easily scan through each message using message ids resulted above for the given time range.

这里有一个快速查询,确认每个bucket都符合分配的最大bucket计数,在我的测试中保持50000。

team@cqlsh> select count(*) from my_keyspace.message_log_dated where created_date='2019-12-01' and bucket_id=8751a640-13cd-11ea-bcc0-7f2f66afd78a;

count

-------

50000

(1 rows)

下面是这个流程的写和读的快速演示:



评价

现在我们来评估一下这个方法,

优点:

  • 过滤应该是直接的,你可以在日期日志中复制更多的列,这样你就可以读取你想要的配置的消息id。
  • 插入操作简单快捷。
  • 读是很快速的。
  • 最重要的是,它解决了我们在这里试图解决的集群平衡问题。
  • 如果应用程序上的负载均衡器是公正的,它会很有帮助,但即使它在错误/配置改变的情况下摇摆不定,这种方法仍然会保持稳定。

缺点:

  • 排序(如果需要)将需要在应用程序层进行。
  • 分页需要在应用程序层上执行。

总结

Cassandra上的范围扫描解决起来并不简单,但是只要有合适的数据模型,就一定可以解决这个问题。为了保持集群正常运行,您的写操作必须是分布式的和统一的。

虽然您可能会认为使用上述建议的解决方案会消耗更多存储空间,但在Cassandra的世界中这完全没问题。你可以复制数据(而不是以最规格化的形式),如果它有助于优化你的查询。

使用这种基于“Bucketisation”的方法,您可以支持范围扫描,同时避免意外流量激增,并且不会对集群的运行状况造成任何风险。

最后但并非最不重要的是,我要对我的导师大喊一声,感谢他从这个方法的开始到它在生产中发挥作用一直在指导我。

相关文章
|
3天前
|
存储 SQL 监控
Visual Basic与数据库交互:实现数据访问和管理
【4月更文挑战第27天】本文探讨了使用Visual Basic进行数据库编程的基础,包括数据库基础、连接、数据访问技术如ADO.NET,数据绑定,事务处理,存储过程与视图。还强调了性能优化、安全性、测试与调试,以及持续维护的重要性。通过掌握这些概念和技巧,开发者能构建高效、可靠的数据驱动应用。
|
2月前
|
存储 Oracle 关系型数据库
Dataphin常见问题之想要周期执行任务如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
2月前
|
存储 NoSQL 数据库
阿里云数据库Cassandra的产品价格
阿里云数据库Cassandra提供多地域服务,如中国、亚太、欧洲、美洲及中东。计费分为实例主机节点规格费和存储费用,实例价格因节点数和副本数而异,存储费用按挂载云盘计算。生产系统建议配置多节点以确保冗余。公网流量目前免费,具体收费时间未定。详细价格以购买页面为准。
423 3
|
2月前
|
SQL 开发框架 JavaScript
在 Vue 中进行数据持久化时,有哪些常用的数据库框架?
在 Vue 中进行数据持久化时,有哪些常用的数据库框架?
49 3
|
8天前
|
关系型数据库 Apache 流计算
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
2月前
|
存储 数据管理 数据处理
数据之光:探索数据库技术的演进之路
数据之光:探索数据库技术的演进之路
60 1
|
2月前
|
SQL Java 数据库连接
从来没想到我们会扒拉nohup文件去找我们想要的数据,然后往数据库中添加。。。...
从来没想到我们会扒拉nohup文件去找我们想要的数据,然后往数据库中添加。。。...
19 0
|
20天前
|
缓存 NoSQL 算法
17- 数据库有1000万数据 ,Redis只能缓存20w数据, 如何保证Redis中的数据都是热点数据 ?
保证Redis中的20w数据为热点数据,可以通过设置Redis的LFU(Least Frequently Used)淘汰策略。这样,当数据库有1000万数据而Redis仅能缓存20w时,LFU会自动移除使用频率最低的项,确保缓存中的数据是最常使用的。
55 8
|
1天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
6 0
|
6天前
|
SQL 关系型数据库 API
从API获取数据并将其插入到PostgreSQL数据库:步骤解析
使用Python处理从API获取的数据并插入到PostgreSQL数据库:安装`psycopg2`,建立数据库连接,确保DataFrame与表结构匹配,然后使用`to_sql`方法将数据插入到已存在的表中。注意数据准备、权限设置、性能优化和安全处理。

热门文章

最新文章