【技术实验】表格存储Tablestore准实时同步数据到Elasticsearch

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: TableStore是阿里云自研专业级分布式NoSQL数据库,Elasticsearch是著名的开源搜索引擎,本篇文章会介绍如何同步TableStore中的数据到Elasticsearch中,以便对部分字段支持搜索功能。

实验背景

图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息。
需求如下:

  • 图书总量目前八千多万册,考虑到未来二十年的增长,需要系统能支持一亿的存储量。
  • 图书信息很重要,不能接受丢失发生。
  • 图书的名字和作者名字需要支持模糊搜索。
  • 每本书的属性最多有一百多个,且不固定,不同类型的图书的属性列差异较大。且未来可能会新增属性列。

根据上面这些需求特点,要完成这个管理系统,需要两类系统支持:

  • 分布式NoSQL数据库:解决两亿存储量的问题,解决属性列较多且不固定的问题,解决可靠性要求高的问题。
  • 搜索系统:解决固定列模糊搜索的需求。

如果使用阿里云产品,那么对应的产品就是:

  • Table Store:分布式NoSQL数据库。
  • Elasticsearch:搜索系统,支持模糊搜索。

在管理系统中使用上述两个系统的时候,目前需要双写,当新增一本书的时候,需要将详细书本信息写入Table Store,将书本ID和作者,书名写入Elasticsearch,并且对书名,作者建索引。查询的时候,如果是根据书本ID,则直接查询Table Store。如果是根据书名模糊查询,则先查Elasticsearch,获取到匹配的书本的ID后,再到Table Store中查询详细信息。

如果Table Store到Elasticsearch有自动同步通道,那么只需要将新书信息写入Table Store即可,不再需要写Elasticsearch。减少了一次写入操作,且不用再考虑数据一致性问题,系统架构大大简化。那么如何才能实现这个自动同步通道呢?

类似于上面的场景,有很多系统都有这样的需求:拥有PB级海量数据需要持久化存储,同时有一两个字段需要做模糊查询,比如姓名,手机号码等,目前很多解决方案需要双写分布式数据库和Elasticsearch,但这样不仅会带来开发、运维复杂度,而且还有数据不一致的问题。

针对上述问题,Table Store团队联合数据集成(CDP)和Elasticsearch团队上线了近实时的数据同步方案,用户只需要将数据写入Table Store,Table Store会负责将数据在10分钟内自动发送给Elasticsearch建索引。

实验环境

Table Store:阿里云分布式NoSQL数据库,专注于海量数据的存储服务,目前单表可支持10PB级,10万亿行以上的数据量,且数据量增大后性能仍然保持稳定。Table Store Stream功能是一种增量实时通道服务,类似于MySQL的binlog,可以通过Stream接口实时读取到最新的变化数据(Put/Update/Delete)。

数据集成 :阿里云数据管理平台,支持数据同步等众多数据功能。

Elasticsearch :阿里云Elasticsearch是刚推出的一项新服务,提供基于开源Elasticsearch及商业版X-Pack插件,致力于数据分析、数据搜索等场景服务。在开源Elasticsearch基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。

三种产品在新解决方案中的角色如下:

| 产品 | Table Store | 数据集成| Elasticsearch |

角色 数据存储 数据同步通道 查询增强

由于Table Store和Elasticsearch不是完全对等的产品,所以如果需要将数据导入Elasticsearch,那么在使用Table Store的时候有一些注意的地方:

  • Table Store主键列个数:
    目前Table Store最大支持4个主键列,而Elasticsearch只支持一个,所以Table Store的表设计时只能使用一个主键列,如果之前有多个主键列,可以将多个主键列的值转换成String,然后拼接成一个主键列。
  • Table Store数据变化类型:
    仅支持PUT(新增),UPDATE(更新)两种操作。

不支持DELETE操作。

  • Table Store多版本:
    仅支持单版本,不支持多版本
  • Elasticsearch:
    版本:支持阿里云和开源的5..版本。

