Flink的通用算法平台Alink

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Alink是基于Flink的通用算法平台,由阿里巴巴计算平台PAI团队研发

Alink

Alink是基于Flink的通用算法平台,由阿里巴巴计算平台PAI团队研发,欢迎大家加入Alink开源用户钉钉群进行交流。

image.png

开源算法列表

image.png

PyAlink 使用截图

image.png

快速开始

PyAlink 使用介绍

使用前准备:


包名和版本说明:

  • PyAlink 根据 Alink 所支持的 Flink 版本提供不同的 Python 包: 其中,pyalink 包对应为 Alink 所支持的最新 Flink 版本,当前为 1.13,而 pyalink-flink-*** 为旧版本的 Flink 版本,当前提供 pyalink-flink-1.12, pyalink-flink-1.11, pyalink-flink-1.10pyalink-flink-1.9
  • Python 包的版本号与 Alink 的版本号一致,例如1.4.0

####安装步骤:

  1. 确保使用环境中有Python3,版本限于 3.6,3.7 和 3.8。
  2. 确保使用环境中安装有 Java 8。
  3. 使用 pip 命令进行安装: pip install pyalinkpip install pyalink-flink-1.12pip install pyalink-flink-1.11pip install pyalink-flink-1.10 或者 pip install pyalink-flink-1.9

安装注意事项:

  1. pyalinkpyalink-flink-*** 不能同时安装,也不能与旧版本同时安装。 如果之前安装过 pyalink 或者 pyalink-flink-***,请使用pip uninstall pyalink 或者 pip uninstall pyalink-flink-*** 卸载之前的版本。
  2. 出现pip安装缓慢或不成功的情况,可以参考这篇文章修改pip源,或者直接使用下面的链接下载 whl 包,然后使用pip安装:
  • Flink 1.13:链接 (MD5: 94751cc1e31f174b446142455293fb30)
  • Flink 1.12:链接 (MD5: 4cbafe08b24b3467d9096f8a8a07321f)
  • Flink 1.11:链接 (MD5: 1810f6769bd2d2d77358f4e51948a937)
  • Flink 1.10:链接 (MD5: 79ceea2788ad2159ae23ad3e9e83a261)
  • Flink 1.9: 链接 (MD5: fc143df37d15d6bd3b0733fef8969ef1)
  1. 如果有多个版本的 Python,可能需要使用特定版本的 pip,比如 pip3;如果使用 Anaconda,则需要在 Anaconda 命令行中进行安装。

下载安装文件系统或 Catalog 依赖 jar 包:

安装 PyAlink 之后,可以直接运行 download_pyalink_dep_jars 命令,下载支持文件系统功能所需要的 jar 包。 (如果提示找不到这个命令,可以尝试直接运行脚本: python3 -c 'from pyalink.alink.download_pyalink_dep_jars import main;main()'。)

运行这个命令后,将提问是否安装某种文件系统对应的 jar 包,并选择合适的版本。 当前支持的文件系统包括:

  • OSS:3.4.1
  • Hadoop:2.8.3
  • Hive:2.3.4
  • MySQL: 5.1.27
  • Derby: 10.6.1.0
  • SQLite: 3.19.3
  • S3-hadoop: 1.11.788
  • S3-presto: 1.11.788
  • odps: 0.36.4-public

这些 jar 包将被下载到 PyAlink 安装路径的 lib/plugins 目录下,所以要求运行命令时有 PyAlink 安装目录的权限。

运行命令时,也可以增加参数:download_pyalink_dep_jars -d,将自动下载所有的 jar 包。

开始使用:


可以通过 Jupyter Notebook 来开始使用 PyAlink,能获得更好的使用体验。

使用步骤:

  1. 在命令行中启动Jupyter:jupyter notebook,并新建 Python 3 的 Notebook 。
  2. 导入 pyalink 包:from pyalink.alink import *
  3. 使用方法创建本地运行环境: useLocalEnv(parallism, flinkHome=None, config=None)。 其中,参数 parallism 表示执行所使用的并行度;flinkHome 为 flink 的完整路径,一般情况不需要设置;config为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功:
JVM listening on ***
  1. 开始编写 PyAlink 代码,例如:
source = CsvSourceBatchOp()\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
res = source.select(["sepal_length", "sepal_width"])
df = res.collectToDataframe()
print(df)

