从新手到专家:UDF for Python 使用全指南

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 在此前发布的 TDengine 3.0.4.0 版本中,我们引入的一个重磅新功能是“使用 Python 语言编写用户自定义函数(UDF)”。这一突破性功能不仅为数据库操作提供了前所未有的灵活性,而且极大地降低了技术门槛——即使是编程初学者也能使用 Python 语言轻松地定制和操纵数据库。这些自定义函数可以像内置函数一样在 SQL 语句中灵活使用,实现个性化数据处理。本篇文章中将详细介绍如何有效利用这一功能,开启你的数据库自定义之旅。

在此前发布的 TDengine 3.0.4.0 版本中,我们引入的一个重磅新功能是“使用 Python 语言编写用户自定义函数(UDF)”。这一突破性功能不仅为数据库操作提供了前所未有的灵活性,而且极大地降低了技术门槛——即使是编程初学者也能使用 Python 语言轻松地定制和操纵数据库。这些自定义函数可以像内置函数一样在 SQL 语句中灵活使用,实现个性化数据处理。本篇文章中将详细介绍如何有效利用这一功能,开启你的数据库自定义之旅。


UDF 基础操作

创建 UDF

我们先来看下如何创建 Python 的 UDF 函数:

CREATE  [OR REPLACE] [AGGREGATE] FUNCTION function_name
                               as library_path  OUTPUTTYPE output_type  [BUFSIZE buffer_size] [LANGUAGE 'C‘ | ’Python']


选项说明:

1)   CREATE [OR REPLACE]:第一次全新创建使用 CREATE ,已经创建要更新代码可以加上 REPLACE

2)AGGREGATE:可选项,加上此选项表示创建的是聚合函数,不加是投影函数

3)function_name:自定义函数名称,创建完后在 SQL 语句中使用的名称,最大为 64 字节,超出部分会截断

4)OUTPUTTYPE:自定义函数输出数据类型,支持类型如下:

序号

支持数据类型

序号

支持数据类型

1

TIMESTAMP

8

BINARY

2

INT

9

SMALLINT

3

INT UNSIGNED

10

SMALLINT UNSIGNED

4

BIGINT

11

TINYINT

5

BIGINT UNSIGNED

12

TINYINT UNSIGNED

6

FLOAT

13

BOOL

7

DOUBLE

14

NCHAR

5)BUFSIZE:设置自定义函数可以使用的内存缓存大小 ,此选项仅在使用 AGGREGATE 时才有效,也就是说只有创建聚合函数才能使用此选项。最大为 256k,计数单位为 byte。分配的缓存是留给 Python 自定义函数使用的,缓存生命周期是从调用聚合函数 start 开始到调用 finish() 结束的整个聚合计算过程,所以可以当做全局变量来使用。

6)LANGUAGE 是创建自定义函数的语言,目前 TDengine 支持 Python 和 C 两种。

删除 UDF

DROP FUNCTION function_name; 是删除指定名称的 UDF 函数。function_name 参数的含义与 CREATE 指令中的 function_name 参数一致,即要删除函数的名称。

查看 UDF

使用 SHOW FUNCTIONS;  可以简单查看下 UDF 创建的函数名。使用 select * from information_schema.ins_functions; 可以详细查看 UDF 创建时候的各个参数,加 \G 可以以竖列查看到完整内容,如:select * from information_schema.ins_functions\G;

安装环境

接下来带你安装 Python UDF 的开发环境,安装过程比较简单。

首先,准备安装环境

1) CMAKE:最低版本要求 3.0.2

2) GCC:因为需要在本机编译支持 Python UDF 函数的 so 文件,所以需要安装 GCC 环境,GCC 版本最低要求 7.5 以上。

3)Python:要求 3.7 及以上版本

然后开始安装插件 Python3 -m pip install taospyudf,安装成功后,执行 ldconfig。这样,开发环境就已经安装就绪了。

编写 UDF 函数

