使用datax迁移cassandra数据

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现各种异构数据源之间高效的数据同步功能。最近,阿里云cassandra团队为datax提供了cassandra读写插件,进一步丰富了datax支持的数据源,可以很方便实现cassandra之间以及cassandra与其他数据源之间的数据同步。

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现各种异构数据源之间高效的数据同步功能。最近,阿里云cassandra团队为datax提供了cassandra读写插件,进一步丰富了datax支持的数据源,可以很方便实现cassandra之间以及cassandra与其他数据源之间的数据同步。本文简单介绍如何使用datax同步cassandra的数据,针对几种常见的场景给出配置文件示例,还提供了提升同步性能的建议和实测的性能数据。

datax快速入门

使用datax同步数据的方法很简单,一共只需要三步:
1 部署datax。
2 编写同步作业配置文件。
3 运行datax,等待同步作业完成。
datax的部署和运行都很简单,可以通过datax官方提供的下载地址下载DataX工具包,下载后解压至本地某个目录,进入bin目录,即可运行同步作业:

$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

同步作业的配置格式可以参考datax文档
一个典型的配置文件如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}

一个同步作业的配置文件主要包括两部分,setting包括任务调度的一些配置,content描述同步任务的内容,里面包含reader插件的配置和writer插件的配置。例如我们需要从mysql同步数据到cassandra,那么我们只需要把reader配置为mysqlreader,writer配置为cassandrawriter,并提供相应的插件配置信息即可。在datax项目页面上面可以看到datax支持的插件列表,点击对应的链接就可以查看相关插件的文档了解插件需要的配置内容和格式要求。例如,cassandra插件的文档可点击如下链接:读插件 写插件
以下列举几种常见的场景。

场景一 cassandra之间的数据同步

最常见的场景是把数据从一个集群同步到另一个集群,例如机房整体迁移、上云等。这时需要先手动在目标集群创建好keyspace和表的schema,然后使用datax进行同步。作为例子,下面的配置文件把数据从cassandra的一个表同步到另一个表:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "cassandrareader",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_src",
            "column": [
              "id",
              "name"
            ]
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_dst",
            "column": [
              "id",
              "name"
            ]
          }
        }
      }
    ]
  }
}

场景二 从mysql同步到cassandra

datax支持多种数据源,可以很方便做到cassandra和其他数据源之间的数据同步。下面的配置把数据从mysql同步到cassandra:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "id",
              "name"
            ],
            "splitPk": "db_id",
            "connection": [
              {
                "table": [
                  "table"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/database"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_dst",
            "column": [
              "id",
              "name"
            ]
          }
        }
      }
    ]
  }
}

场景三 只同步cassandra中的一部分数据

我们在读插件的配置中提供了where关键字,可以用来只同步一部分数据。例如对于时序数据等场景定期同步的情况,就可以通过增加where的条件来实现只同步增量数据。where条件的格式和cql相同,例如"where":"textcol='a'"的作用类似于使用select * from table_name where textcol = 'a'进行查询。另外还有allowFiltering关键字配合where使用,作用和cql中的ALLOW FILTERING关键字也是相同的。下面给出一个配置的例子:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "cassandrareader",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_src",
            "column": [
              "deviceId",
              "time",
              "log"
            ],
            "where":"time > '2019-09-25'",
            "allowFiltering":true
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "datax_dst",
            "column": [
              "deviceId",
              "time",
              "log"
            ]
          }
        }
      }
    ]
  }
}

提高同步速度

