import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public final class 过滤黑名单 {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: two arguments are required: <hostname> <port>");
            System.exit(1);
        }

        // The batch interval controls how often received data is turned into a batch;
        // here each micro-batch covers 10 seconds.
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaNetworkWordCount")
                .setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // Stream of access records read from the socket, one record per line;
        // the code below splits each line on a single space and treats the
        // second field as the name.
        JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // Static blacklist loaded once from a local file; each line is "<name> <flag>",
        // where the flag "true" marks a blacklisted name.
        JavaRDD<String> blackname = ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
        // JavaDStream<String> blackname = ssc.textFileStream("file:///homeq/eclipse-workspace/t33");

        JavaDStream<String> words = filedata.map(f -> f);

        // (name, flag) pairs built from the blacklist file.
        JavaPairRDD<String, String> prd = blackname.mapToPair(
                f -> new Tuple2<String, String>(f.split(" ")[0], f.split(" ")[1]));

        // Note: reduceByKey only looks at the keys and values of the current batch and
        // keeps no record of earlier batches; to carry state across batches, use
        // updateStateByKey instead (a minimal sketch follows after this class).
        JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard = words
                .mapToPair(f -> new Tuple2<String, Tuple2<String, String>>(
                        f.split(" ")[1],
                        new Tuple2<>(f.split(" ")[0], f.split(" ")[1])))
                .transformToPair(fs -> {
                    // Left outer join each batch against the blacklist, keyed by name.
                    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2 =
                            fs.leftOuterJoin(prd);
                    // Drop the records whose name appears in the blacklist with flag "true".
                    JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3 =
                            prd2.filter(f -> !(f._2._2().isPresent() && "true".equals(f._2._2().get())));
                    return prd3;
                });

        // Keep only the original (value, name) pairs of the records that survived the filter.
        JavaPairDStream<String, String> prd4 = pard.mapToPair(f -> f._2._1);
        prd4.print();

        ssc.start();            // start the Spark Streaming computation
        ssc.awaitTermination(); // wait for the computation to terminate
    }
}
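
/*
 * The note above mentions updateStateByKey as the way to keep state across batches.
 * The class below is a separate, minimal sketch of that idea, not part of the original
 * job: it maintains a running count per name over the lifetime of the stream. The
 * checkpoint directory "file:///tmp/ckpt" and the assumption that the second
 * space-separated field of each line is the name are illustrative placeholders.
 */
final class UpdateStateByKeySketch {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: two arguments are required: <hostname> <port>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("UpdateStateByKeySketch").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // updateStateByKey needs a checkpoint directory to persist the per-key state.
        ssc.checkpoint("file:///tmp/ckpt");

        // Key each incoming record by its name field and count it once.
        JavaPairDStream<String, Integer> pairs = ssc
                .socketTextStream(args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER)
                .mapToPair(line -> new Tuple2<>(line.split(" ")[1], 1));

        // Merge the counts seen in this batch into the running total kept for each key.
        org.apache.spark.api.java.function.Function2<List<Integer>, Optional<Integer>, Optional<Integer>> update =
                (values, state) -> {
                    int sum = state.isPresent() ? state.get() : 0;
                    for (Integer v : values) {
                        sum += v;
                    }
                    return Optional.of(sum);
                };
        JavaPairDStream<String, Integer> counts = pairs.updateStateByKey(update);

        counts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}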