按上面步骤环境准备好后,我们便可以开始编写自己的 UDF 函数了。UDF 函数分两大类,一类是投影函数,另一类是聚合函数。这两类函数创建及编写都是完全不相同的,所以这里分开介绍。


在介绍函数前,我们先来了解下自定义函数的调用流程,如下图:

首先 UDF 处理框架是以数据块为处理单位,每次调用到自定义函数中时输入数据都是一个数据块,通过调用数据块对象的 data(row,col) 方法输入行号和列号可以取到数据块中任何一个位置上的数据。这样做的目的是减少 C 框架与 Python 语言之间的调用次数,提升性能。


返回数据时,投影函数原来有多少行,就需要返回相同的行数,聚合函数只需要返回一行即可。下面详细进行介绍:

投影函数

投影函数就像它的名字一样,像是一个投影,输出数据的行数与输入数据的行数需保持相同,如果不相同会报错。下面用一个完整的例子来说明,我们来实现一个 TDengine 内置的 concat 字符串连接函数,如下:

1) 编写函数

函数说明:

  • init         - 在 UDF 模块初始化的时候调用一次,可以做一些初始化的工作
  • destroy - 在 UDF 模块退出的时候调用一次,可以做一些退出的工作
  • process - 每个数据块到来后要调用的数据处理函数,调用 shape() 方法返回数据块行及列数
  • nrows 返回数据块拥有的行数
  • ncols  返回数据块拥有的列数,列数实际是 concat() 函数的参数个数

返回值:

  • 投影函数的返回对象必须是一个列表,非列表对象会直接报错
  • 列表对象中元素的个数应该与块行数 nrows 相同,否则也会报错


2) 创建函数

编写好自定义函数后,我们就可以直接在 taos - shell 中创建了,输入如下:create function py_concat as '/home/py_concat.py' outputtype varchar(256) language 'Python';

创建函数名 py_concat ,创建 Python 文件位置在 /home/py_concat.py,输出数据类型为 varchar,长度 256 字节,语言为 Python。

3)执行函数  

上步操作成功后,即可像内置函数一样在 SQL 中任意使用,如 taos-shell 中输入 select sf_concat(factory_name,room_name), concat(factory_name,room_name) from devices;

grade_name class_name 均为 varchar 数据类型

把工厂和车间名连接成一个字符串返回,可以对比 UDF 输出和内置函数输出结果,预期是相同的。

聚合函数

聚合函数是把数据进行聚合计算,最后只输出一行聚合结果即可。这里我们用一个大家最熟悉的统计个数 count 实例来讲解:

1) 编写函数

实现原理:

  1. 计数的累加值在 start 初始化回调的时候把 0 值保存进 buf 中做为初始值
  2. 在 reduct 函数中,如果不为 None 就不断累加,在 reduce 返回值会被存储在 buf 中,下次回调 reduce 时再作为参数 buf 传过来,这样可以反复使用 buf
  3. 最后在 finish 函数中,buf 也会通过参数传进来,把自己前面存储在 buf 中的值取出来,作为返回值返回,即为最终 count 结果。


函数说明:

  • init  - 在 UDF 模块初始化的时候调用一次,可以做一些初始化的工作
  • destroy - 在 UDF 模块退出的时候调用一次,可以做一些退出的工作
  • start. - 开始进行聚合函数计算时调用一次,主要完成对聚合函数使用的缓存进行初始化
  • reduce - 每个数据块到来后要调用的数据处理函数, 调用 shape() 方法返回数据块行及列数
  • rows 返回数据块拥有的行数
  • cols  返回数据块拥有的列数,列数是传入 UDF 函数的参数个数

                此函数会被循环调用

  • finish -  计算最终聚合结果,此函数只在最后调用一次,在此函数中返回最终结果

返回值:

返回的数据类型为创建 UDF 函数时指定的 OUTPUTTYPE 数据类型,返回类型不正确会报错,允许 None 对象返回。


2) 创建函数

