快速体验Spark Connect

简介: 在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端

介绍

在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离允许Spark及其开放生态系统从任何地方利用。它可以嵌入到现代数据应用程序,IDE,笔记本电脑和编程语言中。

工作原理

Spark Connect轻量级客户端将DataFrame操作转换为使用协议缓冲区编码的未解析逻辑查询计划。这些计划通过gRPC框架发送到服务器。

服务器上的Spark Connect服务端接收未解析的逻辑计划并将其转换为Spark的逻辑计划操作符,类似于解析SQL查询,其中解析属性和关系并构建初始解析计划。然后,标准的Spark执行过程启动,Spark Connect会利用Spark的所有优化和增强。最终的结果通过gRPC作为Apache Arrow编码的行批处理流回客户端。

场景用途

  1. 稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端上定义自己的依赖项,而不需要担心与Spark驱动程序的潜在冲突。
  2. 可升级性:Spark驱动程序现在可以独立于应用程序进行无缝升级,例如从性能改进和安全修复中受益。这意味着应用程序可以向前兼容,只要服务器端RPC定义被设计为向后兼容。
  3. 可调试性和可观察性:Spark Connect支持在开发过程中直接从您喜爱的IDE进行交互式调试。类似地,可以使用应用程序的框架原生指标和日志库来监视应用程序。

简单总结一下,就是可以基于轻量级的依赖整合到java体系项目或者python体系项目,甚至是go体系的项目中,然后实现轻松调用spark connect服务端所在大数据集群的能力,进行数据分析或者数据处理。以后想在web项目中调用spark的能力不再需要引入过多的依赖导致依赖冲突等问题,也不会因为driver端和web服务整合在一起而导致不稳定。

快速上手

截至发稿时,Spark已经更新到3.5.0版本。下面的案例都以3.5.0版本为主。这里主要实践中Java项目中引入Spark Connect客户端,连接远程的Spark Connect服务器。

搭建connect server

启动包含jdk1.8的容器进行搭建

docker run -it -p 15002:15002 -p 4040:4040  --name spark --rm  registry.cn-hangzhou.aliyuncs.com/udh/jdk:1.8.141 bash

容器内下载spark3.5.0的包

wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

容器内启动connect server

./sbin/start-connect-server.sh --jars /root/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.5.0.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar

需要在容器内提前下载好两个依赖的jar。

https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar

https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar

启动参数:

Usage: ./sbin/start-connect-server.sh [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).
  --archives ARCHIVES         Comma-separated list of archives to be extracted into the
                              working directory of each executor.

  --conf, -c PROP=VALUE       Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Spark Connect only:
   --remote CONNECT_URL       URL to connect to the server for Spark Connect, e.g.,
                              sc://host:port. --master and --deploy-mode cannot be set
                              together with this option. This option is experimental, and
                              might change between minor releases.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.

 Spark standalone, Mesos or K8s with cluster deploy mode only:
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone, Mesos and Kubernetes only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone, YARN and Kubernetes only:
  --executor-cores NUM        Number of cores used by each executor. (Default: 1 in
                              YARN and K8S modes, or all available cores on the worker
                              in standalone mode).

 Spark on YARN and Kubernetes only:
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --principal PRINCIPAL       Principal to be used to login to KDC.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above.

 Spark on YARN only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").

java整合connect client

参考官方文档,搭建客户端代码原本以为应该是比较简单的。。。

建立一个maven项目,添加pom

<dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-api_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-connect-client-jvm -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

  </dependencies>

java代码

public class App 
{
    public static void main( String[] args )
    {
        SparkSession spark = SparkSession.builder().remote("sc://localhost").build();

        spark.stop();
    }
}

结果,报错了

查看spark的官方社区,找到了相同的问题:

https://issues.apache.org/jira/browse/SPARK-45255?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CacheLoader%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC

临时解决思路

shade包依赖生成项目

建立两个maven项目,一个叫shade-spark-connect。用于将缺失依赖shade倒入一个胖包。