延时:目前使用的是周期调度,每隔5分钟调度一次,再加上插件中有5分钟延迟,同步总延迟在5~10分钟。

实验步骤

开通服务

  • Table Store:
  • 登陆Table Store。首页点击开通
  • 新建实例和表,表需要开通Stream,有效时间可以选择24小时。
  • Table Store支持按预留CU和按量付费两种收费模式,如果创建表时指定读写CU都为0则表示为按量付费,后期如果没有使用则不收费。且目前每月有10GB,1000万CU的免费额度。
  • Table Store中表需要开通Stream功能。
  • 数据集成
  • 登陆DataWorks绑定AK。
  • 然后创建项目即可。
  • 注意:子账号不能创建项目,只能被主账号授权。
  • Elasticsearch
  • 登陆Elasticsearch首页,首页点击开通。购买时的VPC必须和之前购买的ECS在同一个VPC环境内部。根据数据量预估,购买相应的实例大小。

使用方式

  • Table Store:PutRow/BatchWriteRow接口写入数据
  • Elasticsearch:无须写入
  • Elasticsearch:搜索到请求结果后,拿到每个doc的_id字段值。
  • Table Store:Elasticsearch中的_id字段就是Table Store中的主键值,获取到一系列_id值后,使用Table Store的BatchGetRow可以查询到完整数据。

同步流程

  1. 整个同步流程应该包括下面两个步骤:
  1. 导出Table Store的全量数据到Elasticsearch,并且记录开始时间T1。
  2. 等全量导出结束后,再开始同步增量数据,增量数据开始同步的时间是T1。
  3. 对于全量导出,需要使用otsreader插件,配置中的Range使用INF_MIN到INF_MAX,也就是导出所有数据。
  4. 对于增量同步,需要配置起始时间和结束时间为一个变量,在调度周期配置的时候配置起始时间必须小于等于T1,否则可能会有数据丢失发生。

我们下面会以增量同步为例来介绍如何配置增量同步任务。

Table Store配置

无须配置

Elasticsearch配置

无须配置

数据集成配置

  1. 创建数据源(可选)

如果已经创建了Table Store的数据源,则可以跳过这一步。
如果不希望创建数据源,也可以在配置页面配置相应的endpoint,instanceName,AccessKeyID和AccessKeySecret。如果希望创建,则按照下面步骤操作。

登录阿里云大数据开发套件:数据源地址。
单击左侧 离线同步 > 数据源。
在数据源配置页面,选择右上角 新增数据源 ,会有一个弹出框。

按照说明填写:

  • 数据源名称:填写一个数据源标识符,比如车联网。
  • 数据源描述:填入描述符,比如:车联网GPS数据存储。
  • 数据源类型:选择 ots ,ots是Table Store曾用名。
  • OTS Endpoint:填入TableStore 实例页面的实例地址,如果Table Store的实例和目标产品(比如Elasticsearch)在同一个region,则可以填入私网地址,否则需要填入公网地址,不能填入VPC地址。
  • OTS 实例ID:填入Table Store的实例名称。
  • Access Id:填入阿里云网站的AccessKeyID。
  • Access Key:填入阿里云网站AccessKeyID对应的AccessKeySecret。
  • 点击测试连通性 ,如果成功则会在右上角提示:测试连接成功。 如果失败,点击endpoint是否配置正确,如果仍然无法解决,提工单联系数据集成。
  • 填好后的页面类似下面这样:
    Elasticsearch_
  • 单击确定,数据源创建成功,此时在数据源页面会出现一个新的数据源信息;
    Elasticsearch_
  1. 创建导出任务
    (1)单击数据集成地址,进入数据集成的页面,会出现模式选择:

Elasticsearch_
(2)单击 脚本模式 ,弹出一个 导入模板 配置。
(3)在导入模板配置里面:
来源类型:OTS Stream
目标类型:Elasticsearch
Elasticsearch_
(4)单击确认,则进入配置界面。

  1. 完善配置项

