什么是日志服务
日志服务(Log Service),简称LOG,原SLS。是针对实时数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力。
日志服务本身是流数据存储,实时计算 Flink能将其作为流式数据输入。对于日志服务而言,数据格式类似JSON,示例如下。
{"a": 1000,"b": 1234,"c": "li"}
对于实时计算而言,我们需要定义的DDL如下(sls即Log Service)。
create table sls_stream(a int,b int,c varchar) with (type ='sls',endPoint ='http://cXXXXXXXXyuncs.com',accessId ='0iXXXXXXXAs',accessKey ='yF60EXXXXXXXPJ2zhCfHU',startTime = '2017-07-05 00:00:00',project ='ali-XXXXX-streamtest',logStore ='strXXXtest',consumerGroup ='consXXXXroupTest1');
属性字段
目前默认支持三个属性字段的获取,也支持其他自定义写入的字段。
| 字段名 | 注释说明 |
|---|---|
__source__ |
消息源 |
__topic__ |
消息主题 |
__timestamp__ |
日志时间 |
举例
通过 HEADER 关键字获取属性字段。
测试数据
__topic__: ens_altar_flowresult: {"MsgID":"ems0a","Version":"0.0.1"}
测试代码
CREATE TABLE sls_log (__topic__ varchar HEADER,result varchar)WITH(type ='sls');CREATE TABLE sls_out (name varchar,MsgID varchar,Version varchar)WITH(type ='RDS');INSERT INTO sls_outSELECT__topic__,JSON_VALUE(result,'$.MsgID'),JSON_VALUE(result,'$.Version')FROMsls_log
测试结果
| name(VARCHAT) | MsgID(VARCHAT) | Version(VARCHAT) |
|---|---|---|
| ens_altar_flow | ems0a | 0.0.1 |
WITH参数
| 参数 | 注释说明 | 备注 |
|---|---|---|
| endPoint | 消费端点信息 | 日志服务的ENDPOINT地址 |
| accessId | sls读取的accessKey | N/A |
| accessKey | sls读取的密钥 | N/A |
| project | 读取的sls项目 | N/A |
| logStore | project下的具体的logStore | N/A |
| consumerGroup | 消费组名 | 用户可以自定义消费组名(没有固定格式) |
| startTime | 消费日志开始的时间点 | N/A |
| heartBeatIntervalMills | 可选,消费客户端心跳间隔时间 | 默认为10s |
| maxRetryTimes | 读取最大尝试次数 | 可选,默认为5 |
| batchGetSize | 单次读取logGroup条数 | 可选,默认为10 |
| lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
| columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来 | 可选,默认为false |
注意:
- SLS暂不支持MAP类型的数据。
- 字段顺序支持无序(建议字段顺序和表中定义一致)。
- 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败。报错如下:
2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
- batchGetSize设置不能超过1000,否则会报错
- batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小。
类型映射
日志服务和实时计算字段类型对应关系,建议您使用该对应关系进行DDL声明:
| 日志服务字段类型 | 流计算字段类型 |
|---|---|
| STRING | VARCHAR |
本文转自实时计算——
创建日志服务(Log Service)源表