开发者社区> 表格存储> 正文

使用函数计算对表格存储中数据做简单清洗

简介: 表格存储的增量数据流功能能够使用户使用API获取Table Store表中增量数据,并可以进行增量数据流的实时增量分析、数据增量同步等。通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改,充分的利用了函数计算全托管、弹性伸缩的特点。
+关注继续查看

函数计算(Function Compute) 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。

Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改。

表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储,我们可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗、转换、聚合计算等操作,并将清洗之后的数据写回到表格存储的结果表中,并对原始明细数据及结果数据提供实时访问。

下面,我们使用函数计算对表格存储中的数据做简单的清洗,并写入到结果表中。

数据定义

我们假设写入的为日志数据,包括三个基础字段:

字段名称 类型 含义
id 整型 日志id
level 整型 日志的等级,越大表明等级越高
message 字符串 日志的内容

我们需要将 level>1 的日志写入到另外一张数据表中,用作专门的查询。

实现过程:

创建实例及数据表

表格存储的控制台创建表格存储实例(__本次以 华东2 distribute-test 为例__),并创建源表(__source_data__)及结果表(__result__),主键为均 __id (整型)__,由于表格存储是 schemafree 结构,无需预先定义其他属性列字段。

_

开启数据源表的Stream功能

触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

_stream

Stream记录过期时长 为通过 StreamAPI 能够读取到的增量数据的最长时间。

由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。

创建函数计算服务

函数计算的控制台上创建服务及处理函数,我们继续使用华东2节点。

1.在华东2节点创建服务。

fc_service

2.创建函数依次选择:空白函数——不创建触发器。

fc_function
fc_function_2

  • 函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
  • 函数入口为:etl_test.handler
  • 代码稍后编辑,点击下一步。

3.进行服务授权

由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 __AliyunLogFullAccess __权限,实际生产中,建议根据权限最小原则来添加权限。
fc_function_ram

4.点击授权完成,并创建函数。

5.修改函数代码。

创建好函数之后,点击对应的函数代码执行,编辑代码并保存,其中,INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)需要根据情况进行修改:
fc3

使用示例代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cbor
import json
import tablestore as ots

INSTANCE_NAME = 'distribute-test'
REGION = 'cn-shanghai'
ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)
RESULT_TABLENAME = 'result'


def _utf8(input):
    return str(bytearray(input, "utf-8"))

def get_attrbute_value(record, column):
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']

def get_pk_value(record, column):
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']

#由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限
def get_ots_client(context):
    creds = context.credentials
    client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
    return client

def save_to_ots(client, record):
    id = int(get_pk_value(record, 'id'))
    level = int(get_attrbute_value(record, 'level'))
    msg = get_attrbute_value(record, 'message')

    pk = [(_utf8('id'), id),]
    attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
    row = ots.Row(pk, attr)
    client.put_row(RESULT_TABLENAME, row)

def handler(event, context):
    records = cbor.loads(event)
    #records = json.loads(event)
    client = get_ots_client(context)
    for record in records['Records']:
        level = int(get_attrbute_value(record, 'level'))
        if level > 1:
            save_to_ots(client, record)
        else:
            print "Level <= 1, ignore."

对表格存储 Stream 数据的格式详情请参考Stream 数据处理

绑定触发器

1.回到表格存储的实例管理页面,点击表 source_data 后的 使用触发器 按钮,进入触发器绑定界面,点击使用已有函数计算, 选择刚创建的服务及函数,勾选 表格存储发送事件通知的权限, 进行确定。
trigger
ots_fc

2.绑定成功之后,能够看到如下的信息:

trigger3

运行验证

1.向 source_data 表中写入数据。

在 __source_data__ 的数据管理页面,点击插入数据,如图依次填入id、level及message信息。

ots_insert

2.在 result 表中查询清洗后的数据

点击 result 表的数据管理页面,会查询到刚写入到 source_data 中的数据。
当然,向 soure_data 写入level <=1的数据将不会同步到 result 表中