在配置界面,已经提前嵌入了OTSStreamReader和ElasticsearchWriter的模板,每一项配置后面都做了解释。

{
  "type": "job",
  "version": "1.0",
  "configuration": {
    "setting": {
      "errorLimit": {
        "record": "0"  # 允许出错的个数,当错误超过这个数目的时候同步任务会失败。
      },
      "speed": {
        "mbps": "1",  # 每次同步任务的最大流量。
        "concurrent": "1" # 每次同步任务的并发度。
      }
    },
    "reader": {
      "plugin": "otsstream",  # Reader插件的名称。
      "parameter": {
        "endpoint": "",  # TableStore中实例的endpoint。
        "accessId": "",  # 阿里云的AccessKeyID。
        "accessKey": "",  # 阿里云的AccessKeySecret。
        "instanceName": "",  # TableStore的实例名,如果使用DataSource,则需要新增配置项datasource,不再需要配置endpoint,accessId,accessKey和instanceName。
        "dataTable": "",  # TableStore中的表名。
        "statusTable": "TableStoreStreamReaderStatusTable", # 存储TableStore Stream状态的表,一般不需要修改。
        "startTimestampMillis": "", # 开始导出的时间点,由于是增量导出,需要循环启动此任务,则这里每次启动的时候的时间都不一样,这里需要设置一个变量,比如${start_time}。
        "endTimestampMillis": "",   # 结束导出的时间点。这里也需要设置一个变量,比如${end_time}。
        "date": "yyyyMMdd",  # 导出哪一天的数据,功能和startTimestampMillis、endTimestampMillis重复,这一项需要删除。
        "mode": "single_version_and_update_only", # TableStore Stream导出数据的格式,目前ElasticSearch只能接收这种格式的,这个不需要修改。如果配置模板中没有则需要增加。
        "column":[  # 需要导出TableStore中的哪些列到ElasticSearch中去,如果配置模板中没有则需要增加。
            {"name":"uid"},  
            {"name":"name"},           
            {"name":"phone"}
        ],
        "isExportSequenceInfo": false,  # single_version_and_update_only 模式下只能是false。
        "maxRetries": 30 # 最大重试次数。
      }
    },
    "writer": {
      "plugin": "elasticsearch",  # Writer插件的名称:ElasticSearchWriter,不需要修改。
      "parameter": {
        "endpoint": "",# ElasticSearch的endpoint,控制台上有。
        "accessId": "",# 如果使用了X-PACK插件,则这里需要填写username,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写username。
        "accessKey": "", # 如果使用了X-PACK插件,则这里需要填写password,如果没使用,则这里填空字符串即可。阿里云Elasticsearch使用了X-PACK插件,这里需要填写password。  
        "index": "",  # ElasticSearch的索引名称,如果之前没有,插件会自动创建。
        "indexType": "", # ElasticSearch中相应索引下的类型名称
        "cleanup": true, # 是否在每次导入数据到ElasticSearch的时候清空原有数据,全量导入/重建索引的时候需要设置为true,同步增量的时候必须为false,这里因为是同步,则需要设置为false。
        "discovery": false, # 是否自动发现,设置为true
        "batchSize": 1000, # 每批导出的个数
        "splitter": ",",  # 如果插入数据是array,就使用指定分隔符。
        "column": [  # ElasticSearch中的列名,顺序和Reader中的Column顺序一致
          {
            "name": "uid",  # TableStore中的主键列是uid,这里也有同名uid,用type:id表示这一列是主键
            "type": "id"  # id表示这一列是主键,id不是ElasticSearch的内置类型,是ElasticSearchWriter提供的虚拟类型
          },
          {
              "name": "name", # 对应于TableStore中的属性列:name
            "type": "text"  # 文本类型,采用默认分词
          }
        ]
      }
    }
  }
}

其他配置项可以参考:ElasticsearchWriter配置项https://help.aliyun.com/knowledge_detail/61990.html

  1. 保存任务

