快速体验Spark Connect

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端

介绍

在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离允许Spark及其开放生态系统从任何地方利用。它可以嵌入到现代数据应用程序,IDE,笔记本电脑和编程语言中。

工作原理

Spark Connect轻量级客户端将DataFrame操作转换为使用协议缓冲区编码的未解析逻辑查询计划。这些计划通过gRPC框架发送到服务器。

服务器上的Spark Connect服务端接收未解析的逻辑计划并将其转换为Spark的逻辑计划操作符,类似于解析SQL查询,其中解析属性和关系并构建初始解析计划。然后,标准的Spark执行过程启动,Spark Connect会利用Spark的所有优化和增强。最终的结果通过gRPC作为Apache Arrow编码的行批处理流回客户端。

场景用途

  1. 稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端上定义自己的依赖项,而不需要担心与Spark驱动程序的潜在冲突。
  2. 可升级性:Spark驱动程序现在可以独立于应用程序进行无缝升级,例如从性能改进和安全修复中受益。这意味着应用程序可以向前兼容,只要服务器端RPC定义被设计为向后兼容。
  3. 可调试性和可观察性:Spark Connect支持在开发过程中直接从您喜爱的IDE进行交互式调试。类似地,可以使用应用程序的框架原生指标和日志库来监视应用程序。

简单总结一下,就是可以基于轻量级的依赖整合到java体系项目或者python体系项目,甚至是go体系的项目中,然后实现轻松调用spark connect服务端所在大数据集群的能力,进行数据分析或者数据处理。以后想在web项目中调用spark的能力不再需要引入过多的依赖导致依赖冲突等问题,也不会因为driver端和web服务整合在一起而导致不稳定。

快速上手

截至发稿时,Spark已经更新到3.5.0版本。下面的案例都以3.5.0版本为主。这里主要实践中Java项目中引入Spark Connect客户端,连接远程的Spark Connect服务器。

搭建connect server

启动包含jdk1.8的容器进行搭建

docker run -it -p 15002:15002 -p 4040:4040  --name spark --rm  registry.cn-hangzhou.aliyuncs.com/udh/jdk:1.8.141 bash

容器内下载spark3.5.0的包

wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

容器内启动connect server

./sbin/start-connect-server.sh --jars /root/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.5.0.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar

需要在容器内提前下载好两个依赖的jar。

https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar

https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar

启动参数:

Usage: ./sbin/start-connect-server.sh [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).
  --archives ARCHIVES         Comma-separated list of archives to be extracted into the
                              working directory of each executor.

  --conf, -c PROP=VALUE       Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Spark Connect only:
   --remote CONNECT_URL       URL to connect to the server for Spark Connect, e.g.,
                              sc://host:port. --master and --deploy-mode cannot be set
                              together with this option. This option is experimental, and
                              might change between minor releases.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.

 Spark standalone, Mesos or K8s with cluster deploy mode only:
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone, Mesos and Kubernetes only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone, YARN and Kubernetes only:
  --executor-cores NUM        Number of cores used by each executor. (Default: 1 in
                              YARN and K8S modes, or all available cores on the worker
                              in standalone mode).

 Spark on YARN and Kubernetes only:
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --principal PRINCIPAL       Principal to be used to login to KDC.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above.

 Spark on YARN only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").

java整合connect client

参考官方文档,搭建客户端代码原本以为应该是比较简单的。。。

建立一个maven项目,添加pom

<dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-api_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-connect-client-jvm -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

  </dependencies>

java代码

public class App 
{
    public static void main( String[] args )
    {
        SparkSession spark = SparkSession.builder().remote("sc://localhost").build();

        spark.stop();
    }
}

结果,报错了

查看spark的官方社区,找到了相同的问题:

https://issues.apache.org/jira/browse/SPARK-45255?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CacheLoader%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC

临时解决思路

shade包依赖生成项目

建立两个maven项目,一个叫shade-spark-connect。用于将缺失依赖shade倒入一个胖包。

定义pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>shade-spark-connect</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>shade-spark-connect</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.shade.packageName>org.sparkproject.connect.client</spark.shade.packageName>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>32.0.1-jre</version>
    </dependency>

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.25.1</version>
    </dependency>
  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <!-- 包含 guava jar,但排除 com.google.thirdparty目录 -->
            <includes>
              <include>com.google.guava:*</include>
            </includes>
          </artifactSet>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>com/google/thirdparty/**</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>

            <includes>
              <include>com.google.protobuf:**</include>
              <include>com.google.guava:**</include>


            </includes>
          </artifactSet>

          <!--                  依赖的 jar 包中的一些类文件打包到项目构建生成的 jar 包中,在打包的时候把类重命名-->
          <relocations>
            <relocation>
              <pattern>com.google.common</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.common</shadedPattern>
              <includes>
                <include>com.google.common.*</include>
              </includes>

            </relocation>
            <relocation>
              <pattern>com.google.protobuf</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.protobuf</shadedPattern>
              <includes>
                <include>com.google.protobuf.**</include>
              </includes>

            </relocation>
          </relocations>
          <transformers>
            <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>

  </build>
</project>

测试connect client的项目

建另一个maven项目,用于测试connect代码。

pom代码

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>spark-connect-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-connect-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-services</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-inprocess</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-codec-http2</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler-proxy</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-unix-common</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.example</groupId>
      <artifactId>demo</artifactId>
      <version>1.0</version>
      <scope>system</scope>
      <systemPath>E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar</systemPath>
    </dependency>
  </dependencies>


  <build>
   <plugins>


     <plugin>
       <groupId>org.scala-tools</groupId>
       <artifactId>maven-scala-plugin</artifactId>
       <version>2.12.2</version> <!-- 适用于你的 Scala 版本 -->
       <executions>
         <execution>
           <goals>
             <goal>compile</goal>
             <goal>testCompile</goal>
           </goals>
         </execution>
       </executions>
     </plugin>
   </plugins>


  </build>
</project>

其中E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar指向上面shade-spark-connect使用package命令生成到包。

java代码

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
object Myapp extends App {
   
   

  println(spark.version)
  val spark: SparkSession = SparkSession.builder.remote("sc://localhost").getOrCreate()
//using list to init  df
  val df = spark.createDataFrame(List(("a a a", 1), ("b b b b", 2), ("c c d a", 3))).toDF("value", "col2")
  df.show()


// word count
  val words = df.select(explode(split(col("value"), " ")).as("word"))
  val wordCounts = words.groupBy("word").count()
  wordCounts.show()


  spark.stop()

}

可以看到connect server在计算完返回的结果了

总结

我也尝试过使用PySpark的Connect客户端连接Connect服务器,按照官方文档的步骤进行操作是完全没有问题的。**猜测可能是因为目前更多的用户结合Python使用这个模块,而使用Scala和Java的用户相对较少。或许随着时间的推移,官方会解决这个问题。如果你希望尽早体验这一功能,可以按照文章中的方法迅速进行尝试。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
8月前
|
Python
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
325 0
|
消息中间件 分布式计算 大数据
【Spark Summit East 2017】 使用Kafka Connect和Spark Streaming构建实时数据管道
本讲义出自Ewen Cheslack Postava在Spark Summit East 2017上的演讲,主要介绍了面对使用Spark Streaming构建大规模实时数据管道的挑战,Kafka项目最近推出了新的工具—— Kafka Connect,该工具将帮助简化从Kafka导入和导出数据,Ewen Cheslack Postava分享了如何使用Kafka Connect和Spark Streaming构建实时数据管道。
2254 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
154 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
77 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
51 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
105 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
101 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
129 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
93 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
73 1