UpdateStateByKey、Tranform 算子_2|学习笔记

简介: 快速学习 UpdateStateByKey、Tranform 算子_2

开发者学堂课程【大数据实时计算框架 Spark 快速入门:UpdateStateByKey、Tranform 算子_2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1725


UpdateStateByKey、Tranform 算子_2


内容简介:

一、UpdateStateByKey 相关代码

二、Transform Operation变换操作介绍

三、TraratormOperation 相关代码

 

一、UpdateStateByKey 相关代码

1 package com.snsxt.stuay.streaming;

2

3 import java.util.Arrays;

19

20 public class UpdateStateByKeyWordcount{

21

22 public static void main(string[] args){

23SparkConfconf=newSparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");

24JavaStreamingContextjss= new JavaStreamingContext(conf,Durations.seconds(5));

25jssd.checkpoint(".);

26

27JavaReceiverInputDstream<String> linesmissc.socketTextStream("node24",8888);

28JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,string>()(

29

30private static final long serialVersionUID = 1L;

31

32@Override

33 public Iterable<String> call(string line) throws Exception{

34 return Arrays.asList(line.split(""));

35

36});

37 JavaPairDStream<String, Integer> pairs =words.mapToPair(new PairFunction<String, string, Integer>()(

38

39private static final long serialversionUID=1L;

40

41@Override

42public Tuple2<String, Integer> call(string word)throws Exception{

43return new Tuple2<String, Integer>(word,1);

44}

45);

46 JavaPairDStream<String,Integer>wordcountspairs.updateStateByKey(new Function2<List<Integer>, Optionale

47

48private static final long serialVersionUID= 1L;

49

50   //实际上,对于每个单词,每次 batch 计算的时候,都会调用这个函数,第一个参values 相当于这个 batch 中

51   // 这个 key 对应的新的一组值,可能有多个,可能 2 个 1,(xuruyun,1)(xuruyun,1),那么这个 values 就是(1,1)

52//那么第二个参数表示的是这个 key 之前的状态,我们看类型 Integer 也就知道了,这里是泛型自己指定的

 

二、Transform Operation(变换操作)

The transform operation(along with is variations like transformwith)allows arbitraryRDO-to-RDD functions to be applyed on a DStream. it can be used to apply any ROD operation that is not expdsed in the DStream API. For example,the functionality of joining every batch in a data stream wth another dataset is not directly exposed in the DStream API.However, you can easily use transform to do this. This enables very powerful possibilities.For example,one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

译文:变换操作(类似于 transforn with 的变体)允许将任意的 RDO 到 RDD 函数应用于 DStream 。它可以用于应用任何在 DStream API 中没有展开的 ROD 操作。例如,在数据流中加入每个批处理的功能另一个数据集不会直接暴露在 DS Stream API.不过,您可以轻松地使用 transform 来完成此操作。这非常具有可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能使用 Spark 生成)连接起来,然后根据这些信息进行匹配,从而实现实时数据清理。

 

三、TraratormOperation 相关代码

1  package com.shsxt.study.streaming;

2

3  import java.util.ArrayList;

20

21 public class TransformOperation{

22

23  public static void main(String[] args)(

24 SparkConf conf=newSparkConf().setAppName("TransformOperation").setMaster("local[2]");

25JavaStreamingContex jssc=new JavaStreamingContext(conf,Durations.seconds(20));

26

27 //用户对于网上的广告可以进行点击!点击之后可以进行实时计算,但是有些用户就是刷广告!

28//所以说我们要有一个黑名单机制!只要是黑名单中的用户点击的广告,我们就给过掉!

29

30 // 先来模拟一个名单数据 RDDtrue 代表启用,false 代表不启用!

31 List<Tuple2<String,Boolean>> blacklist =new ArrayList<Tuple2<String, Boolean>>();

32blacklist.add(new Tuple2<String,Boolean>("yasaka", true));

33 blacklist.add(new Tuple2<String,Boolean>("xuruyun",false));

34

35finalJavaPairRDO<String,Boolean>blacklistRDD =jssc.se().parallelizePairs(blacklist);

36

37    // time adId name

38  JavaReceiverInputDStream<String>adsClickLogDStream =jssc.socketTextStream("node24",8888);

39

40  JavaPairDStream<String,String>adsClickLogPairDStream = adsClickLogDStream.mapToPair(new PairFunction<String, String, String>()

41

42  private static final long serialVersionuID=1L;

43

44  @Override

45 public Tuple2<String, String> call(string line)throws Exception(

46 return new Tuple2<String,String>(line.split("")[2],line);

47

48));

49

50JavaDStream<String>normalLogs=adsClickLogPairDStream.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String() {

51

52  private static final long serialVersionuID = 1L;

53

54 @Override

55 public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchrDo)

56  throws Exception{

57

58 JavaPairRDD<String,Tuple2<String,Optional<Boolean>>> joinedRDD=userLogBatchRDD.leftOuterJoin(blacklistRDD);

59

60

61 JavaPairRDD<String,Tuple2<String, Optional<Boolean>>> filteredRDD =

62 joinedRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>,Boolean>()

63

64 private static final long serialVersionUID = 1L;

65

66 @Override

67publicBoolean call(Tuple2<String,Tuple2<String,Optional<Boolean>>> tuple)

68 throws Exception{

69 

70 if(tuple.2.2.isPresent()&&tuple.2.2.get()){

71 return false;

72}

73

74return true;

75}

相关文章
|
存储 算法
halcon模板匹配实践(1)算子参数说明与算子简介
halcon模板匹配实践(1)算子参数说明与算子简介
780 0
|
存储 缓存 分布式计算
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
142 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
|
分布式计算
|
分布式计算 大数据 Spark
Rdd 算子_转换_回顾 | 学习笔记
快速学习 Rdd 算子_转换_回顾
Rdd 算子_转换_回顾 | 学习笔记
|
分布式计算 大数据 Scala
RDD 算子_转换_ foldByKey | 学习笔记
快速学习 RDD 算子_转换_ foldByKey
153 0
RDD 算子_转换_  foldByKey | 学习笔记
|
分布式计算 算法 大数据
Rdd 算子_转换_mapvalues | 学习笔记
快速学习 Rdd 算子_转换_mapvalues
125 0
Rdd 算子_转换_mapvalues | 学习笔记
|
分布式计算 大数据 开发者
RDD 算子_转换_ aggregateByKey | 学习笔记
快速学习 RDD 算子_转换_ aggregateByKey
106 0
RDD 算子_转换_ aggregateByKey | 学习笔记
|
分布式计算 算法 大数据
RDD 算子_转换_ combineByKey | 学习笔记
快速学习 RDD 算子_转换_ combineByKey
120 0
RDD 算子_转换_ combineByKey | 学习笔记
|
分布式计算 大数据 开发者
RDD 算子_转换 join | 学习笔记
快速学习 RDD 算子_转换 join
RDD 算子_转换 join | 学习笔记
|
分布式计算 大数据 Spark
RDD 算子_转换_排序 | 学习笔记
快速学习 RDD 算子_转换_排序
RDD 算子_转换_排序 | 学习笔记