点击上部的 保存 按钮,则弹出一个对话框,输入任务名称后按确定即可。
Elasticsearch_

  1. 设置调度资源

(1)由于目前数据集成还没办法自动访问VPC环境内的Elasticsearch,所以暂时需要用户自己购买VPC内的ECS机器作为调度资源。
(2)购买一台ECS,新购ECS所在的VPC需要和Elasticsearch的VPC是同一个。
(3)进入 DataWorks > 调度资源列表 ,单击 新增调度资源 。
(4)在弹出的对话框中输入:资源名称,选择:归属项目。
Elasticsearch_
(5)然后点击 确定 ,会提示 添加调度资源成功 。
Elasticsearch_
(6)点击 管理服务器 ,会弹出一个新的弹出框。
Elasticsearch_

  • (7)点击 增加服务器 ,会弹出一个新的弹出框。
  • 选择 专有网络 。
  • 输入刚刚购买的ECS的:ECS UUID。登陆ECS,以root身份执行命令 bash dmidecode | grep UUID 即可获取到UUID。
  • 输入刚刚购买的ECS的内网:机器IP。
  • 单击 添加 。
    Elasticsearch_

(8)如果是添加机器,需要初始化服务器,点击 调度资源列表 页面相应资源名称后面的 服务器初始化 ,会弹出初始化步骤,按这个步骤执行。
(9)执行完成后,再点击 调度资源列表 页面相应资源名称后面的 服务器管理 ,点击 刷新 ,当 服务状态 变为 正常 时,表示调度资源配置成功。
Elasticsearch_

  1. 运行任务(测试)

(1)可以先通过运行任务来测试配置是否正确和符合预期。
(2)回到任务配置页面 数据集成 > 离线同步 > 同步任务 > 数据同步 。
(3)双击刚刚创建的任务:tablestore2es,点击配置内容上部的 运行 。
(4)这时候会弹出一个参数设置框,设置之前配置的变量:
Elasticsearch_
(5)填完值后,点击 确认后,任务会立即开始运行。
(6)运行结束后,如果没有报错,则执行成功,在日志里面会打印同步的数据行数。
(7)最后可以到Elasticsearch中查询索引成功的文档数。具体方法见后面的 <9. 验证结果> 。

  1. 提交任务

(1)回到任务配置页面 数据集成 > 离线同步 > 同步任务 > 数据同步 。
(2)双击刚刚创建的任务:tablestore2es,点击配置内容上部的 提交 。

  • (3) 配置调度参数:
    调度类型:周期调度

自动重跑:√
生效日期:默认值
调度周期:分钟
起始时间:默认值
时间间隔:5分钟
start_time:$[yyyymmddhh24miss-10/24/60],表示调度时间-10分钟。
end_time:$[yyyymmddhh24miss-5/24/60],表示调度时间-5分钟。
跨周期依赖:可以选择:自依赖,等待上一调度结束,才能接续运行。
Elasticsearch_
(4)提交任务后,原有任务处于:直读状态。

  1. 绑定调度资源

(1)切换到 运维中心 > 任务列表 > 周期任务 ,可以看到刚刚创建的周期任务:
Elasticsearch_
(2)选中刚刚创建的周期任务:tablestore2es。点击下部的 修改资源组 ,会弹出一个选择框,选择刚刚创建的资源组名称:
Elasticsearch_
(3)点击 确认 ,即可绑定成功。

  1. 验证结果

(1)周期任务是从下一天的00:00点开始执行。
(2)等执行完一个任务后,可以在ECS上通过下述命令查看Elasticsearch中的数据量:

curl -XGET http://endpoint/index_name/type_name/_count?pretty  -d '
{
    "query": {
        "match_all": {}
    }
}'            

结果类似下面:

{
  "count" : 1000,  # ElasticSearch中index_name索引的type_name类型中的doc数
  "_shards" : {     # 这个是ElasticSearch返回数据相关的meta值,表示总共有5个shard,全部成功返回了结果
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  }
}               
  1. 下一步计划

