通过 EMR Serverless Spark 提交 PySpark 流任务

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。

在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。


前提条件

已创建工作空间,详情请参见创建工作空间


操作流程

步骤一:创建实时数据流集群并产生消息

  1. 在EMR on ECS页面,创建包含Kafka服务的实时数据流集群,详情请参见创建集群
  2. 登录EMR集群的Master节点,详情请参见登录集群
  3. 执行以下命令,切换目录。
cd /var/log/emr/taihao_exporter


  1. 执行以下命令,创建Topic。
# 创建名为taihaometrics的Topic,分区数10,副本因子2。
kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create


  1. 执行以下命令,发送消息。
# 使用kafka-console-producer发送消息到taihaometrics Topic。
tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics


步骤二:新增网络连接

  1. 进入网络连接页面。
  1. 在EMR控制台的左侧导航栏,选择EMR Serverless > Spark
  2. Spark页面,单击目标工作空间名称。
  3. EMR Serverless Spark页面,单击左侧导航栏中的网络连接
  1. 网络连接页面,单击新增网络连接
  2. 新增网络连接对话框中,配置以下信息,单击确定

参数

说明

连接名称

输入新增连接的名称。例如,connection_to_emr_kafka。

专有网络

选择与EMR集群相同的专有网络。

如果当前没有可选择的专有网络,请单击创建专有网络,前往专有网络控制台创建,详情请参见创建和管理专有网络

交换机

选择与EMR集群部署在同一专有网络下的相同交换机。

如果当前可用区没有交换机,请单击虚拟交换机,前往专有网络控制台创建,详情请参见创建和管理交换机

状态显示为已成功时,表示新增网络连接成功。


步骤三:为EMR集群添加安全组规则

  1. 获取集群节点交换机的网段。
    您可以在节点管理页面,单击节点组名称,查看关联的交换机信息,然后登录专有网络管理控制台,在交换机页面获取交换机的网段。


  1. 添加安全组规则。
  1. 集群管理页面,单击目标集群的集群ID。
  2. 基础信息页面,单击集群安全组后面的链接。
  3. 在安全组规则页面,单击手动添加,填写端口范围和授权对象,然后单击保存

参数

说明

端口范围

填写9092端口。

授权对象

填写前一步骤中获取的指定交换机的网段。

重要

为防止被外部的用户攻击导致安全问题,授权对象禁止填写为0.0.0.0/0。


步骤四:上传JAR包至OSS

上传kafka.zip中的所有JAR包至OSS,上传操作可以参见简单上传


步骤五:上传资源文件

  1. 在EMR Serverless Spark页面,单击左侧导航栏中的资源上传
  2. 资源上传页面,单击上传文件
  3. 上传文件对话框中,单击待上传文件区域选择pyspark_ss_demo.py文件。


步骤六:新建并启动流任务

  1. 在EMR Serverless Spark页面,单击左侧的任务开发
  2. 单击新建
  3. 输入任务名称,新建一个Application(流任务) > PySpark类型的任务,然后单击确定
  4. 在新建的任务开发中,配置以下信息,其余参数无需配置,然后单击保存

参数

说明

主Python资源

选择前一个步骤中在资源上传页面上传的pyspark_ss_demo.py文件。

引擎版本

Spark的版本,详情请参见引擎版本介绍

运行参数

EMR集群core-1-1节点的内网IP地址。您可以在EMR集群的节点管理页面的Core节点组下查看。

Spark配置

Spark的配置信息。本文示例如下。

spark.jars oss://<yourBucket>/kafka-lib/commons-pool2-2.11.1.jar,oss://<yourBucket>/kafka-lib/kafka-clients-2.8.1.jar,oss://<yourBucket>/kafka-lib/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://<yourBucket>/kafka-lib/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
spark.emr.serverless.network.service.name connection_to_emr_kafka

说明

spark.jars用于指定Spark任务运行时需要加载的外部JAR包路径。本文示例为您步骤四中上传至OSS的路径,请您根据实际情况替换。


  1. 单击发布
  2. 发布任务对话框中,单击确定
  3. 启动流任务。
  1. 单击前往运维
  2. 单击启动


步骤七:查看日志

  1. 单击日志探查页签。
  2. Driver日志列表中,单击stdOut.log
    在打开的日志文件中,您可以看到应用程序执行的相关信息以及返回的结果。


相关文档

  1. EMR Serverless Spark 版官网:https://www.aliyun.com/product/bigdata/serverlessspark
  2. 产品控制台:https://emr-next.console.aliyun.com/
  3. 产品文档:https://help.aliyun.com/zh/emr/emr-serverless-spark/
  4. PySpark 批任务的开发流程示例:PySpark任务快速入门



EMR Serverless Spark 在 2024年5月正式开启公测,在公测期间可以免费使用最高 100 CU 计算资源,欢迎试用。如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可钉钉扫描以下二维码加入钉钉群(群号:58570004119)咨询。

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
2天前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
|
5天前
|
分布式计算 Hadoop Serverless
数据处理的艺术:EMR Serverless Spark实践及应用体验
阿里云EMR Serverless Spark是基于Spark的全托管大数据处理平台,融合云原生弹性与自动化,提供任务全生命周期管理,让数据工程师专注数据分析。它内置高性能Fusion Engine,性能比开源Spark提升200%,并有成本优化的Celeborn服务。支持计算存储分离、OSS-HDFS兼容、DLF元数据管理,实现一站式的开发体验和Serverless资源管理。适用于数据报表、科学项目等场景,简化开发与运维流程。用户可通过阿里云控制台快速配置和体验EMR Serverless Spark服务。
|
10天前
|
运维 Cloud Native Serverless
函数计算产品使用问题之之前部署的sd应用可以正常使用,但现在点击链接却显示“无法访问此页面”,是什么原因
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
7天前
|
运维 Serverless 应用服务中间件
Serverless 应用引擎产品使用合集之关于OSS映射目录的大小限制,如何可以跳过
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
Serverless 应用引擎产品使用合集之关于OSS映射目录的大小限制,如何可以跳过
|
7天前
|
运维 Serverless API
Serverless 应用引擎产品使用合集之通过 API 调用 /tagger/v1/interrogate 时,出现unsupported protocol scheme "" 错误,如何处理
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7天前
|
缓存 运维 监控
Serverless 应用引擎产品使用合集之在使用函数计算 FC 部署 stable-diffusion 应用时,选了 tagger 扩展插件却拿不到提示词,还报错“Error”,是什么原因
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7天前
|
关系型数据库 MySQL Serverless
Serverless 应用引擎产品使用合集之在SAE2.0上的应用如何访问云原生数据库PolarDB MySQL版集群
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7天前
|
运维 Kubernetes Java
Serverless 应用引擎产品使用合集之如何设置能让应用定时启停
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7天前
|
域名解析 运维 网络协议
Serverless 应用引擎产品使用合集之一般情况下在SAE中如何关联域名
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7天前
|
运维 Serverless 测试技术
Serverless 应用引擎产品使用合集之在SAE 2.0中,如何区分生产环境和测试环境
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。