编写好自定义函数后,我们就可以直接在 taos - shell 中创建了,输入如下:create aggregate function af_count as ''/home/af_count.py'' outputtype bigint bufsize 4096 language 'Python';


创建函数名 af_count ,创建 Python 文件位置在 /home/af_count.py,输出数据类型为 bigint,语言为 Python。

3)执行函数  

上步操作成功后,即可像内置函数一样在 SQL 中任意使用,如 taos-shell 中输入select af_count(col1) from devices; 。这样你就拥有了自己的 count 统计函数,想要统计什么,完全由你自己来决定。

数据类型映射关系

Python 语言与 C 语言交互,最重要的就是数据类型如何转化的问题。也是 Python UDF 函数编写最容易出错的地方,所以这里要重点介绍下:

首先我们看下映射关系表:

TDengine 数据类型

映射为 Python 对象

TINYINT/TINYINT UNSIGNED/ SMALLINT/SMALLINT UNSIGNED/ INT/INT UNSIGNED/ BIGINT/BIGINT UNSIGNED

int

FLOAT/DOUBLE

float

BOOL

bool

BINARY/NCHAR/VARCHAR

bytes

TIMESTAMP

int

JSON

Not supported

  1)int 类型在 Python3 中没有大小限制

  2)binary / nchar / varchar 类型都映射为了 Python 的 bytes 对象,所以在使用的时候要加以区别——varchar 数据类型是 binary 的别名。


因为 binary 和 nchar 都映射为了相同的  bytes 数据类型,所以自定义函数的开发者自己约定输入自定义函数参数的类型,不同数据类型需要不同的转化方式:

  • binary 类型转 str 对象, 使用 bytes.decode('utf-8')
  • nchar 类型转 str 对象,  使用 bytes.decode('utf_32_le')


当把 str 对象内容输出为 OUTPUTTYPE 指定的不同类型时,也需要进行区分:

  • str 对象当 binary 类型返回,str.encode('utf-8')    
  • str 对象当 nchar  类型返回,str.encode('utf_32_le')

开发技巧

更新函数代码

由于我们在开发 UDF 函数时需要频繁修改代码再调试运行,因此需要知道如何让新修改的代码生效。

从上面流程图中我们可以看到,在创建 UDF 函数时指定的 .py 文件路径,只在创建的那一时刻使用,把文件内容读取出来存放到 mnode 中,便于在集群的任何节点上都可以使用。所以 .py 文件创建完 UDF 后就不再使用了,我们更新了 .py 文件中的代码后,需要再把 .py文件中的代码更新到 MNODE 中才可以起作用。


目前更新 Python 自定义函数的代码提供了直接更新的命令:

增加 OR REPLACE  即可直接把 library_path 指向的 Python 自定义函数的内容更新到 MNODE 中,再次调用就会使用更新后的代码了。

输出日志

TDengine 的 Python UDF 不支持 Python 代码的调试,但支持了日志输出,如大家常用的 logging 库,都可以在 UDF 函数中使用,建议日志输出到文件中查看。print 函数打印的信息是看不到的,所以就不要用此函数输出信息了。如:

抛出异常

如果开发者在 UDF 函数在检测到异常数据后,需要终止查询,可以通过 raise 方式抛出异常,同时要保证自己 raise 的异常不会被自己捕获,因为自己捕获了框架就捕不到了,所以要位于最上层抛出异常。


抛出的异常会被 UDF 框架捕获到并终止当前查询,同时在应用端调用的查询接口中会相应的返回专属的 UDF 函数抛出异常错误码:0x8000290D,应用程序可根据此错误码做应用层的相应处理。

抛出的错误可以在 taos.cfg 中配置的日志文件的目录下找到,异常日志输出到了taosudfpy.log 中了,如下是一实例输出的异常日志:

异常日志的内容及发生异常的文件名及行号都在此日志文件中。

查看 UDF 框架生成的日志

UDF 框架返回错误描述时只按大数返回,所以详细的错误原因还需要开发者通过日志来查看。相关日志主要有两个,都在 TDengine 日志目录下存放,这里分别介绍下:

