IDEA构建Spark编程环境和用Scala实现PageRank算法

简介: 写在前边的话       以前在学习编写mapreduce时,由于没有shell,就是首先在eclipse里配置环境,利用eclipse的强大功能进行编写,调试,编译,最终打包到集群上运行,同样面对Spark的时候,虽然spark提供了强大的shell 脚本能力,但对于定期或者处理时间很长...

写在前边的话

      以前在学习编写mapreduce时,由于没有shell,就是首先在eclipse里配置环境,利用eclipse的强大功能进行编写,调试,编译,最终打包到集群上运行,同样面对Spark的时候,虽然spark提供了强大的shell 脚本能力,但对于定期或者处理时间很长的程序来说并不适合使用shell,所以这里我选用了强大IDEA 作为spark的开发环境

      环境说明(点击下载):Java1.8.1_101    /    Scala 2.11.8    /     Intellij DEA 2.16.2   /  Spark 1.6.2

     10分钟 帮你打开Scala的编程大门:点击阅读 

    注意事项(我掉过的坑):本地Scala编程时,注意环境与集群的一致性,由于我的集群是spark1.6.2,scala2.10.x,java1.7.x,而本地环境java和scala都比集群高了一个版本,所以本地打包在集群上运行时就会出现版本不匹配的错误(如果用到java时,也是一样的),这一点大家要十分注意

一:IDEA构建Spark编程环境

       部署参考文章:使用IntelliJ IDEA编写SparkPi直接在Spark中运行

      需要注意的有两个地方

      1:在官方给的example中需要加入两行代码

    conf.setMaster("spark://192.168.48.130:7077")     //指定你的spark集群

    spark.addJar("/home/master/SparkApp/SparkTest.jar")   //指明位置
           由于我是在本地打包好的jar,所以上传jar到linux下时,必须保证位置与 代码中的一致        

       2:在提交jar包时,出现错误

 

           可以看出是17行的问题,原代码中17行:

val slices = if (args.length > 0) args(0).toInt else 2
        所以在运行jar包时需要指定该参数,官网给出的样例是这样的( http://spark.apache.org/docs/latest/

./bin/run-example SparkPi 10
        所以这里要指定数目

/opt/spark/bin/spark-submit /home/master/SparkApp/SparkApp.jar 10  --class "SparkApp"
        最终的运行结果如下



          此时我们在看Spark的Web界面监控


二:Spark执行PageRank算法

         PageRank算法解析参考:点击阅读

         PageRank的MapReduce实现参考:点击阅读

                                                     

        Shell 运行如下:

scala> val links = sc.parallelize(
     |  Array(
     |   ('A', Array('D')), 
     |   ('B', Array('A')), 
     |   ('C', Array('A', 'B')), 
     |   ('D', Array('A', 'C'))
     |  )
     | )
links: org.apache.spark.rdd.RDD[(Char, Array[Char])] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> var ranks = sc.parallelize(
     |  Array(
     |   ('A', 1.0), 
     |   ('B', 1.0), 
     |   ('C', 1.0), 
     |   ('D', 1.0)
     |  )
     | )
ranks: org.apache.spark.rdd.RDD[(Char, Double)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> for(i <- 1 to 6){
     |     val joinRdd = links.join(ranks)
     |     val contribsRdd = joinRdd.flatMap{
     |       case(srcURL, (links, rank)) => links.map(destURL => (destURL, rank / links.size))
     |     }
     |     ranks = contribsRdd.reduceByKey(_ + _).mapValues(0.15 + _ * 0.85)
     |     ranks.take(4).foreach(println)
     |     println()
     | }

         六次迭代的结果:

        

      代码注释:

//图的初始化
val links = sc.parallelize(
 Array(
  ('A', Array('D')), 
  ('B', Array('A')), 
  ('C', Array('A', 'B')), 
  ('D', Array('A', 'C'))
 )
)

//PR值的初始化
//这里可以用 var ranks  = links.mapValues(_=> 1.0)代替
var ranks = sc.parallelize(
 Array(
  ('A', 1.0), 
  ('B', 1.0), 
  ('C', 1.0), 
  ('D', 1.0)
 )
)

//6 为循环次数,这里可以自己设置
for(i <- 1 to 6){
    val joinRdd = links.join(ranks)    //连接两个rdd
    //计算来自其他网页的PR 贡献值
    val contribsRdd = joinRdd.flatMap{
      // 注意这里的links为模式匹配得到的值, 类型为Array[Char], 并非前面的ParallelCollectionRDD
      case(srcURL, (links, rank)) => links.map(destURL => (destURL, rank / links.size))
    }
    //ranks进行更新
    ranks = contribsRdd.reduceByKey(_ + _).mapValues(0.15 + _ * 0.85)
    //打印出ranks的值
    ranks.take(4).foreach(println)
    println()   //换行,便于观察
}

           打包PageRank算法在Spark集群上运行(Jar包下载:github

/opt/spark/bin/spark-submit /home/master/SparkApp/Spark.jar --class "PageRank"

          运行结果如下:

         

         可以看到和Shell脚本运行的结果是一样的

相关文章
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
74 0
|
2月前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
40 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
数据可视化 关系型数据库 MySQL
【IDEA】配置mysql环境并创建mysql数据库
【IDEA】配置mysql环境并创建mysql数据库
266 0
|
4月前
|
缓存 Java 应用服务中间件
支付宝 网站支付Demo 案例【沙箱环境】IDEA如何配置启动Eclipse项目
该博客文章讲述了如何在IntelliJ IDEA中配置和启动一个使用Eclipse开发的支付宝网站支付Demo案例。文章详细记录了从导入项目到配置Tomcat,再到解决启动过程中遇到的问题的步骤。作者还分享了在IDEA中遇到的一些常见问题,如项目配置、依赖库添加、编码问题等,并提供了相应的解决方案。此外,文章还提供了支付效果的展示以及一些支付宝案例文档中需要修改的参数信息。
支付宝 网站支付Demo 案例【沙箱环境】IDEA如何配置启动Eclipse项目
|
4月前
|
Java Windows
【Azure 环境】IntelliJ IDEA Community Edition 2021.2.3登陆Azure账号时,无法切换到中国区
【Azure 环境】IntelliJ IDEA Community Edition 2021.2.3登陆Azure账号时,无法切换到中国区
|
6月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
99 15
|
5月前
|
网络协议 安全 Linux
在IntelliJ IDEA中使用固定公网地址远程SSH连接服务器环境进行开发
在IntelliJ IDEA中使用固定公网地址远程SSH连接服务器环境进行开发
122 2
|
7月前
|
Java 编译器 Scala
IDEA上的Scala环境搭建
本文指导如何搭建Scala开发环境。首先,安装Scala编译器`scala-2.12.10.msi`,通过DOS窗口验证安装成功。然后,在IDEA中,安装Scala插件,创建Maven工程,删除默认包,新建Scala源码包,并在其中创建Scala Object类。接着,配置项目结构,添加Scala SDK,确保Maven、Language Level和Compiler的bytecode版本设置正确。最后,编写并测试基本的Scala代码。
444 2
IDEA上的Scala环境搭建
|
6月前
|
网络协议 网络安全 数据安全/隐私保护
如何在IDEA中使用固定公网地址SSH远程连接服务器开发环境(三)
在IDEA中通过固定公网地址SSH远程连接服务器开发环境,需要配置固定TCP端口以避免地址随机变化。首先,升级cpolar至专业版及以上,然后在官网保留一个固定TCP地址。进入cpolar管理界面,编辑隧道信息,将保留的固定地址填入,更新隧道。最后,在IDEA中新建SSH连接,输入固定地址和端口,验证连接。成功后,即可稳定远程开发。