定义pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>shade-spark-connect</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>shade-spark-connect</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.shade.packageName>org.sparkproject.connect.client</spark.shade.packageName>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>32.0.1-jre</version>
    </dependency>

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.25.1</version>
    </dependency>
  </dependencies>


  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <!-- 包含 guava jar,但排除 com.google.thirdparty目录 -->
            <includes>
              <include>com.google.guava:*</include>
            </includes>
          </artifactSet>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>com/google/thirdparty/**</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>

            <includes>
              <include>com.google.protobuf:**</include>
              <include>com.google.guava:**</include>


            </includes>
          </artifactSet>

          <!--                  依赖的 jar 包中的一些类文件打包到项目构建生成的 jar 包中,在打包的时候把类重命名-->
          <relocations>
            <relocation>
              <pattern>com.google.common</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.common</shadedPattern>
              <includes>
                <include>com.google.common.*</include>
              </includes>

            </relocation>
            <relocation>
              <pattern>com.google.protobuf</pattern>
              <shadedPattern>org.sparkproject.connect.client.com.google.protobuf</shadedPattern>
              <includes>
                <include>com.google.protobuf.**</include>
              </includes>

            </relocation>
          </relocations>
          <transformers>
            <transformer
                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

    </plugins>

  </build>
</project>

测试connect client的项目

建另一个maven项目,用于测试connect代码。

pom代码

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>spark-connect-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-connect-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <io.grpc.version>1.59.0</io.grpc.version>
    <netty.version>4.1.100.Final</netty.version>

  </properties>

  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-services</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-inprocess</artifactId>
      <version>${io.grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-codec-http2</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler-proxy</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-unix-common</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-client-jvm_2.12</artifactId>
      <version>3.5.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.example</groupId>
      <artifactId>demo</artifactId>
      <version>1.0</version>
      <scope>system</scope>
      <systemPath>E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar</systemPath>
    </dependency>
  </dependencies>


  <build>
   <plugins>


     <plugin>
       <groupId>org.scala-tools</groupId>
       <artifactId>maven-scala-plugin</artifactId>
       <version>2.12.2</version> <!-- 适用于你的 Scala 版本 -->
       <executions>
         <execution>
           <goals>
             <goal>compile</goal>
             <goal>testCompile</goal>
           </goals>
         </execution>
       </executions>
     </plugin>
   </plugins>


  </build>
</project>

其中E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar指向上面shade-spark-connect使用package命令生成到包。

java代码

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
object Myapp extends App {
   
   

  println(spark.version)
  val spark: SparkSession = SparkSession.builder.remote("sc://localhost").getOrCreate()
//using list to init  df
  val df = spark.createDataFrame(List(("a a a", 1), ("b b b b", 2), ("c c d a", 3))).toDF("value", "col2")
  df.show()


// word count
  val words = df.select(explode(split(col("value"), " ")).as("word"))
  val wordCounts = words.groupBy("word").count()
  wordCounts.show()


  spark.stop()

}

可以看到connect server在计算完返回的结果了

总结

我也尝试过使用PySpark的Connect客户端连接Connect服务器,按照官方文档的步骤进行操作是完全没有问题的。**猜测可能是因为目前更多的用户结合Python使用这个模块,而使用Scala和Java的用户相对较少。或许随着时间的推移,官方会解决这个问题。如果你希望尽早体验这一功能,可以按照文章中的方法迅速进行尝试。

相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
3月前
|
自然语言处理 算法
PIKA最新推出 Lip Sync(口型同步)
【2月更文挑战第9天】PIKA最新推出 Lip Sync(口型同步)
130 1
PIKA最新推出 Lip Sync(口型同步)
|
SQL 前端开发 Java
安装部署--impala 服务启动、关闭 | 学习笔记
快速学习 安装部署--impala 服务启动、关闭
753 0
安装部署--impala 服务启动、关闭 | 学习笔记
|
4月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
5月前
|
分布式计算 Hadoop Java
Note_Spark_Day01:Spark 基础环境
Note_Spark_Day01:Spark 基础环境
53 0
|
5月前
|
资源调度 Kubernetes Java
Streampark使用体验与建议
Streampark使用体验与建议
81 0
|
9月前
|
SQL 机器学习/深度学习 分布式计算
Apache Doris Spark Load快速体验之Spark部署(1)1
Apache Doris Spark Load快速体验之Spark部署(1)1
93 0
|
9月前
|
分布式计算 Apache Spark
Apache Doris Spark Load快速体验之Spark部署(1)2
Apache Doris Spark Load快速体验之Spark部署(1)2
98 0
|
9月前
|
消息中间件 固态存储 Kafka
Apache Doris Routine Load快速体验之案例(2)1
Apache Doris Routine Load快速体验之案例(2)1
95 0
|
9月前
|
消息中间件 Kafka Apache
Apache Doris Routine Load快速体验之案例(2)2
Apache Doris Routine Load快速体验之案例(2)2
204 0
|
分布式计算 Apache Spark
《Apache Kylin Speed up Cubing with Spark》电子版地址
Apache Kylin: Speed up Cubing with Spark
59 0
《Apache Kylin Speed up Cubing with Spark》电子版地址