在开始使用Cassandra和时间序列数据时,人们面临的最大挑战之一是理解编写工作负载对集群的影响。过快地写入单个分区可能会创建热点,从而限制向外扩展的能力。分区太大可能会导致修复、流和读取性能方面的问题。从大分区的中间读取会带来很大的开销,并导致GC压力的增加。Cassandra 4.0应该可以提高大分区的性能,但是它不能完全解决我已经提到的其他问题。在可预见的未来,我们将需要考虑它们的性能影响,并相应地进行计划。
在这篇文章中,我将讨论一种常见的Cassandra数据建模技术,称为bucketing。Bucketing是一种策略,让我们可以控制每个分区中存储多少数据,以及将写出的数据分散到整个集群。这篇文章将讨论两种形式的攻击。当数据模型需要进一步扩展时,可以结合使用这些技术。读者应该已经熟悉了分区的解剖和基本的CQL命令。
当我们第一次使用Cassandra学习数据建模时,我们可能会看到如下内容:
CREATE TABLE raw_data (
sensor text,
ts timeuuid,
readint int,
primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 1,
'compaction_window_unit': 'DAYS'};
这是存储一些非常简单的传感器数据的一个很好的第一个数据模型。通常我们收集的数据要比整数复杂得多,但在这篇文章中,我们将关注键。我们利用TWCS作为压缩战略。TWCS将帮助我们处理压缩大分区的开销,这将使我们的CPU和I/O处于控制之下。不幸的是,它仍然有一些明显的限制。如果我们不使用TTL,那么当我们接收更多数据时,我们的分区大小将无限地持续增长。如上所述,在修复、流化或从任意时间片读取数据时,大分区会带来很大的开销。
为了分解这个大分区,我们将利用第一种形式的bucketing。我们将根据时间窗口将我们的分区分成更小的分区。理想的大小是将分区保持在100MB以下。例如,如果我们每天存储50-75MB的数据,那么每天每个传感器一个分区就是一个不错的选择。只要分区不超过100MB,我们也可以简单地使用周(从某个纪元开始)、月和年。无论选择什么,留一点增长空间是个好主意。
为此,我们将向分区键添加另一个组件。修改之前的数据模型,我们将添加一个day字段:
CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
插入到表中需要使用date和now()值(你也可以在你的应用代码中生成一个TimeUUID):
INSERT INTO raw_data_by_day (sensor, day, ts, reading)
VALUES ('mysensor', '2017-01-01', now(), 10);
这是限制每个分区的数据量的一种方法。为了跨多天获取大量数据,您需要每天发出一个查询。这样查询的好处在于,我们可以将工作分散到整个集群,而不是要求单个节点执行大量工作。我们还可以通过依赖驱动程序中的异步调用并行地发出这些查询。对于这种用例,Python驱动程序甚至有一个方便的辅助函数:
from itertools import product
from cassandra.concurrent import execute_concurrent_with_args
days = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of data
session = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")
args = product(["mysensor"], days)
# args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')
# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)
# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]
这种技术的一种变体是每个时间窗口使用不同的表。例如,每月使用一个表意味着每年有12个表:
CREATE TABLE raw_data_may_2017 (
sensor text,
ts timeuuid,
reading int,
primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
这种策略的主要好处是有助于存档和快速删除旧数据。例如,在每个月的开始,我们可以将上个月的数据以拼花的格式归档到HDFS或S3中,利用便宜的存储来进行分析。当我们不再需要Cassandra中的数据时,我们可以简单地删除表。您可能会看到,在创建和删除表时需要进行一些额外的维护,因此,这种方法实际上只有在需要归档时才有用。还有其他存档数据的方法,因此这种类型的bucketing可能是不必要的。
上面的策略主要是防止分区在长时间内变得太大。如果我们有一个可预测的工作负载和有很小变化的分区大小,这是很好的。我们可能会摄入太多的信息,以至于单个节点无法写出数据,或者一小部分对象的摄入率要高得多。Twitter就是一个很好的例子,有些人拥有数千万的追随者,但这并不常见。对于我们需要大规模使用的这些类型的账户,通常会有一个单独的代码路径
第二种技术在任何给定时间使用多个分区将插入扇出到整个集群。这个策略的好处是,我们可以使用一个分区来处理小卷,使用多个分区来处理大卷。
我们在这个设计中所做的权衡是在读取时我们需要使用散射聚集,这有明显的更高的开销。这可能会使分页更加困难。我们需要能够跟踪我们为每个小发明摄取了多少数据。这是为了确保我们可以选择正确数量的分区来使用。如果我们使用太多的桶,我们就会在很多分区上执行很多非常小的读取操作。如果桶太少,我们会得到非常大的分区,这些分区不能很好地压缩、修复、流处理,并且读取性能很差。
在这个例子中,我们将研究一个理论模型,它适用于那些在Twitter这样的社交网络上关注大量用户的人。大多数帐户都可以使用一个单独的分区来接收消息,但有些人/机器人可能会关注数百万个帐户。
免责声明:我不知道Twitter实际上是如何存储他们的数据的,这只是一个简单的例子来讨论。
CREATE TABLE tweet_stream (
account text,
day text,
bucket int,
ts timeuuid,
message text,
primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
这个数据模型扩展了前面的数据模型,将bucket添加到分区键中。现在,每天都可以从多个桶中获取数据。当需要读取时,我们需要从所有分区中获取所需的结果。为了演示,我们将插入一些数据到我们的分区:
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');
如果我们想要十个最新的消息,我们可以这样做:
from itertools import chain
from cassandra.util import unix_time_from_uuid1
prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
args = product(["jon_haddad"], ["2017-07-01"], partitions)
result = execute_concurrent_with_args(session, prepared, args)
# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]
results = [x.result_or_exc for x in result]
# append all the results together
data = chain(*results)
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)
# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
# Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
# Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
# Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]
这个例子只使用了10个项目,所以我们可以作为懒惰的程序员,合并列表,然后对它们排序。如果我们想获取更多的元素我们就需要k路归并算法。我们将在以后的博客中进一步讨论这个话题。
此时,您应该对如何围绕集群分发数据和请求有了更好的理解,这使得集群可以比使用单个分区时扩展得更大。记住每个问题都是不同的,没有万能的解决方案。