编写代码:


在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 setXXX 设置参数,通过 link/linkTo/linkFrom 与其他组件相连。 这里利用 Jupyter Notebook 的自动补全机制可以提供书写便利。

对于批式作业,可以通过批式组件的 print/collectToDataframe/collectToDataframes 等方法或者 BatchOperator.execute() 来触发执行;对于流式作业,则通过 StreamOperator.execute() 来启动作业。

更多用法:


Java 接口使用介绍


示例代码

String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
String SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
BatchOperator data = new CsvSourceBatchOp()
        .setFilePath(URL)
        .setSchemaStr(SCHEMA_STR);
VectorAssembler va = new VectorAssembler()
        .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
        .setOutputCol("features");
KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
        .setPredictionCol("prediction_result")
        .setPredictionDetailCol("prediction_detail")
        .setReservedCols("category")
        .setMaxIter(100);
Pipeline pipeline = new Pipeline().add(va).add(kMeans);
pipeline.fit(data).transform(data).print();

Flink-1.13 的 Maven 依赖

<dependency>
    <groupId>com.alibaba.alink</groupId>
    <artifactId>alink_core_flink-1.13_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

快速开始在集群上运行Alink算法

  1. 准备Flink集群
  wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz
  tar -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0
  ./bin/start-cluster.sh
  1. 准备Alink算法包
  git clone https://github.com/alibaba/Alink.git
  # add <scope>provided</scope> in pom.xml of alink_examples.
  cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
  1. 运行Java示例
  ./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.4-SNAPSHOT.jar
  # ./bin/flink run -p 1 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.4-SNAPSHOT.jar
  # ./bin/flink run -p 1 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.4-SNAPSHOT.jar

部署


集群部署

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
机器学习/深度学习 算法 搜索推荐
外卖平台推荐算法的优化与实践
外卖平台推荐算法的优化与实践
|
20天前
|
机器学习/深度学习 人工智能 自然语言处理
【CVPR2024】阿里云人工智能平台PAI图像编辑算法论文入选CVPR2024
近期,阿里云人工智能平台PAI发表的图像编辑算法论文在CVPR-2024上正式亮相发表。论文成果是阿里云与华南理工大学贾奎教授领衔的团队共同研发。此次入选标志着阿里云人工智能平台PAI自主研发的图像编辑算法达到了先进水平,赢得了国际学术界的认可。在阿里云人工智能平台PAI算法团队和华南理工大学的老师学生们一同的坚持和热情下,将阿里云在图像生成与编辑领域的先进理念得以通过学术论文和会议的形式,向业界传递和展现。
|
26天前
|
机器学习/深度学习 人工智能 Apache
人工智能平台PAI操作报错合集之alink任务可以在本地运行,上传到flink web运行就报错,如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
10天前
|
算法 搜索推荐 Java
基于SpringBoot+协同过滤算法的家政服务平台设计和实现(源码+LW+调试文档+讲解等)
基于SpringBoot+协同过滤算法的家政服务平台设计和实现(源码+LW+调试文档+讲解等)
|
2月前
|
算法 关系型数据库 MySQL
实时计算 Flink版产品使用合集之哪个版本可以做增量快照算法
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
机器学习/深度学习 人工智能 运维
人工智能平台PAI 操作报错合集之请问Alink的算法中的序列异常检测组件,是对数据进行分组后分别在每个组中执行异常检测,而不是将数据看作时序数据进行异常检测吧
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
人工智能 算法 数据安全/隐私保护
AIGC变革下人工智能平台的算法黑箱问题
AIGC变革下人工智能平台的算法黑箱问题
279 1
AIGC变革下人工智能平台的算法黑箱问题
|
2月前
|
存储 SQL 算法
flink cdc 算法问题之low hign点位有重叠如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2天前
|
机器学习/深度学习 算法 调度
Matlab|基于改进鲸鱼优化算法的微网系统能量优化管理matlab-源码
基于改进鲸鱼优化算法的微网系统能量管理源码实现,结合LSTM预测可再生能源和负荷,优化微网运行成本与固定成本。方法应用于冷热电联供微网,结果显示经济成本平均降低4.03%,提高经济效益。代码包括数据分段、LSTM网络定义及训练,最终展示了一系列运行结果图表。