前言
本文将介绍在Blink实时计算平台如何使用InfluxDB作为流计算的结果表,以及建立相应的作业流程。
InfluxDB Writer
InfluxDB Sink 是 Blink 的一个插件,实现了将数据点写入到阿里云时序数据库Influxdb版中。
Quick Start
DDL定义
流计算支持使用InfluxDB作为结果输出。示例代码如下:
create table stream_test_influxdb(
`metric` varchar,
`timestamp` BIGINT,
`tag_value1` varchar,
`field_fieldValue1` Double
)with(
type = 'influxdb',
endpoint = 'https://localhost:3242',
database = 'blink',
batchPutsize = '1',
username = 'test',
password = 'Test1111'
);
建表默认格式:
第1列:metric,varchar
第2列:timestamp,BIGINT 单位:毫秒
第3列:tag_value1,varchar
第4列:field_fieldValue1,Double
注意:
1、 metric和timestamp必须存在。
2、支持多个tag和field,但至少添加1个tag_和1个field_。
3、结果表中只支持metric、timestamp、tag_和field_,不能出现其他的字段。
4、endpoint支持http和https。
WITH参数
参数 | 参数说明 | 是否必填 | 备注 |
---|---|---|---|
type | 必填 | 是 | 固定为influxdb |
endpoint | 协议://host:port,支持http和https | 是 | 例如:https://localhost:3242,或者http://localhost:8086 |
database | 写入InfluxDB的数据库名 | 是 | 例如:db-blink或者blink |
username | InfluxDB的用户名 | 是 | 需要对写入的数据库有写权限 |
password | InfluxDB的密码 | 是 | 默认为0。 |
retentionPolicy | 保留策略 | 否 | 不先写的话,默认写入每个database的默认保留策略 |
batchPutSize | 批量提交的记录条数 | 否 | 默认每次提交500个数据点。 |
FAQ:
Question:field_fieldValue支持多少个?
答:默认和InfluxDB支持的一致。
Question:多个field_fieldValue如何写入?
答:field_fieldValue1 类型,
field_fieldValue2 类型,
...
field_fieldValueN 类型
例如:
field_fieldValue1 Double,
field_fieldValue2 INTERGER,
...
field_fieldValueN INTERGER
使用步骤
创建新的任务:
在Blink实时计算平台数据开发模块建立新任务,并填写节点类型、Blink版本、节点名称以及目标文件夹等相关内容
定义任务DDL
新建任务之后,进入该任务,点击切换为SQL模式按钮。按照之前介绍的DDL定义开发自己的任务。如下图所示:
所用的DDL语句参看DDL定义一节。
发布和启动:
作业完成之后,点击发布,选择运行环境及配置可用CU,此次建立的流式作业就正式启动了,可通过运维界面管理作业以及查看作业运行相关信息。