快速体验Spark Connect

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端

介绍

在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离允许Spark及其开放生态系统从任何地方利用。它可以嵌入到现代数据应用程序,IDE,笔记本电脑和编程语言中。

工作原理

Spark Connect轻量级客户端将DataFrame操作转换为使用协议缓冲区编码的未解析逻辑查询计划。这些计划通过gRPC框架发送到服务器。

服务器上的Spark Connect服务端接收未解析的逻辑计划并将其转换为Spark的逻辑计划操作符,类似于解析SQL查询,其中解析属性和关系并构建初始解析计划。然后,标准的Spark执行过程启动,Spark Connect会利用Spark的所有优化和增强。最终的结果通过gRPC作为Apache Arrow编码的行批处理流回客户端。

场景用途

  1. 稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端上定义自己的依赖项,而不需要担心与Spark驱动程序的潜在冲突。
  2. 可升级性:Spark驱动程序现在可以独立于应用程序进行无缝升级,例如从性能改进和安全修复中受益。这意味着应用程序可以向前兼容,只要服务器端RPC定义被设计为向后兼容。
  3. 可调试性和可观察性:Spark Connect支持在开发过程中直接从您喜爱的IDE进行交互式调试。类似地,可以使用应用程序的框架原生指标和日志库来监视应用程序。

简单总结一下,就是可以基于轻量级的依赖整合到java体系项目或者python体系项目,甚至是go体系的项目中,然后实现轻松调用spark connect服务端所在大数据集群的能力,进行数据分析或者数据处理。以后想在web项目中调用spark的能力不再需要引入过多的依赖导致依赖冲突等问题,也不会因为driver端和web服务整合在一起而导致不稳定。

快速上手

截至发稿时,Spark已经更新到3.5.0版本。下面的案例都以3.5.0版本为主。这里主要实践中Java项目中引入Spark Connect客户端,连接远程的Spark Connect服务器。

搭建connect server

启动包含jdk1.8的容器进行搭建

docker run -it -p 15002:15002 -p 4040:4040  --name spark --rm  registry.cn-hangzhou.aliyuncs.com/udh/jdk:1.8.141 bash

容器内下载spark3.5.0的包

wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

容器内启动connect server

./sbin/start-connect-server.sh --jars /root/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.5.0.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar

需要在容器内提前下载好两个依赖的jar。

https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar

https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar

启动参数:

Usage: ./sbin/start-connect-server.sh [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).
  --archives ARCHIVES         Comma-separated list of archives to be extracted into the
                              working directory of each executor.

  --conf, -c PROP=VALUE       Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Spark Connect only:
   --remote CONNECT_URL       URL to connect to the server for Spark Connect, e.g.,
                              sc://host:port. --master and --deploy-mode cannot be set
                              together with this option. This option is experimental, and
                              might change between minor releases.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.

 Spark standalone, Mesos or K8s with cluster deploy mode only:
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone, Mesos and Kubernetes only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone, YARN and Kubernetes only:
  --executor-cores NUM        Number of cores used by each executor. (Default: 1 in
                              YARN and K8S modes, or all available cores on the worker
                              in standalone mode).

 Spark on YARN and Kubernetes only:
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --principal PRINCIPAL       Principal to be used to login to KDC.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above.

 Spark on YARN only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").

java整合connect client

参考官方文档,搭建客户端代码原本以为应该是比较简单的。。。

建立一个maven项目,添加pom

<dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-api_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-connect-client-jvm -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

  </dependencies>

java代码

public class App 
{
    public static void main( String[] args )
    {
        SparkSession spark = SparkSession.builder().remote("sc://localhost").build();

        spark.stop();
    }
}

结果,报错了

查看spark的官方社区,找到了相同的问题:

https://issues.apache.org/jira/browse/SPARK-45255?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CacheLoader%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC

临时解决思路

shade包依赖生成项目

建立两个maven项目,一个叫shade-spark-connect。用于将缺失依赖shade倒入一个胖包。

