package thisterm;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class task3_1 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("filter").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> data = sc.textFile("file:///home/gyq/eclipse-workspace/student/student.txt");
        JavaRDD<String> mathdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_math.txt");
        JavaRDD<String> bigdata = sc.textFile("file:///home/gyq/eclipse-workspace/student/result_bigdata.txt");

        // (student ID, math score)
        JavaPairRDD<String, Integer> prdd1 = mathdata.mapToPair(
                f -> new Tuple2<>(f.split(" ")[0], Integer.valueOf(f.split(" ")[2])));
        // (student ID, big-data score)
        JavaPairRDD<String, Integer> prdd2 = bigdata.mapToPair(
                f -> new Tuple2<>(f.split(" ")[0], Integer.valueOf(f.split(" ")[2])));
        // (student ID, name)
        JavaPairRDD<String, String> prdd3 = data.mapToPair(
                f -> new Tuple2<>(f.split(" ")[0], f.split(" ")[1]));

        // (1) Top five students in math (kept commented out).
        // JavaRDD<String> sortmath = mathdata.sortBy(f -> Integer.valueOf(f.split(" ")[2]), false, 1);
        // List<String> math_5 = sortmath.take(5);
        // bigdata.foreach(f -> System.err.println(f));
        // for (String aa : math_5) {
        //     System.out.println(aa); // top five in math
        // }

        // (2) Find the IDs of students who scored 100 in a single course.
        /*
        JavaRDD<String> Sumcourse = mathdata.union(bigdata);
        JavaRDD<String> rdd1 = Sumcourse.filter(f -> f.split(" ")[2].equals("100")); // records with a score of 100
        JavaRDD<String> rdd2 = rdd1.map(f -> f.split(" ")[0]).distinct();
        rdd2.foreach(f -> System.err.println("Student ID with a single-course score of 100: " + f));
        */

        // (3) Output each student's total score by adding the scores with the same student ID
        //     from the two score tables.
        JavaRDD<String> Sumcourse = mathdata.union(bigdata);
        JavaPairRDD<String, Integer> rdd3 = Sumcourse.mapToPair(
                f -> new Tuple2<>(f.split(" ")[0], Integer.valueOf(f.split(" ")[2])));
        // Alternative kept from the original:
        // JavaRDD<String> rdd3 = Sumcourse.map(f -> f.split(" ")[0] + " " + f.split(" ")[2]);
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey((x, y) -> x + y); // (ID, total score)
        // rdd4.foreach(f -> System.err.println(f)); // print each student's total score

        // (4) Compute each student's average score.
        JavaPairRDD<String, Double> rddave = rdd4.mapToPair(
                f -> new Tuple2<>(f._1, Double.valueOf(f._2) / 2));
        // rddave.foreach(f -> System.err.println(f));

        // (5) Summarize the scores as (student ID, name, big-data score, math score, total, average)
        //     and store them on HDFS in text format.
        JavaPairRDD<String, Tuple2<Integer, Double>> rdd5 = rdd4.join(rddave);               // (ID, (total, average))
        JavaPairRDD<String, Tuple2<Integer, Integer>> rdd6 = prdd2.join(prdd1);              // (ID, (big-data, math))
        JavaPairRDD<String, Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>>> rdd7 = rdd6.join(rdd5);
        JavaPairRDD<String, Tuple2<String, Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Double>>>> rdd8 = prdd3.join(rdd7);

        rdd5.foreach(f -> System.err.println(f));
        rdd6.foreach(f -> System.err.println(f));
        rdd8.foreach(f -> System.err.println(f));
    }
}
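Step (5) asks for the summary to be stored on HDFS as text, but the program above only prints the joined pairs. Below is a minimal sketch of how the final rdd8 could be flattened into one comma-separated line per student and written out with saveAsTextFile; it assumes it is appended at the end of main in task3_1, and the HDFS output path is an assumed placeholder, not part of the original code.

// A minimal sketch, assuming the rdd8 built above; the output path is a placeholder.
JavaRDD<String> summary = rdd8.map(f -> {
    String id = f._1;                 // student ID
    String name = f._2._1;            // student name
    Integer big = f._2._2._1._1;      // big-data score (from prdd2)
    Integer math = f._2._2._1._2;     // math score (from prdd1)
    Integer total = f._2._2._2._1;    // total score (from rdd4)
    Double avg = f._2._2._2._2;       // average score (from rddave)
    return id + "," + name + "," + big + "," + math + "," + total + "," + avg;
});
summary.saveAsTextFile("hdfs://localhost:9000/user/gyq/student_summary"); // assumed HDFS path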
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public final class 过滤黑名单 { // "filter blacklist"

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Required arguments: <hostname> <port>");
            System.exit(1);
        }

        // Set the batch interval, i.e. how often data is pulled from the socket;
        // Durations.seconds(10) gives a 10-second batch.
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // Load the blacklist as a static RDD; each line is "<name> <flag>".
        JavaRDD<String> blackname = ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
        // JavaDStream<String> blackname = ssc.textFileStream("file:///homeq/eclipse-workspace/t33");

        JavaDStream<String> words = filedata.map(f -> f);
        // (name, flag) pairs built from the blacklist file.
        JavaPairRDD<String, String> prd = blackname.mapToPair(
                f -> new Tuple2<String, String>(f.split(" ")[0], f.split(" ")[1]));

        // reduceByKey only outputs the records of the current batch and keeps no information
        // from previous batches, i.e. it only works on this batch's keys and values.
        // To keep state from previous batches, the corresponding method is updateStateByKey.
        JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard = words
                .mapToPair(f -> new Tuple2<String, Tuple2<String, String>>(
                        f.split(" ")[1], new Tuple2<>(f.split(" ")[0], f.split(" ")[1])))
                .transformToPair(fs -> {
                    // Left-outer-join each batch against the blacklist, then drop the records
                    // whose joined flag is "true" (i.e. the blacklisted names).
                    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2 = fs.leftOuterJoin(prd);
                    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3 =
                            prd2.filter(f -> !f._2._2().toString().contains("true"));
                    return prd3;
                });

        JavaPairDStream<String, String> prd4 = pard.mapToPair(f -> f._2._1);
        prd4.print();

        ssc.start();            // Start Spark Streaming and begin computing.
        ssc.awaitTermination(); // Wait for the computation to finish.
    }
}
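The comment above the transform notes that reduceByKey only sees the current batch, while updateStateByKey carries state across batches. As a hedged illustration (not part of the original program), the sketch below shows how a hypothetical JavaPairDStream<String, Integer> named pairs could keep a running count per key; the checkpoint directory is an assumed placeholder, since updateStateByKey requires checkpointing to be enabled on the streaming context.

// A minimal sketch, assuming a JavaPairDStream<String, Integer> named pairs
// and the ssc streaming context from the program above.
ssc.checkpoint("file:///tmp/spark-checkpoint"); // assumed checkpoint directory

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
        (List<Integer> newValues, Optional<Integer> state) -> {
            int sum = state.orElse(0);      // total carried over from previous batches
            for (Integer v : newValues) {   // add the values seen in the current batch
                sum += v;
            }
            return Optional.of(sum);        // becomes the new state for this key
        });
runningCounts.print();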