「时序数据库」Cassandra时间序列大规模数据建模

简介: 「时序数据库」Cassandra时间序列大规模数据建模


在开始使用Cassandra和时间序列数据时,人们面临的最大挑战之一是理解编写工作负载对集群的影响。过快地写入单个分区可能会创建热点,从而限制向外扩展的能力。分区太大可能会导致修复、流和读取性能方面的问题。从大分区的中间读取会带来很大的开销,并导致GC压力的增加。Cassandra 4.0应该可以提高大分区的性能,但是它不能完全解决我已经提到的其他问题。在可预见的未来,我们将需要考虑它们的性能影响,并相应地进行计划。

在这篇文章中,我将讨论一种常见的Cassandra数据建模技术,称为bucketing。Bucketing是一种策略,让我们可以控制每个分区中存储多少数据,以及将写出的数据分散到整个集群。这篇文章将讨论两种形式的攻击。当数据模型需要进一步扩展时,可以结合使用这些技术。读者应该已经熟悉了分区的解剖和基本的CQL命令。

当我们第一次使用Cassandra学习数据建模时,我们可能会看到如下内容:

CREATE TABLE raw_data (

sensor text,

ts timeuuid,

readint int,

primary key(sensor, ts)

) WITH CLUSTERING ORDER BY (ts DESC)

AND compaction = {'class': 'TimeWindowCompactionStrategy',

'compaction_window_size': 1,

'compaction_window_unit': 'DAYS'};

这是存储一些非常简单的传感器数据的一个很好的第一个数据模型。通常我们收集的数据要比整数复杂得多,但在这篇文章中,我们将关注键。我们利用TWCS作为压缩战略。TWCS将帮助我们处理压缩大分区的开销,这将使我们的CPU和I/O处于控制之下。不幸的是,它仍然有一些明显的限制。如果我们不使用TTL,那么当我们接收更多数据时,我们的分区大小将无限地持续增长。如上所述,在修复、流化或从任意时间片读取数据时,大分区会带来很大的开销。

为了分解这个大分区,我们将利用第一种形式的bucketing。我们将根据时间窗口将我们的分区分成更小的分区。理想的大小是将分区保持在100MB以下。例如,如果我们每天存储50-75MB的数据,那么每天每个传感器一个分区就是一个不错的选择。只要分区不超过100MB,我们也可以简单地使用周(从某个纪元开始)、月和年。无论选择什么,留一点增长空间是个好主意。

为此,我们将向分区键添加另一个组件。修改之前的数据模型,我们将添加一个day字段:

CREATE TABLE raw_data_by_day (

sensor text,

day text,

ts timeuuid,

reading int,

primary key((sensor, day), ts)

) WITH CLUSTERING ORDER BY (ts DESC)

AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',

'compaction_window_unit': 'DAYS',

'compaction_window_size': 1};

插入到表中需要使用date和now()值(你也可以在你的应用代码中生成一个TimeUUID):

INSERT INTO raw_data_by_day (sensor, day, ts, reading)

VALUES ('mysensor', '2017-01-01', now(), 10);

这是限制每个分区的数据量的一种方法。为了跨多天获取大量数据,您需要每天发出一个查询。这样查询的好处在于,我们可以将工作分散到整个集群,而不是要求单个节点执行大量工作。我们还可以通过依赖驱动程序中的异步调用并行地发出这些查询。对于这种用例,Python驱动程序甚至有一个方便的辅助函数:

from itertools import product

from cassandra.concurrent import execute_concurrent_with_args

days = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of data

session = Cluster(["127.0.0.1"]).connect("blog")

prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")

args = product(["mysensor"], days)

# args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')

# driver handles concurrency for you

results = execute_concurrent_with_args(session, prepared, args)

# Results:

#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]

这种技术的一种变体是每个时间窗口使用不同的表。例如,每月使用一个表意味着每年有12个表:

CREATE TABLE raw_data_may_2017 (

sensor text,

ts timeuuid,

reading int,

primary key(sensor, ts)

) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy',

'compaction_window_unit': 'DAYS',

'compaction_window_size': 1};

这种策略的主要好处是有助于存档和快速删除旧数据。例如,在每个月的开始,我们可以将上个月的数据以拼花的格式归档到HDFS或S3中,利用便宜的存储来进行分析。当我们不再需要Cassandra中的数据时,我们可以简单地删除表。您可能会看到,在创建和删除表时需要进行一些额外的维护,因此,这种方法实际上只有在需要归档时才有用。还有其他存档数据的方法,因此这种类型的bucketing可能是不必要的。

上面的策略主要是防止分区在长时间内变得太大。如果我们有一个可预测的工作负载和有很小变化的分区大小,这是很好的。我们可能会摄入太多的信息,以至于单个节点无法写出数据,或者一小部分对象的摄入率要高得多。Twitter就是一个很好的例子,有些人拥有数千万的追随者,但这并不常见。对于我们需要大规模使用的这些类型的账户,通常会有一个单独的代码路径

第二种技术在任何给定时间使用多个分区将插入扇出到整个集群。这个策略的好处是,我们可以使用一个分区来处理小卷,使用多个分区来处理大卷。

我们在这个设计中所做的权衡是在读取时我们需要使用散射聚集,这有明显的更高的开销。这可能会使分页更加困难。我们需要能够跟踪我们为每个小发明摄取了多少数据。这是为了确保我们可以选择正确数量的分区来使用。如果我们使用太多的桶,我们就会在很多分区上执行很多非常小的读取操作。如果桶太少,我们会得到非常大的分区,这些分区不能很好地压缩、修复、流处理,并且读取性能很差。

