介绍
在Apache Spark 3.4中,引入了一个解耦的客户端-服务器架构的新模块Spark Connect,允许使用DataFrame API和未解析的逻辑计划作为协议远程连接到Spark集群。客户端和服务器之间的分离允许Spark及其开放生态系统从任何地方利用。它可以嵌入到现代数据应用程序,IDE,笔记本电脑和编程语言中。
工作原理
Spark Connect轻量级客户端将DataFrame操作转换为使用协议缓冲区编码的未解析逻辑查询计划。这些计划通过gRPC框架发送到服务器。
服务器上的Spark Connect服务端接收未解析的逻辑计划并将其转换为Spark的逻辑计划操作符,类似于解析SQL查询,其中解析属性和关系并构建初始解析计划。然后,标准的Spark执行过程启动,Spark Connect会利用Spark的所有优化和增强。最终的结果通过gRPC作为Apache Arrow编码的行批处理流回客户端。
场景用途
- 稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端上定义自己的依赖项,而不需要担心与Spark驱动程序的潜在冲突。
- 可升级性:Spark驱动程序现在可以独立于应用程序进行无缝升级,例如从性能改进和安全修复中受益。这意味着应用程序可以向前兼容,只要服务器端RPC定义被设计为向后兼容。
- 可调试性和可观察性:Spark Connect支持在开发过程中直接从您喜爱的IDE进行交互式调试。类似地,可以使用应用程序的框架原生指标和日志库来监视应用程序。
简单总结一下,就是可以基于轻量级的依赖整合到java体系项目或者python体系项目,甚至是go体系的项目中,然后实现轻松调用spark connect服务端所在大数据集群的能力,进行数据分析或者数据处理。以后想在web项目中调用spark的能力不再需要引入过多的依赖导致依赖冲突等问题,也不会因为driver端和web服务整合在一起而导致不稳定。
快速上手
截至发稿时,Spark已经更新到3.5.0版本。下面的案例都以3.5.0版本为主。这里主要实践中Java项目中引入Spark Connect客户端,连接远程的Spark Connect服务器。
搭建connect server
启动包含jdk1.8的容器进行搭建
docker run -it -p 15002:15002 -p 4040:4040 --name spark --rm registry.cn-hangzhou.aliyuncs.com/udh/jdk:1.8.141 bash
容器内下载spark3.5.0的包
wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
容器内启动connect server
./sbin/start-connect-server.sh --jars /root/.ivy2/jars/org.apache.spark_spark-connect_2.12-3.5.0.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar
需要在容器内提前下载好两个依赖的jar。
https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar
启动参数:
Usage: ./sbin/start-connect-server.sh [options]
Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).
--archives ARCHIVES Comma-separated list of archives to be extracted into the
working directory of each executor.
--conf, -c PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--proxy-user NAME User to impersonate when submitting the application.
This argument does not work with --principal / --keytab.
--help, -h Show this help message and exit.
--verbose, -v Print additional debug output.
--version, Print the version of current Spark.
Spark Connect only:
--remote CONNECT_URL URL to connect to the server for Spark Connect, e.g.,
sc://host:port. --master and --deploy-mode cannot be set
together with this option. This option is experimental, and
might change between minor releases.
Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.
Spark standalone, Mesos or K8s with cluster deploy mode only:
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.
Spark standalone, Mesos and Kubernetes only:
--total-executor-cores NUM Total cores for all executors.
Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in
YARN and K8S modes, or all available cores on the worker
in standalone mode).
Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of
executors will be at least NUM.
--principal PRINCIPAL Principal to be used to login to KDC.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above.
Spark on YARN only:
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").
java整合connect client
参考官方文档,搭建客户端代码原本以为应该是比较简单的。。。
建立一个maven项目,添加pom
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-api_2.12</artifactId>
<version>3.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-connect-client-jvm -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-client-jvm_2.12</artifactId>
<version>3.5.0</version>
</dependency>
</dependencies>
java代码
public class App
{
public static void main( String[] args )
{
SparkSession spark = SparkSession.builder().remote("sc://localhost").build();
spark.stop();
}
}
结果,报错了
查看spark的官方社区,找到了相同的问题:
临时解决思路
shade包依赖生成项目
建立两个maven项目,一个叫shade-spark-connect。用于将缺失依赖shade倒入一个胖包。
定义pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>shade-spark-connect</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>shade-spark-connect</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.shade.packageName>org.sparkproject.connect.client</spark.shade.packageName>
<io.grpc.version>1.59.0</io.grpc.version>
<netty.version>4.1.100.Final</netty.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<!-- 包含 guava jar,但排除 com.google.thirdparty目录 -->
<includes>
<include>com.google.guava:*</include>
</includes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>com/google/thirdparty/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.protobuf:**</include>
<include>com.google.guava:**</include>
</includes>
</artifactSet>
<!-- 依赖的 jar 包中的一些类文件打包到项目构建生成的 jar 包中,在打包的时候把类重命名-->
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>org.sparkproject.connect.client.com.google.common</shadedPattern>
<includes>
<include>com.google.common.*</include>
</includes>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>org.sparkproject.connect.client.com.google.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
</relocations>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
测试connect client的项目
建另一个maven项目,用于测试connect代码。
pom代码
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-connect-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spark-connect-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<io.grpc.version>1.59.0</io.grpc.version>
<netty.version>4.1.100.Final</netty.version>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-connect-client-jvm_2.12</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.12.2</version> <!-- 适用于你的 Scala 版本 -->
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
其中E:\opensource\shade-spark-connect\target\shade-spark-connect-1.0-SNAPSHOT.jar指向上面shade-spark-connect使用package命令生成到包。
java代码
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
object Myapp extends App {
println(spark.version)
val spark: SparkSession = SparkSession.builder.remote("sc://localhost").getOrCreate()
//using list to init df
val df = spark.createDataFrame(List(("a a a", 1), ("b b b b", 2), ("c c d a", 3))).toDF("value", "col2")
df.show()
// word count
val words = df.select(explode(split(col("value"), " ")).as("word"))
val wordCounts = words.groupBy("word").count()
wordCounts.show()
spark.stop()
}
可以看到connect server在计算完返回的结果了
总结
我也尝试过使用PySpark的Connect客户端连接Connect服务器,按照官方文档的步骤进行操作是完全没有问题的。**猜测可能是因为目前更多的用户结合Python使用这个模块,而使用Scala和Java的用户相对较少。或许随着时间的推移,官方会解决这个问题。如果你希望尽早体验这一功能,可以按照文章中的方法迅速进行尝试。