定义pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>shade-spark-connect</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>shade-spark-connect</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.shade.packageName>org.sparkproject.connect.client</spark.shade.packageName>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>32.0.1-jre</version>
    </dependency>

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.25.1</version>
    </dependency>
  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <!-- 包含 guava jar,但排除 com.google.thirdparty目录 -->
            <includes>
              <include>com.google.guava:*</include>
            </includes>
          </artifactSet>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>com/google/thirdparty/**</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>

            <includes>
              <include>com.google.protobuf:**</include>
              <include>com.google.guava:**</include>


            </includes>
          </artifactSet>

          <!--                  依赖的 jar 包中的一些类文件打包到项目构建生成的 jar 包中,在打包的时候把类重命名-->
          <relocations>
            <relocation>
              <pattern>com.google.common</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.common</shadedPattern>
              <includes>
                <include>com.google.common.*</include>
              </includes>

            </relocation>
            <relocation>
              <pattern>com.google.protobuf</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.protobuf</shadedPattern>
              <includes>
                <include>com.google.protobuf.**</include>
              </includes>

            </relocation>
          </relocations>
          <transformers>
            <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>

  </build>
</project>

测试connect client的项目

建另一个maven项目,用于测试connect代码。

pom代码

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>spark-connect-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-connect-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-services</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-inprocess</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-codec-http2</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler-proxy</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-unix-common</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.example</groupId>
      <artifactId>demo</artifactId>
      <version>1.0</version>
      <scope>system</scope>
      <systemPath>E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar</systemPath>
    </dependency>
  </dependencies>


  <build>
   <plugins>


     <plugin>
       <groupId>org.scala-tools</groupId>
       <artifactId>maven-scala-plugin</artifactId>
       <version>2.12.2</version> <!-- 适用于你的 Scala 版本 -->
       <executions>
         <execution>
           <goals>
             <goal>compile</goal>
             <goal>testCompile</goal>
           </goals>
         </execution>
       </executions>
     </plugin>
   </plugins>


  </build>
</project>

其中E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar指向上面shade-spark-connect使用package命令生成到包。

java代码

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
object Myapp extends App {
   
   

  println(spark.version)
  val spark: SparkSession = SparkSession.builder.remote("sc://localhost").getOrCreate()
//using list to init  df
  val df = spark.createDataFrame(List(("a a a", 1), ("b b b b", 2), ("c c d a", 3))).toDF("value", "col2")
  df.show()


// word count
  val words = df.select(explode(split(col("value"), " ")).as("word"))
  val wordCounts = words.groupBy("word").count()
  wordCounts.show()


  spark.stop()

}

可以看到connect server在计算完返回的结果了

总结

我也尝试过使用PySpark的Connect客户端连接Connect服务器,按照官方文档的步骤进行操作是完全没有问题的。**猜测可能是因为目前更多的用户结合Python使用这个模块,而使用Scala和Java的用户相对较少。或许随着时间的推移,官方会解决这个问题。如果你希望尽早体验这一功能,可以按照文章中的方法迅速进行尝试。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
4月前
|
Python
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
【已解决】Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
164 0
|
消息中间件 分布式计算 大数据
【Spark Summit East 2017】 使用Kafka Connect和Spark Streaming构建实时数据管道
本讲义出自Ewen Cheslack Postava在Spark Summit East 2017上的演讲,主要介绍了面对使用Spark Streaming构建大规模实时数据管道的挑战,Kafka项目最近推出了新的工具—— Kafka Connect,该工具将帮助简化从Kafka导入和导出数据,Ewen Cheslack Postava分享了如何使用Kafka Connect和Spark Streaming构建实时数据管道。
2239 0
|
2月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
104 1
Spark快速大数据分析PDF下载读书分享推荐
|
29天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
107 3
|
5天前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
16 3
|
8天前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
24 3
|
14天前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
5天前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
15 0
|
29天前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
60 2
|
3月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
161 59
下一篇
DDNS