1)taospyudf.log(UDF for Python 的日志文件)

 日志文件中记录的是进程  udfd 加载 Python UDF 函数,执行 UDF 函数过程中发生的异常、错误及调用过程等的记录。我们开发 pythyon udf 函数主要看这个日志文件就可以了。

2)udfdlog.0

 这个是 udfd 进程的框架日志,里面包括调用 C 及 Python 等多种语言的UDF 函数在框架中出的错的日志,都会记录在此文件中,所以这个日志文件如果整个大框架出问题了,日志会记录在这里,一般情况下不用看。

常见错误码说明

以下错误码及错误描述是在开发 UDF 函数时经常会遇到的,这里做下说明:

序号

错误码

错误描述

说明

1

0x80002901

udf is stopping

udf 框架下在停止进程中,对外不能提供服务

2

0x80002902

udf pipe read error

taosd 和 udfd 进程间使用管道通信时读数据失败

3

0x80002903

udf pipe connect error

taosd 和 udfd 进程间使用管道通信时连接失败

4

0x80002904

udf no pipe

taosd 和 udfd 进程间通信时创建管道失败

5

0x80002905

udf load failure

加载 UDF 模块失败

6

0x80002906

udf invalid state

UDF 框架的状态不正确,如正在初始化等,对外不能提供服务

7

0x80002907

udf invalid function input

指输入 udf 函数的参数无效错误

8

0x80002908

udf no function handle

内部错误,没有函数句柄

9

0x80002909

udf invalid bufsize

使用聚合函数时分配的 BUFSIZE 不足,使用超限了

10

0x8000290A

udf invalid output type

实际返回的数据类型与创建UDF 函数时指定的 OUTPUTTYPE 不匹配,可通过查看日志了解详细

11

0x8000290B

udf program language not supported

指定的开发语言还未支持

12

0x8000290C

udf function execution failure

执行 UDF 函数时出错,这是一个大类,里面可能包括各种出错原因,可通过查看日志了解详细

13

0x8000290D

udf raise exception

UDF 函数内部发生异常或主动抛出异常,查询终止

其中 10 和 12 是开发 Python UDF 最容易遇到的错误,详细的原因需要查看日志 taospyudf.log 。

参考实例

在 TDengine 的开源仓库中有几个 UDF for Python 的测试实例,可供大家参考:

https://github.com/taosdata/TDengine/tree/3.0/tests/system-test/0-others/udfpy

实例被如下 CI 测试用例使用:

https://github.com/taosdata/TDengine/blob/3.0/tests/system-test/0-others/udfpy_main.py

运行测试用例:

    1)正确安装 Python3

    2)  拉 TDengine 社区版代码下来

    3)进入 TDengine/tests/system-test/ 目录

    4)运行 Python3 test.py -f others/udfpy_main.py

注意事项

UDF 投影函数最容易在返回行数上出错

UDF 投影函数要求返回行数与输入行数相同,在处理时如果程序稍复杂一些就容易犯此错误,开发过程中一定要小心,不要漏行了。

OUTPUTTYPE 输出类型不匹配错

UDF 函数要求最终返回的数据类型一定要和创建函数时指定的 OUTPUTTYPE 类型一致,如果不一致就会报错。所以这里也是非常容易出错的地方。

投影函数的返回值在 process 函数中,必须要求是 list 对象,list对象中放的一定都是 OUTPUTTYP 映射匹配的类型对象。

聚合函数的最后返回值在 finish 函数中,聚合函数中返回一个值,这个值一定是创建函数时指定的 OUTPUTTYPE 映射匹配的 Python 类型值。

一直报无法加载 libtaospyudf.so 文件

按照上面安装步骤已经安装了 taospyudf 插件,但还是报 libtaospyudf.so 无法加载的错,如下:

这个库一般会安装在  /usr/local/lib/libtaospyudf.so 目录下及 Python 的插件目录下,lib 目录是正式使用的,插件目录是安装时产生的,或者使用  find / -name 'libtaospyudf.so' 查询在本地的具体位置。