在这个例子中,我们将研究一个理论模型,它适用于那些在Twitter这样的社交网络上关注大量用户的人。大多数帐户都可以使用一个单独的分区来接收消息,但有些人/机器人可能会关注数百万个帐户。

免责声明:我不知道Twitter实际上是如何存储他们的数据的,这只是一个简单的例子来讨论。

CREATE TABLE tweet_stream (

account text,

day text,

bucket int,

ts timeuuid,

message text,

primary key((account, day, bucket), ts)

) WITH CLUSTERING ORDER BY (ts DESC)

AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',

'compaction_window_unit': 'DAYS',

'compaction_window_size': 1};

这个数据模型扩展了前面的数据模型,将bucket添加到分区键中。现在,每天都可以从多个桶中获取数据。当需要读取时,我们需要从所有分区中获取所需的结果。为了演示,我们将插入一些数据到我们的分区:

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');

如果我们想要十个最新的消息,我们可以这样做:

from itertools import chain

from cassandra.util import unix_time_from_uuid1

prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")

# let's get 10 buckets

partitions = range(10)

# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

args = product(["jon_haddad"], ["2017-07-01"], partitions)

result = execute_concurrent_with_args(session, prepared, args)

# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),

# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]

results = [x.result_or_exc for x in result]

# append all the results together

data = chain(*results)


sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)

# newest stuff first

# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),

# Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),

# Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),

# Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]

这个例子只使用了10个项目,所以我们可以作为懒惰的程序员,合并列表,然后对它们排序。如果我们想获取更多的元素我们就需要k路归并算法。我们将在以后的博客中进一步讨论这个话题。

此时,您应该对如何围绕集群分发数据和请求有了更好的理解,这使得集群可以比使用单个分区时扩展得更大。记住每个问题都是不同的,没有万能的解决方案。

相关文章
|
8月前
|
存储 NoSQL 数据库
阿里云数据库Cassandra的产品价格
阿里云数据库Cassandra提供多地域服务,如中国、亚太、欧洲、美洲及中东。计费分为实例主机节点规格费和存储费用,实例价格因节点数和副本数而异,存储费用按挂载云盘计算。生产系统建议配置多节点以确保冗余。公网流量目前免费,具体收费时间未定。详细价格以购买页面为准。
500 3
|
3月前
|
SQL NoSQL 数据库
Cassandra数据库与Cql实战笔记
Cassandra数据库与Cql实战笔记
46 1
Cassandra数据库与Cql实战笔记
|
5月前
|
存储 NoSQL 算法
使用图数据库进行复杂数据建模:探索数据关系的无限可能
【8月更文挑战第17天】图数据库以其高效的关系查询能力、直观的数据表示方式、灵活的数据模型和强大的可扩展性,在复杂数据建模和查询中展现出了巨大的潜力。随着大数据和人工智能技术的不断发展,图数据库的应用领域也将不断拓展和深化。对于需要处理复杂关系网络和数据关联性的场景来说,图数据库无疑是一个值得深入研究和应用的强大工具。
|
5月前
|
XML 分布式数据库 数据库
【计算机三级数据库技术】第13章 大规模数据库架构--附思维导图
文章概述了分布式数据库、并行数据库、云计算数据库架构和XML数据库的基本概念、目标、体系结构以及与传统数据库的比较,旨在提供对这些数据库技术的全面理解。
53 1
|
7月前
|
分布式计算 Hadoop 存储
|
7月前
|
监控 NoSQL 数据建模
使用Apache Cassandra进行分布式数据库管理的技术实践
【6月更文挑战第5天】本文探讨了使用Apache Cassandra进行分布式数据库管理的技术实践。Cassandra是一款高性能、可扩展的NoSQL数据库,适合大规模、高并发场景。文章介绍了其高可扩展性、高性能、高可用性和灵活数据模型等核心特性,并详细阐述了环境准备、安装配置、数据建模与查询以及性能优化与监控的步骤。通过本文,读者可掌握Cassandra的运用,适应不断增长的数据需求。
|
6月前
|
存储 NoSQL Java
Spring Boot与Cassandra数据库的集成应用
Spring Boot与Cassandra数据库的集成应用
|
8月前
|
存储 运维 物联网
【专栏】OceanBase 是一款先进的分布式数据库系统,以其分布式架构、高扩展性、高可用性和强一致性特点,应对大规模数据处理挑战
【4月更文挑战第29天】OceanBase 是一款先进的分布式数据库系统,以其分布式架构、高扩展性、高可用性和强一致性特点,应对大规模数据处理挑战。它支持混合负载,适用于金融、电商和物联网等领域,提供高性能、低成本的解决方案。尽管面临技术复杂性、数据迁移和性能优化等问题,通过合理策略可克服挑战。随着技术发展,OceanBase 在数字化时代将持续发挥关键作用。
275 1
|
8月前
|
存储 资源调度 分布式计算
在分布式数据库系统中处理大规模数据
【4月更文挑战第24天】在分布式数据库系统中处理大规模数据
105 3
|
8月前
|
存储 运维 监控
面经:Cassandra分布式NoSQL数据库深度解读
【4月更文挑战第10天】本文是关于Apache Cassandra的面试准备指南,涵盖了数据模型、一致性模型、架构性能优化和故障恢复等核心知识点。作者强调理解Cassandra的列族、Tunable Consistency、Gossip协议及运维策略的重要性,并通过代码示例辅助解释。掌握这些内容不仅能帮助在面试中表现出色,也有助于实际工作中解决大规模数据处理问题。
118 1