Datahub Python SDK入门手册

简介: 用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK正式release

前言

DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub 可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到 DataHub 的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK需求量也日益上升,这里就告诉各位Pythoner们一个好消息,Datahub官方Python SDK Beta正式Release(Github地址),使用非常简单,这里做个入门介绍,大家如有任何疑问随时在Github上提问留言。

安装

快速安装

$ sudo pip install pydatahub

源码安装

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

安装验证

$ python -c "from datahub import DataHub"

如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

基本概念

详见: https://help.aliyun.com/document_detail/47440.html?spm=5176.product27797.3.2.VGxgya

准备工作

  • 访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。
  • 创建Project

  • 初始化Datahub
import sys
import traceback

from datahub import DataHub
from datahub.utils import Configer
from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType
from datahub.errors import DatahubException, ObjectAlreadyExistException

access_id = ***your access id***
access_key = ***your access key***
endpoint = ***your datahub server endpoint***
dh = DataHub(access_id, access_key, endpoint)

Topic操作

Tuple Topic

  • Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型 含义 值域
Bigint 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 -9223372036854775807 ~ 9223372036854775807
String 字符串,只支持UTF-8编码。 单个String列最长允许1MB。
Boolean 布尔型。 可以表示为True/False,true/false, 0/1
Double 8字节双精度浮点数。 -1.0 10308 ~ 1.0 10308
TimeStamp 时间戳类型 表示到微秒的时间戳类型
  • 创建示例
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.TUPLE
topic.record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], [Fie
ldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])

try:
    dh.create_topic(topic)
    print "create topic success!"
    print "=======================================\n\n"
except ObjectAlreadyExistException, e:
    print "topic already exist!"
    print "=======================================\n\n"
except Exception, e:
    print traceback.format_exc()
    sys.exit(-1)

Blob Topic

  • Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.BLOB

try:
    dh.create_topic(topic)
    print "create topic success!"
    print "=======================================\n\n"
except ObjectAlreadyExistException, e:
    print "topic already exist!"
    print "=======================================\n\n"
except Exception, e:
    print traceback.format_exc()
    sys.exit(-1)

数据发布/订阅

获取Shard列表

  • list_shards接口获取topic下的所有shard
shards = dh.list_shards(project_name, topic_name)

返回结果是一个List对象,每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据

  • put_records接口向一个topic发布数据