ots_get

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
《OpenACC并行程序设计:性能优化实践指南》一 3.10 使用Score-P和Vampir记录OpenACC运行时事件
本节书摘来自华章出版社《OpenACC并行程序设计:性能优化实践指南》一 书中的第3章,第3.10节,作者:[美] 罗布·法伯(Rob Farber),更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1284 0
如何打破边界,接入各家云,使用阿里云的数据产品作游戏数据分析?
背景:       某某游戏公司的业务逻辑图,其实可以从中发现这基本上跟大多数的游戏公司一样,通过阿里云作数据分析,其他云作数据接入。原因在于阿里云的数据产品比较贴合使用习惯、全面丰富的良好产品体验。       但是,接入的通路问题,采集问题,甚至集成问题,都是困扰一大部分用户的。这篇文章,就是希望打开这方面的脑洞。细化到网络层面,
921 0
海量结构化数据存储技术揭秘:Tablestore表设计最佳实践
前言 表格存储Tablestore是阿里云自研的面向海量结构化数据存储的Serverless NoSQL多模型数据库。在处理海量数据时,方案设计非常重要,合理的设计才能够发挥出数据库的性能水平。本文主要介绍Tablestore在表设计方面的一些实践经验,供大家参考。
8357 0
C#读取Excel表格中数据并返回datatable
在软件开发的过程中,经常用到从excel表格中读取数据作为数据源,以下整理了一个有效的读取excel表格的方法。   DataTable GetDataTable(string tableName,string leftTopCel,string rightbutCel) ...
1111 0
Android 开发者如何使用函数式编程 (二)
本文讲的是Android 开发者如何使用函数式编程 (二),在上一篇帖子中,我们学习了纯粹性*、副作用和排序**。在本部分中,我们将讨论不变性和并发。
1052 0
函数式编程初探
函数式编程初探  (原文地址:http://blog.jobbole.com/17228/) 04月 11, 2012 at 9:50 am by 齐哲 Tags: Erlang, 函数式编程, 编程语言   诞生50多年之后,函数式编程(functional programming)开始获得越来越多的关注。   不仅最古老的函数式语言Li
1163 0
201604深圳云栖大会Workshop - 使用表格存储开发用户弹幕功能
使用表格存储开发用户弹幕功能 目标 使用表格存储(TableStore,原称OTS)实现视频直播的弹幕功能,通过TableStore存储弹幕,并在TableStore中检索最新弹幕实时显示到直播页面中。
4066 0
在mongodb服务器上存储和执行 js 函数 - 存储过程
虽然官方不推荐使用将业务逻辑存储在数据库中,并且提示在 mongodb 中执行 javascript 存在性能限制。 但实际上,将 javascript 函数存储在 mongodb 中执行,还是非常有必要的,更方便,许多场景下性能会更好(在执行大量查询处理时不需要将数据传回客户端引擎)。 在目前的版本中,我们任然可以将 javascript 函数存储在 mongodb 内置的一个特殊集合 db.system.js 中,然后这些变量就可以在任何 mongodb 的 javascript 上下文中调用,包括:db.eval()、$where子句、mapReduce。 自从 mongodb 3
173 0
表格存储 SQL 数据类型详解
本文主要介绍 Tablestore SQL中的数据类型与 MySQL 数据类型之间的映射关系。 ​ ## 背景介绍 ### Tablestore 数据类型 Tablestore 中的数据类型支持如下表所示,其中主键列支持的数据类型包括String、Integer和Binary,属性列支持的数据类型包括String、Integer、Double、Boolean和Binary。 - 主键列支持的数
84 0
+关注
18
文章
3
问答
来源圈子
更多
阿里云存储基于飞天盘古2.0分布式存储系统,产品包括对象存储OSS、块存储Block Storage、共享文件存储NAS、表格存储、日志存储与分析、归档存储及混合云存储等,充分满足用户数据存储和迁移上云需求,连续三年跻身全球云存储魔力象限四强。
+ 订阅
相关文档: 混合云容灾服务 混合云备份服务 日志服务
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载