以cassandra之间的数据同步为例。如下这些配置会对数据同步任务的性能产生影响:
(1)并行度
可以通过调大任务的并行度来提高同步速度。这主要通过job.setting.speed.channel从参数来实现。例如下面这个配置的效果是会有10个线程并行执行同步任务。

  "job": {
    "setting": {
      "speed": {
        "channel": 10
      }
    },
...

需要注意的是,cassandra读插件里面,切分任务是通过在cql语句中增加token范围条件来实现的,所以只有使用RandomPartitioner和Murmur3Partitioner的集群才能够正确切分。如果您的集群使用了其他的Partitioner,cassandrareader插件会忽略channel配置,只用一个线程进行同步。
(2)batch
可以通过配置batchSize关键字在cassandra写插件里面使用UNLOGGED batch来提高写入速度。但是需要注意cassandra中对batch的使用有一些限制,使用这个关键字之前建议先阅读[《简析Cassandra的BATCH操作》(https://yq.aliyun.com/articles/719784?spm=a2c4e.11155435.0.0.65386b04OYOsvK)一文中关于batch使用限制的内容。
(3)连接池配置
写插件还提供了连接池相关的配置connectionsPerHost和maxPendingPerConnection。这两个参数的具体含义可以参考[java driver文档](https://docs.datastax.com/en/developer/java-driver/3.7/manual/pooling/)。
(4)一致性配置
读写插件中都提供了consistancyLevel关键字,默认的读写一致性级别都是LOCAL_QUORUM。如果您的业务场景里面可以允许两个集群的数据有少量不一致,也可以考虑不使用默认一致性级别来提高读写性能,例如使用ONE级别来读数据。

性能数据

我们通过一个测试来观察datax同步数据的性能。
服务端使用阿里云cassandra,源集群和目标集群均为3节点,规格为4CPU 8GB。客户端使用一台ECS,规则为4 CPU 16 GB。
首先使用cassandra-stress向源集群写入500w行数据:

cassandra-stress write cl=QUORUM n=5000000 -schema "replication(factor=3) keyspace=test" -rate "threads=300" -col "n=FIXED(10) size=FIXED(64)" -errors "retries=32" -mode "native cql3 user=$USER password=$PWD" -node "$NODE"

写入过程的统计数据如下:
image

然后使用datax将这些数据从源集群同步到目标集群。配置文件如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 10
      }
    },
    "content": [
      {
        "reader": {
          "name": "cassandrareader",
          "parameter": {
            "host": "<源集群NODE>",
            "port": 9042,
            "username":"<USER>",
            "password":"<PWD>",
            "useSSL": false,
            "keyspace": "test",
            "table": "standard1",
            "column": [
              "key",
              "\"C0\"",
              "\"C1\"",
              "\"C2\"",
              "\"C3\"",
              "\"C4\"",
              "\"C5\"",
              "\"C6\"",
              "\"C7\"",
              "\"C8\"",
              "\"C9\""
            ]
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "<目标集群NODE>",
            "port": 9042,
            "username":"<USER>",
            "password":"<PWD>",
            "useSSL": false,
            "keyspace": "test",
            "table": "standard1",
            "batchSize":6,
            "column": [
              "key",
              "\"C0\"",
              "\"C1\"",
              "\"C2\"",
              "\"C3\"",
              "\"C4\"",
              "\"C5\"",
              "\"C6\"",
              "\"C7\"",
              "\"C8\"",
              "\"C9\""
            ]
          }
        }
      }
    ]
  }
}

同步过程的统计数据如下:
image
可见,datax同步数据的性能和cassandra-stress的性能相当,甚至要好一些。

入群邀约

为了营造一个开放的 Cassandra 技术交流,我们建立了微信群公众号和钉钉群,为广大用户提供专业的技术分享及问答,定期开展专家技术直播,欢迎大家加入。
阿里云为广大开发者提供云上Cassandra资源,可用于动手实践:9.9元可使用三月(限首购)。
直达链接:https://www.aliyun.com/product/cds

TB13X7UGQL0gK0jSZFxXXXWHVXa-1032-1030.jpg

钉钉群入群链接:https://c.tb.cn/F3.ZRTY0o

微信群公众号:
2a9d5b6c3b9030d06ddb59131aab7287945ead53.png

目录
相关文章
|
6月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
59 0
DataWorks产品使用合集之选择独享调度,数据集成里可以使用,但是数据地图里面测试无法通过,是什么原因导致的
|
5月前
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 DataWorks 数据挖掘
DataWorks操作报错合集之上传数据时报错com.alibaba.datax.common.exception.DataXException: Code:[UnstructuredStorageReader-11],该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
数据采集 分布式计算 大数据
MaxCompute产品使用合集之数据集成中进行数据抽取时,是否可以定义使用和源数据库一样的字符集进行抽取
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
数据采集 SQL DataWorks
【颠覆想象的数据巨匠】DataWorks——远超Excel的全能数据集成与管理平台:一场电商数据蜕变之旅的大揭秘!
【8月更文挑战第7天】随着大数据技术的发展,企业对数据处理的需求日益增长。DataWorks作为阿里云提供的数据集成与管理平台,为企业提供从数据采集、清洗、加工到应用的一站式解决方案。不同于桌面级工具如Excel,DataWorks具备强大的数据处理能力和丰富的功能集,支持大规模数据处理任务。本文通过电商平台案例,展示了如何使用DataWorks构建数据处理流程,包括多源数据接入、SQL任务实现数据采集、数据清洗加工以提高质量,以及利用分析工具挖掘数据价值的过程。这不仅凸显了DataWorks在大数据处理中的核心功能与优势,还展示了其相较于传统工具的高扩展性和灵活性。
152 0
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
分布式计算 DataWorks 调度
DataWorks操作报错合集之DataX访问MaxCompute(原ODPS)突然无法读取到字段数据,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
SQL 数据采集 DataWorks
DataWorks操作报错合集之数据集成里面的数据调度独享资源组测试通过了,但是数据地图里无法通过,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。