failed_indexs = dh.put_records(project_name, topic_name, records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为写入失败记录的数组下标

  • 写入Tuple类型Record示例
try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print "shards all ready!!!"
    print "=======================================\n\n"

    topic = dh.get_topic(topic_name, project_name)
    print "get topic suc! topic=%s" % str(topic)
    if topic.record_type != RecordType.TUPLE:
        print "topic type illegal!"
        sys.exit(-1)
    print "=======================================\n\n"

    shards = dh.list_shards(project_name, topic_name)
    for shard in shards:
        print shard
    print "=======================================\n\n"

    records = []

    record0 = TupleRecord(schema=topic.record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = shards[0].shard_id
    record0.put_attribute('AK', '47')
    records.append(record0)

    record1 = TupleRecord(schema=topic.record_schema)
    record1['bigint_field'] = 2
    record1['string_field'] = 'yc2'
    record1['double_field'] = 10.02
    record1['bool_field'] = False
    record1['time_field'] = 1455869335000011
    record1.shard_id = shards[1].shard_id
    records.append(record1)

    record2 = TupleRecord(schema=topic.record_schema)
    record2['bigint_field'] = 3
    record2['string_field'] = 'yc3'
    record2['double_field'] = 10.03
    record2['bool_field'] = False
    record2['time_field'] = 1455869335000013
    record2.shard_id = shards[2].shard_id
    records.append(record2)

    failed_indexs = dh.put_records(project_name, topic_name, records)
    print "put tuple %d records, failed list: %s" %(len(records), failed_indexs)
    # failed_indexs如果非空最好对failed record再进行重试
    print "=======================================\n\n"
except DatahubException, e:
    print traceback.format_exc()
    sys.exit(-1)
else:
    sys.exit(-1)

获取cursor

  • 获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

    • OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record
    • LATEST: 表示获取的cursor指向当前最新的record
    • SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record
cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据

  • 从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。
dh.get_records(topic, shard_id, cursor, 10)
  • 消费Tuple类型Record示例
try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print "shards all ready!!!"
    print "=======================================\n\n"

    topic = dh.get_topic(topic_name, project_name)
    print "get topic suc! topic=%s" % str(topic)
    if topic.record_type != RecordType.TUPLE:
        print "topic type illegal!"
        sys.exit(-1)
    print "=======================================\n\n"

    cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, '0')
    while True:
        (record_list, record_num, next_cursor) = dh.get_records(topic, '0', cursor, 10)
        for record in record_list:
            print record
        if 0 == record_num:
            time.sleep(1)
        cursor = next_cursor

except DatahubException, e:
    print traceback.format_exc()
    sys.exit(-1)
else:
    sys.exit(-1)

结尾

目录
相关文章
|
2天前
|
数据采集 机器学习/深度学习 人工智能
Python编程入门:从基础到实战
【10月更文挑战第24天】本文将带你进入Python的世界,从最基础的语法开始,逐步深入到实际的项目应用。我们将一起探索Python的强大功能和灵活性,无论你是编程新手还是有经验的开发者,都能在这篇文章中找到有价值的内容。让我们一起开启Python的奇妙之旅吧!
|
4天前
|
数据采集 存储 数据库
Python中实现简单爬虫的入门指南
【10月更文挑战第22天】本文将带你进入Python爬虫的世界,从基础概念到实战操作,一步步指导你如何使用Python编写一个简单的网络爬虫。我们将不展示代码示例,而是通过详细的步骤描述和逻辑讲解,帮助你理解爬虫的工作原理和开发过程。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你打开一扇通往数据收集新世界的大门。
|
2天前
|
测试技术 开发者 Python
探索Python中的装饰器:从入门到实践
【10月更文挑战第24天】 在Python的世界里,装饰器是一个既神秘又强大的工具。它们就像是程序的“隐形斗篷”,能在不改变原有代码结构的情况下,增加新的功能。本篇文章将带你走进装饰器的世界,从基础概念出发,通过实际例子,逐步深入到装饰器的高级应用,让你的代码更加优雅和高效。无论你是初学者还是有一定经验的开发者,这篇文章都将为你打开一扇通往高效编程的大门。
|
4天前
|
存储 人工智能 数据挖掘
Python编程入门:构建你的第一个程序
【10月更文挑战第22天】编程,这个听起来高深莫测的词汇,实际上就像搭积木一样简单有趣。本文将带你走进Python的世界,用最浅显的语言和实例,让你轻松掌握编写第一个Python程序的方法。无论你是编程新手还是希望了解Python的爱好者,这篇文章都将是你的理想起点。让我们一起开始这段奇妙的编程之旅吧!
13 3
|
3天前
|
机器学习/深度学习 人工智能 算法
机器学习基础:使用Python和Scikit-learn入门
机器学习基础:使用Python和Scikit-learn入门
11 1
|
5天前
|
存储 程序员 开发者
Python编程入门:从零开始掌握基础语法
【10月更文挑战第21天】本文将带你走进Python的世界,通过浅显易懂的语言和实例,让你快速了解并掌握Python的基础语法。无论你是编程新手还是想学习一门新的编程语言,这篇文章都将是你的不二之选。我们将一起探索变量、数据类型、运算符、控制结构、函数等基本概念,并通过实际代码示例加深理解。准备好了吗?让我们开始吧!
|
5天前
|
数据采集 机器学习/深度学习 数据可视化
深入浅出:用Python进行数据分析的入门指南
【10月更文挑战第21天】 在信息爆炸的时代,掌握数据分析技能就像拥有一把钥匙,能够解锁隐藏在庞大数据集背后的秘密。本文将引导你通过Python语言,学习如何从零开始进行数据分析。我们将一起探索数据的收集、处理、分析和可视化等步骤,并最终学会如何利用数据讲故事。无论你是编程新手还是希望提升数据分析能力的专业人士,这篇文章都将为你提供一条清晰的学习路径。
|
NoSQL JavaScript 前端开发
程序员必备手册 | Git、Vim、GDB、Shell、Python
程序员必备手册 | Git、Vim、GDB、Shell、Python
170 1
程序员必备手册 | Git、Vim、GDB、Shell、Python
|
6天前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
1天前
|
设计模式 开发者 Python
Python编程中的设计模式:工厂方法模式###
本文深入浅出地探讨了Python编程中的一种重要设计模式——工厂方法模式。通过具体案例和代码示例,我们将了解工厂方法模式的定义、应用场景、实现步骤以及其优势与潜在缺点。无论你是Python新手还是有经验的开发者,都能从本文中获得关于如何在实际项目中有效应用工厂方法模式的启发。 ###