3.1 spark编程

简介: 3.1 spark编程

阿巴巴阿巴巴巴

package thisterm;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
public class task3_1 {
  public static void main(String[] args) {
  // TODO Auto-generated method stub
  SparkConf conf=new SparkConf().setAppName("filter").setMaster("local[2]");
  JavaSparkContext sc=new JavaSparkContext(conf);
   JavaRDD<String> data = sc.textFile("file:///home/gyq/eclipse-workspace/student/student.txt");
   JavaRDD<String> mathdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_math.txt");
   JavaRDD<String> bigdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_bigdata.txt");
   JavaPairRDD<String,Integer> prdd1=mathdata.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));
   JavaPairRDD<String,Integer> prdd2=bigdata.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));//ID加成绩
   JavaPairRDD<String,String> prdd3=data.mapToPair(f->new Tuple2<>(f.split(" ")[0],f.split(" ")[1]));//ID加名字
//   JavaRDD<String> sortmath=mathdata.sortBy(f->Integer.valueOf(f.split(" ")[2]), false, 1);
//   List<String> math_5=sortmath.take(5);
   //bigdata.foreach(f->System.err.println(f));
//   math_5.foreach(f->System.err.println(f));
   //for(String aa:math_5) {
    // System.out.println(aa);//数学前五名的
   //}
//   (2)找出单科成绩为100的学生ID
   /*JavaRDD<String> Sumcourse=mathdata.union(bigdata);
   JavaRDD<String> rdd1=Sumcourse.filter(f->f.split(" ")[2].equals("100"));//出单科成绩为100的学生的记录
   JavaRDD<String> rdd2=rdd1.map(f->f.split(" ")[0]).distinct();
   rdd2.foreach(f->System.err.println("单科成绩为100的学生ID: "+f));
   */
//   3)输出每位学生的总成绩,要求将两个成绩表中学生ID相同的成绩相加。
   JavaRDD<String> Sumcourse=mathdata.union(bigdata);
   JavaPairRDD<String,Integer> rdd3=Sumcourse.mapToPair(f->new Tuple2<>(f.split(" ")[0],Integer.valueOf(f.split(" ")[2])));
  //3 JavaRDD<String> rdd3=Sumcourse.map(f->f.split(" ")[0]+" "+f.split(" ")[2]);
   JavaPairRDD<String,Integer> rdd4=rdd3.reduceByKey((x,y)->x+y);//ID+总成绩
  // rdd4.foreach(f->System.err.println(f));//输出每位学生的总成绩
//   (4)计算每一个学生的平均成绩)
   JavaPairRDD<String,Double> rddave=rdd4.mapToPair(f->new Tuple2<>(f._1,Double.valueOf(f._2)/2));
   //rddave.foreach(f->System.err.println(f));
//   (5)汇总学生成绩并以文本格式存储在HDFS上,数据汇总为学生ID,姓名,大数据成绩,数学成绩,总分,平均分。)
   JavaPairRDD<String, Tuple2<Integer, Double>> rdd5=rdd4.join(rddave);
   JavaPairRDD<String, Tuple2<Integer, Integer>> rdd6=prdd2.join(prdd1);
   JavaPairRDD<String, Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>>> rdd7= rdd6.join(rdd5);
   JavaPairRDD<String, Tuple2<String, Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>>>> rdd8= prdd3.join(rdd7);
   rdd5.foreach(f->System.err.println(f)); 
   rdd6.foreach(f->System.err.println(f)); 
   rdd8.foreach(f->System.err.println(f)); 
  }
}

49.1.png

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public final class 过滤黑名单{
       public static void main(String[] args) throws Exception 
  {
  if (args.length< 2) {
    System.err.println("需要传入参数:主机名端口号");
    System.exit(1);
             }
  // 设置拉取数据的频率,即批处理的时间间隔为1秒
  //控制台上显示的是每隔1000毫秒
  SparkConf  sparkConf = new SparkConf().setAppName
    ("JavaNetworkWordCount").setMaster("local[2]");
  JavaStreamingContext  ssc = new 
    JavaStreamingContext(sparkConf, Durations.seconds(10));
    JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
  args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaRDD<String>  blackname= ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
//    JavaDStream<String> blackname=ssc.textFileStream("file:///homeq/eclipse-workspace/t33");
    JavaDStream<String> words = filedata.map(f->f);
   JavaPairRDD<String,String> prd=blackname.mapToPair(f->new Tuple2<String,String>(f.split(" ")[0],f.split(" ")[1]));
    //此处的reduceByKey方法,每次只输出当次的操作记录,
    //不保留上次的记录信息。对应的就是只针对本次的key,values。
    //要保留前次的操作记录。相对应的方法就是updateStateByKey。
    JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard=words.mapToPair
    (f->new Tuple2<String,Tuple2<String,String>>(f.split(" ")[1],
      new Tuple2<>(f.split(" ")[0],f.split(" ")[1]))).transformToPair(fs->{
    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2= fs.leftOuterJoin(prd);
    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3=
        prd2.filter(f->!f._2._2().toString().contains("true"));
    return prd3;
      }
        );
    JavaPairDStream<String,String> prd4=pard.mapToPair(f->f._2._1);
prd4.print();
    ssc.start();     // 启动Spark Streaming,开始计算
  ssc.awaitTermination();    // 等待计算结束
   }
  }

相关文章
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
75 1
|
4月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
43 0
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
57 2
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
61 0
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
41 1
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
107 1
|
2月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
42 1
|
3月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
8月前
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
4月前
|
分布式计算 数据处理 Spark
Spark【RDD编程(四)综合案例】
Spark【RDD编程(四)综合案例】