作者:终古
1 组件简介
MongoDB是面向文档的NoSQL(非关系型)数据库,它的数据结构由字段(Field)和值(Value)组成,类似于JSON对象。
Mongodb输入组件提供了从MongoDB数据库中全量和增量两种读取方式,并且能够对数据自动进行数据分片,从而可以并发的高校读取数据。
2 组件配置介绍
如上图所以,MongoDB输入组件需要以下配置信息:数据源、表、输入过滤、输出字段。
- 数据源
数据源信息在dataphin数据源配置,配置完成后在此处可下拉框中选择
- 表
MongoDB数据库存储了表的元数据信息,选择数据源后,该表下拉框会自动列出该数据源下的表名。注意这里默认显示100条,输入表名会进行实时查询匹配
- 输入过滤
非必选项,此为配置全量读取或者增量读取或者有条件读取的关键。此处配置的语法遵循MongoDB的过滤表达式的语法,例如只同步年龄大于23的数据: {"age":{"$gt":23}}
- 输出字段
此处需要手动配置输出字段,支持批量添加方式和逐个增加的方式:
- 批量添加:点击批量添加,输入json对象数组,每个json对表表示一个字段信息,可参见默认示例
- 逐个添加:点击新建输出字段,每个字段信息包括字段名称、字段类型,字段类型为标准的MongoDB数据类型
注:因为MongoDB为NoSQL数据库,是没有固定字段元数据的,因此没有表的字段元数据,所以需要根据表的数据内容手动配置
3 组件使用
本文介绍全量和增量两种典型的数据同步场景。
3.1 全量同步
全量同步的方式非常简单,即 输入过滤 什么都不配置就是全量同步,如果需要根据条件过滤那就遵循MongoDB的过滤表达式配置过滤语句即可。
首先,这是表的数据:
其中_id是系统字段,我们的字段有:name(string)、id(long)、gmt_greate(date)。
3.1.1 无过滤的全量同步
- 输入过滤: 无需配置
- 界面配置:
- 预览结果
3.1.2 有过滤的全量同步
- 输入过滤:过滤名称为sff的数据: {"name":"sff"}
- 界面配置:
- 预览结果
可以看到name=2的数据被过滤掉了
3.2 增量同步
增量同步依旧是配置输入过滤,注意目前仅支持按照时间字段进行增量同步。因此,这里介绍如何使用时间字段进行增量同步。
首先,这是表的数据:
3.2.1 字段类型为Date的增量同步
- 输入过滤:每日周期同步T-1的典型增量配置。
即假设今天是2021-09-19,那今天任务运行需要同步昨天的数据即: 2021-09-18 00:00:00,这个需要结合 调度参数 配合使用。因为mongodb的过滤表达式:
{"gmt_create":{$gte:ISODate("2021-09-18T00:00:00.000Z")}},所以这就需要拼接出2021-09-18T00:00:00.000Z这种时间格式,因此需要在调度参数那里定义两个变量:
date: {yyyy-MM-dd}
time: {hh:mm:ss}(或者不需要配置此变量,直接写死00:00:00即可)
因此 最终的输入过滤为: {"gmt_create":{$gte:ISODate("${date}T${time}.000Z")}}
若time固定从0点开始,也可以直接配置:{"gmt_create":{$gte:ISODate("${date}T00:00:00.000Z")}}
- 界面配置:
- 预览结果
3.2.2 字段类型为String的时间增量同步
对于类型是String的,可以直接使用字符串比较
- 输入过滤:{"gmt_create":{$gte:"${date} 00:00:00"}} (注:date为系统变量,业务日期,yyyy-MM-dd,可直接使用)
- 配置界面:
- 数据预览
4 MongoDB特别注意
MongoDB每个表都有一个系统字段:_id,Object类型,由数据库维护,此字段可被覆盖,但切记不要覆盖此字段,也就是说任何写入MongoDB 表的任务都不应该显示写入 _id 字段,否则破坏该字段的类型,会导致数据集成在读取该表数据时数据切分过程报错或者读取数据不准确。因为mongodb输入组件在进行数据切分时便是使用该字段,强依赖该字段的Object类型,一旦遇到非Object类型,轻则报错,重则数据读取不准确。