如果文件没有或不存在,那可能插件安装有问题,需要重新卸载安装插件如果文件存在,还是报这个错,那使用  ldd 查看 so 文件依赖的Python 库文件是否能够找到,如下是典型的 Python 安装的有问题:

如上错误,重新正确安装 Python3.9 可解决此问题。

结语

目前我们推出的是 UDF for Python 的初版 1.0,该版本的某些功能仍在持续完善中。我们诚邀大家积极使用这一功能,定义完全属于自己的数据库。同时,我们也希望大家在使用过程中多提宝贵意见,以便我们共同改进并增强这一功能,让这个能解决特殊需求的实用功能越来越便捷、强大。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
数据处理 Apache 数据库
将 Python UDF 部署到 Apache IoTDB 的详细步骤与注意事项
【10月更文挑战第21天】将 Python UDF 部署到 Apache IoTDB 中需要一系列的步骤和注意事项。通过仔细的准备、正确的部署和测试,你可以成功地将自定义的 Python UDF 应用到 Apache IoTDB 中,为数据处理和分析提供更灵活和强大的支持。在实际操作过程中,要根据具体情况进行调整和优化,以确保实现最佳的效果。还可以结合具体的代码示例和实际部署经验,进一步深入了解和掌握这一过程。
34 2
|
3月前
|
数据采集 设计模式 算法
拥抱变化:从Python新手到专家的旅程
【10月更文挑战第5天】在编程的世界里,Python以其简洁明了的语法和强大的功能库成为了无数开发者的首选语言。本文将带你走进一个Python新手如何一步步成长为专家的故事,探索学习过程中的困惑、挑战以及最终的成就,并通过代码示例揭示学习之旅的关键时刻。
43 2
|
4月前
|
机器学习/深度学习 测试技术 数据处理
KAN专家混合模型在高性能时间序列预测中的应用:RMoK模型架构探析与Python代码实验
Kolmogorov-Arnold网络(KAN)作为一种多层感知器(MLP)的替代方案,为深度学习领域带来新可能。尽管初期测试显示KAN在时间序列预测中的表现不佳,近期提出的可逆KAN混合模型(RMoK)显著提升了其性能。RMoK结合了Wav-KAN、JacobiKAN和TaylorKAN等多种专家层,通过门控网络动态选择最适合的专家层,从而灵活应对各种时间序列模式。实验结果显示,RMoK在多个数据集上表现出色,尤其是在长期预测任务中。未来研究将进一步探索RMoK在不同领域的应用潜力及其与其他先进技术的结合。
114 4
|
4月前
|
机器学习/深度学习 数据挖掘 TensorFlow
从数据小白到AI专家:Python数据分析与TensorFlow/PyTorch深度学习的蜕变之路
【9月更文挑战第10天】从数据新手成长为AI专家,需先掌握Python基础语法,并学会使用NumPy和Pandas进行数据分析。接着,通过Matplotlib和Seaborn实现数据可视化,最后利用TensorFlow或PyTorch探索深度学习。这一过程涉及从数据清洗、可视化到构建神经网络的多个步骤,每一步都需不断实践与学习。借助Python的强大功能及各类库的支持,你能逐步解锁数据的深层价值。
79 0
|
5月前
|
监控 Java Serverless
Serverless 应用的监控与调试问题之PyFlink对于Python UDF的性能如何提升
Serverless 应用的监控与调试问题之PyFlink对于Python UDF的性能如何提升
|
6月前
|
资源调度 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python UDF(User-Defined Function)来引用第三方模块
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用合集之是否可以将5个资源包统一写到同一个python UDF脚本
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7月前
|
分布式计算 大数据 调度
MaxCompute产品使用问题之为什么用python写的udf函数跑起来比本地还要慢
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7月前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之建了一个python 的 UDF脚本,生成函数引用总是说类不存在,是什么导致的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
29天前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!