(1)至此,TableStore数据通过数据集成同步到Elasticsearch的配置完成了,延迟在5分钟到10分钟之间。
(2)虽然目前可以运行了,但是仍然存在一些问题。

需要 设置调度资源 ,比较麻烦。预计十二月中旬的时候这一步可以自动处理,不再需要配置。使用会更加简单。
延迟在 5~10 分钟,对于部分系统而言,可能延迟比较大,预计十二月底的时候可以减少到秒级。时效性会更高。

作者:少强 阿里巴巴技术专家

加入钉钉技术讨论群

dingQR

阿里云Elasticsearch已正式发布啦,Elastic开源官方联合开发,集成5.5商业版本XPack功能,欢迎开通使用。
点击了解更多产品信息

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
14天前
|
存储 监控 数据挖掘
使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅
【6月更文挑战第9天】Meltano,一个开源数据集成框架,简化了从Snowflake到Elasticsearch的数据迁移。这个工具支持多种数据源,提供易于配置的界面。要开始,需安装Meltano并配置连接信息。一个简单的YAML示例展示了如何定义从Snowflake到Elasticsearch的迁移任务。Meltano自动执行迁移,同时提供监控和日志功能。借助Meltano,用户能高效集成数据,提升搜索和分析能力,适应不断增长的数据需求和挑战。
61 6
|
20天前
|
索引 搜索推荐 缓存
使用Elasticsearch进行高效全文搜索的技术探索
【6月更文挑战第3天】本文探索了使用Elasticsearch进行全文搜索的技术,它基于Lucene,是一款开源、分布式搜索引擎。核心原理在于倒排索引,实现快速查找。全文搜索涉及安装配置、创建索引、索引文档、执行查询及处理结果。为了优化性能,可以调整索引映射、选择合适分析器、利用缓存及优化硬件配置。Elasticsearch因其高效性和灵活性在大数据时代广泛应用。
160 62
|
12天前
|
JSON DataWorks 关系型数据库
DataWorks操作报错合集之同步Elasticsearch数据报错:Cat response did not contain a JSON Array,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
25天前
|
运维 数据挖掘 Serverless
阿里云Elasticsearch Serverless助力某电商平台公司实现商品订单数据的实时写入查询
某电商平台公司采用阿里云Elasticsearch Serverless解决方案,实现商品、订单和其他关键信息的写入和查询的实时响应。
214 1
|
9天前
|
存储 负载均衡 NoSQL
【后端面经】【NoSQL】ElasticSearch - 1 - 1 节点角色 写入数据
【6月更文挑战第12天】中间件高可用与高性能通过冗余、负载均衡和优化实现。Elasticsearch节点有候选主节点、协调节点和数据节点等角色,可兼任或独立。数据写入涉及Buffer、Page Cache和磁盘,通过段合并优化资源使用。查询通过Commit Point识别新段,确保近实时搜索。
20 0
|
1月前
|
JSON 搜索推荐 API
使用Elasticsearch进行全文搜索:技术深度解析
【5月更文挑战第16天】本文深入解析了使用Elasticsearch进行全文搜索的技术细节。Elasticsearch是一个基于Lucene的开源搜索引擎,支持全文、结构化搜索和数据分析,具备优秀的扩展性。文中介绍了其核心功能,包括全文搜索、结构化搜索、分析和可扩展性。详细步骤涉及安装配置、数据准备、创建索引、导入数据、构建查询及结果处理。Elasticsearch凭借其高效性能和灵活性,成为企业全文搜索的首选解决方案。
|
1月前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
|
1月前
|
安全 搜索推荐 定位技术
一张图30个知识点,全方位认知 Elasticsearch 技术发展
一张图30个知识点,全方位认知 Elasticsearch 技术发展
57 3
|
1月前
|
SQL 监控 搜索推荐
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
Elasticsearch 与 OpenSearch:开源搜索技术的演进与选择
101 2
|
26天前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现

相关产品

  • 检索分析服务 Elasticsearch版