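Study notes for the Developer Academy course "Big Data Real-Time Computing Framework Spark Quick Start: UpdateStateByKey and Transform Operators_2". The notes follow the course closely so that readers can pick up the material quickly.
Course URL: https://developer.aliyun.com/learning/course/100/detail/1725
UpdateStateByKey and Transform Operators_2
Contents:
I. UpdateStateByKey code
II. Introduction to the Transform Operation
III. TransformOperation code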
I. UpdateStateByKey code
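package com.shsxt.study.streaming;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Guava's Optional, as used by the Spark 1.x Java API (assumed here because flatMap returns Iterable)
import com.google.common.base.Optional;

import scala.Tuple2;

public class UpdateStateByKeyWordcount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // updateStateByKey requires a checkpoint directory for the state
        jssc.checkpoint(".");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("node24", 8888);

        // Split every line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map every word to a (word, 1) pair
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordcounts = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

            private static final long serialVersionUID = 1L;

            // For every word, this function is called each time a batch is computed.
            // The first parameter, values, is the new group of values for this key in the current batch.
            // There may be several of them, for example two 1s from (xuruyun,1)(xuruyun,1), in which case values is (1,1).
            // The second parameter is the previous state of this key. Its type is Integer here
            // because that is the generic type we chose ourselves.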
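The listing stops at the comments that describe the update function. As a rough illustration of the behavior those comments describe, here is a minimal sketch in plain Java with no Spark dependency. It uses `java.util.Optional` instead of the Optional type from the Spark API, and the class name `StateUpdateSketch`, the method name `update`, and the map of states are hypothetical stand-ins, not the course's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Spark-free model of the update function described in the comments.
final class StateUpdateSketch {

    // values: the new counts for one key in the current batch, e.g. (1, 1).
    // state:  the running total kept for that key from earlier batches, if any.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int newValue = state.orElse(0);   // start from the previous state, or 0 for a new key
        for (Integer value : values) {
            newValue += value;            // add every occurrence seen in this batch
        }
        return Optional.of(newValue);
    }

    public static void main(String[] args) {
        // Stand-in for the per-key state that the streaming job would keep between batches.
        Map<String, Optional<Integer>> states = new HashMap<>();

        // Batch 1: "xuruyun" appears twice.
        states.put("xuruyun", update(Arrays.asList(1, 1),
                states.getOrDefault("xuruyun", Optional.empty())));
        // Batch 2: "xuruyun" appears once more.
        states.put("xuruyun", update(Arrays.asList(1),
                states.getOrDefault("xuruyun", Optional.empty())));

        System.out.println(states.get("xuruyun").get()); // prints 3
    }
}
```

The point of the sketch is that the count carries over from batch to batch: the second call starts from the state returned by the first instead of starting from zero.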
II. Introduction to the Transform Operation
The transform operation (along with its variations like transformWith) allows arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD operation that is not exposed in the DStream API. For example, the functionality of joining every batch in a data stream with another dataset is not directly exposed in the DStream API. However, you can easily use transform to do this. This enables very powerful possibilities. For example, one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
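In short: transform runs an arbitrary RDD-to-RDD function on every batch of a DStream, which makes RDD operations that the DStream API does not expose available to a streaming job. Joining every batch with another dataset is one such operation. A typical use is real-time data cleaning: join the input stream with precomputed spam information (which may itself have been produced with Spark), then filter on the result of the join.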
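To make the idea of "a function applied to each batch as a whole" concrete, the following is a small sketch in plain Java with no Spark dependency. It models a stream as a list of batches and each batch (standing in for an RDD) as a list of records. The class `TransformSketch` and its methods are hypothetical, and the per-batch sort is only a stand-in for whatever RDD operation one would actually apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Spark-free model: a "stream" is a list of batches,
// and each batch (standing in for an RDD) is a list of records.
final class TransformSketch {

    // Apply an arbitrary batch-to-batch function to every batch of the stream.
    static <T, R> List<List<R>> transform(List<List<T>> stream,
                                          Function<List<T>, List<R>> batchFunction) {
        List<List<R>> result = new ArrayList<>();
        for (List<T> batch : stream) {
            result.add(batchFunction.apply(batch));
        }
        return result;
    }

    // Example batch-level function: return a sorted copy of the batch.
    static List<String> sortBatch(List<String> batch) {
        List<String> sorted = new ArrayList<>(batch);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<List<String>> stream = Arrays.asList(
                Arrays.asList("b", "a"),
                Arrays.asList("d", "c"));
        System.out.println(transform(stream, TransformSketch::sortBatch));
        // prints [[a, b], [c, d]]
    }
}
```

The function receives the whole batch at once, not one record at a time, which is what allows batch-level work such as joins.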
III. TransformOperation code
package com.shsxt.study.streaming;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Guava's Optional, as used by the Spark 1.x Java API (an assumption; the original import list is elided)
import com.google.common.base.Optional;

import scala.Tuple2;

public class TransformOperation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TransformOperation").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));

        // Users can click on online ads, and the clicks can be processed in real time,
        // but some users click ads fraudulently.
        // So we need a blacklist mechanism: any ad click made by a blacklisted user is filtered out.

        // First simulate a blacklist RDD: true means the entry is enabled, false means it is not.
        List<Tuple2<String, Boolean>> blacklist = new ArrayList<Tuple2<String, Boolean>>();
        blacklist.add(new Tuple2<String, Boolean>("yasaka", true));
        blacklist.add(new Tuple2<String, Boolean>("xuruyun", false));

        final JavaPairRDD<String, Boolean> blacklistRDD = jssc.sc().parallelizePairs(blacklist);

        // Log line format: time adId name
        JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("node24", 8888);

        // Key every log line by user name (the third field)
        JavaPairDStream<String, String> adsClickLogPairDStream = adsClickLogDStream.mapToPair(
                new PairFunction<String, String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String line) throws Exception {
                return new Tuple2<String, String>(line.split(" ")[2], line);
            }
        });

        JavaDStream<String> normalLogs = adsClickLogPairDStream.transform(
                new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchRDD)
                    throws Exception {

                // Left outer join keeps every log line, whether or not its user is in the blacklist
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD =
                        userLogBatchRDD.leftOuterJoin(blacklistRDD);

                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
                        joinedRDD.filter(
                                new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                            throws Exception {

                        // Drop the record only if the user is blacklisted and the entry is enabled
                        if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                            return false;
                        }

                        return true;
                    }
75}