教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据

简介: 教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public final class 过滤黑名单{
        public static void main(String[] args) throws Exception 
  {
    if (args.length< 2) {
      System.err.println("需要传入参数:主机名端口号");
      System.exit(1);
              }
  // 设置拉取数据的频率,即批处理的时间间隔为1秒
  //控制台上显示的是每隔1000毫秒
    SparkConf  sparkConf = new SparkConf().setAppName
        ("JavaNetworkWordCount").setMaster("local[2]");
    JavaStreamingContext  ssc = new 
      JavaStreamingContext(sparkConf, Durations.seconds(10));
     JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
  args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
     JavaRDD<String>  blackname= ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
//     JavaDStream<String> blackname=ssc.textFileStream("file:///homeq/eclipse-workspace/t33");
     JavaDStream<String> words = filedata.map(f->f);
     JavaPairRDD<String,String> prd=blackname.mapToPair(f->new Tuple2<String,String>(f.split(" ")[0],f.split(" ")[1]));
     //此处的reduceByKey方法,每次只输出当次的操作记录,
     //不保留上次的记录信息。对应的就是只针对本次的key,values。
     //要保留前次的操作记录。相对应的方法就是updateStateByKey。
     JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard=words.mapToPair
        (f->new Tuple2<String,Tuple2<String,String>>(f.split(" ")[1],
            new Tuple2<>(f.split(" ")[0],f.split(" ")[1]))).transformToPair(fs->{
      JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2= fs.leftOuterJoin(prd);
      JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3=
             prd2.filter(f->!f._2._2().toString().contains("true"));
      return prd3;
            }
                );
     JavaPairDStream<String,String> prd4=pard.mapToPair(f->f._2._1);
prd4.print();
     ssc.start();     // 启动Spark Streaming,开始计算
  ssc.awaitTermination();    // 等待计算结束
    }
  }
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
49 3
|
2月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
41 5
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
30 0
|
1月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
29 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
37 0
|
3月前
|
存储 分布式计算 Java
|
3月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
246 4
|
3月前
|
存储 缓存 分布式计算
|
3月前
|
SQL 存储 分布式计算
|
3月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
49 1