Cassandra数据迁移-BulkLoad离线工具介绍

简介: 该工具通过文件流接口快速导入数据到cassandra集群,是最快地将线下数据迁移到线上cassandra集群方法之一,准备工作如下 线上cassandra集群 线下数据,sstable格式或者csv格式。

该工具通过文件流接口快速导入数据到cassandra集群,是最快地将线下数据迁移到线上cassandra集群方法之一,准备工作如下

  • 线上cassandra集群
  • 线下数据,sstable格式或者csv格式。
  • 同vpc一台独立的ecs,开放安全组,能访问cassandra集群端口

1. 准备同vpc下客户端ecs

建议独立的ecs,不要和线上cassandra集群混用,混用会影响线上服务。

2. 创建schema

$ cqlsh -f schema.cql  -u USERNAME -p PASSWORD [host]

3. 准备数据

3.1 sstable数据格式

按data/${keyspace}/${table} 格式组织目录,将sstable数据放入目录,如下示例

ls /tmp/quote/historical_prices/
md-1-big-CompressionInfo.db    md-1-big-Data.db        md-1-big-Digest.crc32        md-1-big-Filter.db        md-1-big-Index.db        md-1-big-Statistics.db        md-1-big-Summary.db        md-1-big-TOC.txt

我示例中keyspace为quota,table为historical_prices

导入数据

执行sstableloader,在cassandra发行包bin目录下,指定数据目录 data/${ks}/${table}

${cassandra_home}/bin/sstableloader -d <ip address of the node> data/${ks}/${table}

静等sstable数据导入成功,使用cqlsh检查
bin/cqlsh -u USERNAME -p PASSWORD [host]

$ bin/cqlsh 
cqlsh> select * from quote.historical_prices;

 ticker | date                            | adj_close | close     | high      | low       | open      | volume
--------+---------------------------------+-----------+-----------+-----------+-----------+-----------+--------
   ORCL | 2019-10-29 16:00:00.000000+0000 | 26.160000 | 26.160000 | 26.809999 | 25.629999 | 26.600000 | 181000
   ORCL | 2019-10-28 16:00:00.000000+0000 | 26.559999 | 26.559999 | 26.700001 | 22.600000 | 22.900000 | 555000

如果原表有索引执行,执行bin/nodetool rebuild_index重建索引,具体命令使用参考相关帮助。

3.2 csv数据格式

csv格式数据需要先将csv数据转成sstable格式,cassandra给我们提供了CQLSSTableWriter工具,用于生成生成sstable,其实可以将任意格式数据转化成sstable格式。
因为csv格式也是需要自己预先组织,所以需要自己编写csv格式解析代码,然后编译执行。
该工具使用示例代码如下,完整工具参考git repo

        // Prepare SSTable writer
        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
        // set output directory
        builder.inDirectory(outputDir)
               // set target schema
               .forTable(SCHEMA)
               // set CQL statement to put data
               .using(INSERT_STMT)
               // set partitioner if needed
               // default is Murmur3Partitioner so set if you use different one.
               .withPartitioner(new Murmur3Partitioner());
        CQLSSTableWriter writer = builder.build();
        
        //TODO: 读取csv文件,迭代读取每一行
        while ((line = csvReader.read()) != null)
                {
                    writer.addRow(ticker,
                                  DATE_FORMAT.parse(line.get(0)),
                                  new BigDecimal(line.get(1)),
                                  new BigDecimal(line.get(2)),
                                  new BigDecimal(line.get(3)),
                                  new BigDecimal(line.get(4)),
                                  Long.parseLong(line.get(6)),
                                  new BigDecimal(line.get(5)));
                }
                writer.close();

执行自定义程序生成sstable后,照3.1 章节导入数据。

sstableloader原理介绍

image

sstableloader工具是一个cassandra客户端,集成了datastax driver会拉取cluster tokenMap信息,知道集群partitionKey的sharding情况。整个sstable也是按tokenRange排好序的,在导数据期间,会将文件拆解成不同的tokenRange,以文件流的方式传输到后端的node上。

阅读相关源码,sstableloader也使用了cassandra streamfile接口,这个接口有个明显的可优化点,linux零拷贝技术,目前的实现将sstable中的数据段以对象方式传输到后端node上,涉及了内核层pagecache到用户空间buffer拷贝,我们可以使用mmap技术直接将文件在pagecache层面写到socket fd上,避免了用户态buffer拷贝,减少两次内存拷贝,这被称为linux零拷贝技术。

入群邀约

为了营造一个开放的 Cassandra 技术交流环境,社区建立了微信群公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。另外阿里云提供免费Cassandra试用:https://www.aliyun.com/product/cds
8a55f5a99463a7276265074b1079d74f4ab3d164

相关文章
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
SQL Java 关系型数据库
实时计算 Flink版操作报错之如何处理从源表插入数据到结果表报错误:[ERROR] Could not execute SQL statement.
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
存储 弹性计算 人工智能
2025年阿里云企业云服务器ECS选购与配置全攻略
本文介绍了阿里云服务器的核心配置选择方法论,涵盖算力需求分析、网络与存储设计、地域部署策略三大维度。针对不同业务场景,如初创企业官网和AI模型训练平台,提供了具体配置方案。同时,详细讲解了购买操作指南及长期运维优化建议,帮助用户快速实现业务上云并确保高效运行。访问阿里云官方资源聚合平台可获取更多最新产品动态和技术支持。
|
Linux 程序员 Perl
老程序员分享:Linux查看系统开机时间
老程序员分享:Linux查看系统开机时间
1467 0
|
Java 开发者
超实用!一文搞懂Java中if-else和switch的正确打开方式!
超实用!一文搞懂Java中if-else和switch的正确打开方式!
381 2
|
中间件 数据挖掘 API
ERP系统的系统集成与接口管理:实现高效协同
【7月更文挑战第29天】 ERP系统的系统集成与接口管理:实现高效协同
1361 0
|
XML JSON 监控
淘宝商品数据接口实战:自动化监控与竞品分析
淘宝开放平台提供的商品列表数据接口是一种API,使开发者能编程获取淘宝商品数据。主要功能包括按关键词、分类等获取商品列表及其详情,并支持分页、排序及多维度筛选。常见参数有关键词、页码、排序方式等。使用需注册账号获取API密钥,构建并发送HTTP请求,解析JSON/XML响应数据进行业务处理。此接口适用于商品监控、市场分析等多种场景。[体验API](http://u6v.cn/5W41Dx)
|
SQL 存储 DataWorks
DataWorks产品使用合集之DataWorks中,配置DataHub数据源如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
546 5
|
监控 搜索推荐 数据挖掘
淘宝关键词设置:技巧与实战指南
淘宝关键词设置:技巧与实战指南
2520 1
|
存储 SQL 机器学习/深度学习
OpenSearch大模型实践之Havenask篇
ChatGPT在通用任务上表现优秀,但无法解决众多垂直业务领域问题,这也不是ChatGPT模型本身的定位,而是需要通过其生态应用去解决。如何在垂直领域针对特定业务构建企业专属问答并且确保生成内容可控,是垂直领域面临的主要问题,也是从事行业搜索和问答的应用努力的方向。OpenSearch作为行业智能搜索产品,其使命就是去解决这一问题,我们正在做积极的探索。同时Havenask作为云产品OpenSearch和集团内引擎HA3的开源版本,也进行了尝试和探索。
10155 2
OpenSearch大模型实践之Havenask篇