前言
Spark作为大数据处理神器,对大数据处理能力我们比较熟知,经常有小伙伴问我怎么去入手开发Spark的程序,很多人因为所谓的没有大数据环境问题导致入门到放弃,深有感触,把一些心得分享出来,希望会优先帮助。
本地模式
事实上spark到了2.x版本之后,在window上面编程和运行Spark程序其实完全没啥问题,不一定非要跑到linux上面或者需要MAC本子才能干活,本地模式运行可以很方便让我们调试,单步运行的工作,强烈推荐。
比如说我之前的UDF开发程序,完全就是在本地模式下面完成开发和测试,这部分工作做完之后,再把程序上层到大环境上面执行,可以大大加快效率。
本机上面模拟hive环境问题
Spark在本机上面运行执行了 .enableHiveSupport()操作时候会在本机生成一个hive的仓库,这个仓库其实完全可以自由建表导数据的工作和hive中的做法其实是一样的,更为关键的是,你的仓库其实可以从外部盘设置,每次运行的时候可以复用
SparkSession spark = SparkSession .builder() .appName("MultiplyRowTest") .config("spark.sql.warehouse.dir", warehouseLocation) //配置一个本机的路径 .master("local[*]") .enableHiveSupport() //开启Hive支持 .getOrCreate();
Spark的例子中提供了建表导入数据的操作,只是很多人不清楚这个其实是可以直接在windows本机执行的
spark.sql("create table if not exists src (key int, value string) using hive"); spark.sql("load data local inpath 'src/main/resources/kv1.txt' into table src");
因为数据本身在本地,所以我们的SQL执行的时候也是从本机去读取数据的,并不需要外部的hdfs读取数据,不过反过来说当这个表指向一个远程的hdfs路径的时候,自然的会去读取远程的文件了
Scala语言的便利性
SQL语句拼接很清晰
Spark的程序是可以用Java和Scala一起写的,但是Scala的便利性,尤其是是写SQL和操作数据的时候,可以简化很多,比如说我们写SQL的时候,Java帮我们拼接SQL语句会搞很多的+""这种操作,scala里面就清晰很多了
val wordSql= """ select count(*) as cnt,word from ( select explode(split(line,' ')) as word from tbl_word ) t group by word |""".stripMargin //单词计数 session.sql(wordSql).show(10)
可以直接写,SQL会清晰很多
隐式转换造数据很方便
import session.implicits._ val df=List( "Hello Me", "Hello You", "Hello World", "Hello World", "Hello World", "Hello World" ).toDF("line") df.show(10) df.printSchema() df.createOrReplaceTempView("tbl_word")
SparkCore的单步
首先一点,我们的Spark可以直接在Idea中导入和调试的,所以直接导入没问题,不过也是建议选择高点的版本,我是2.4.5
但是由于Spark的代码是比较多的,单步起来带来了模块编译,我们有时候就是想简单看看某个数据处理的过程,但是也不用看太多代码,分享一个小技巧,就是在自己的代码里面取名字和包名完全一样,然后把需要跟踪的代码直接复制进去,这样子我们在运行的时候就会优先使用到你的代码了
比如说我为了追踪Shuffle文件读写的过程,我就把BlockStoreShuffleReader这个类直接写到工程里面去,然后我自己其实需要跟踪嘛,就直接把我想看到的信息直接打印一下,这里其实debug也行,都随意,这种操作在我们有时候希望单步到某次执行过程的时候很有帮助。
我们那个单词统计程序,我们就可以比较方便看到文件写入本机磁盘和从磁盘中读取的过程了
SparkUI本机查看
Spark的UI在本机执行之后会随着会话退出而消失,我们希望看到ui,直接在代码最后加一个Sleep的操作即可
TimeUnit.HOURS.sleep(1)
执行程序的会生成端口为4040的地址,注意观察日志
点开之后就可以找到我们的UI了,我这里是需要看bucket是否生效,一目了然,这种方式对于测试一些SQL优化策略极其有用
模板工程
这个自然是需要有一个引入maven配置和打包的模板,准备好之后可以copy,好处自然不必说,我这边提供一份
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.spark</groupId> <artifactId>spark-examples</artifactId> <version>0.1-SNAPSHOT</version> <name>spark-examples</name> <organization> <name>apache</name> </organization> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.tdw.version>3.6.0</spark.tdw.version> <spark.community.version>2.4.3</spark.community.version> <spark.deps.scope>provided</spark.deps.scope> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.community.version}</version> <scope>${spark.deps.scope}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.community.version}</version> <scope>${spark.deps.scope}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>${spark.community.version}</version> <scope>${spark.deps.scope}</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.community.version}</version> <scope>${spark.deps.scope}</scope> </dependency> <dependency> <groupId>com.github.scopt</groupId> <artifactId>scopt_${scala.binary.version}</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies> <repositories> <repository> <id>alimaven</id> <name>aliyun maven</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <build> <plugins> <!-- bind the maven-assembly-plugin to the package phase this will create a jar file without the storm dependencies suitable for deployment to a cluster. --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile-first</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> <execution> <id>attach-scaladocs</id> <phase>verify</phase> <goals> <goal>doc-jar</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <recompileMode>incremental</recompileMode> <useZincServer>true</useZincServer> <args> <arg>-unchecked</arg> <arg>-deprecation</arg> <arg>-feature</arg> </args> <javacArgs> <javacArg>-source</javacArg> <javacArg>${java.version}</javacArg> <javacArg>-target</javacArg> <javacArg>${java.version}</javacArg> </javacArgs> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.3</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> <encoding>${project.build.sourceEncoding}</encoding> <maxmem>1024m</maxmem> <fork>true</fork> </configuration> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> </build> <profiles> <profile> <id>spark-local</id> <properties> <spark.deps.scope>compile</spark.deps.scope> </properties> </profile> </profiles> </project>
后记
几个小技巧都是平时大量使用的,因为程序的东西大部分还是实践容易加深理解