教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据

简介: 教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public final class 过滤黑名单{
        public static void main(String[] args) throws Exception 
  {
    if (args.length< 2) {
      System.err.println("需要传入参数:主机名端口号");
      System.exit(1);
              }
  // 设置拉取数据的频率,即批处理的时间间隔为1秒
  //控制台上显示的是每隔1000毫秒
    SparkConf  sparkConf = new SparkConf().setAppName
        ("JavaNetworkWordCount").setMaster("local[2]");
    JavaStreamingContext  ssc = new 
      JavaStreamingContext(sparkConf, Durations.seconds(10));
     JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
  args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
     JavaRDD<String>  blackname= ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
//     JavaDStream<String> blackname=ssc.textFileStream("file:///homeq/eclipse-workspace/t33");
     JavaDStream<String> words = filedata.map(f->f);
     JavaPairRDD<String,String> prd=blackname.mapToPair(f->new Tuple2<String,String>(f.split(" ")[0],f.split(" ")[1]));
     //此处的reduceByKey方法,每次只输出当次的操作记录,
     //不保留上次的记录信息。对应的就是只针对本次的key,values。
     //要保留前次的操作记录。相对应的方法就是updateStateByKey。
     JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard=words.mapToPair
        (f->new Tuple2<String,Tuple2<String,String>>(f.split(" ")[1],
            new Tuple2<>(f.split(" ")[0],f.split(" ")[1]))).transformToPair(fs->{
      JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2= fs.leftOuterJoin(prd);
      JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3=
             prd2.filter(f->!f._2._2().toString().contains("true"));
      return prd3;
            }
                );
     JavaPairDStream<String,String> prd4=pard.mapToPair(f->f._2._1);
prd4.print();
     ssc.start();     // 启动Spark Streaming,开始计算
  ssc.awaitTermination();    // 等待计算结束
    }
  }
相关文章
|
5天前
|
存储 分布式计算 Java
|
5天前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
35 4
|
5天前
|
存储 缓存 分布式计算
|
5天前
|
SQL 存储 分布式计算
|
4天前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
11 1
|
1月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
22 1
|
2月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。