UpdateStateByKey、Tranform 算子_2|学习笔记

简介: 快速学习 UpdateStateByKey、Tranform 算子_2

开发者学堂课程【大数据实时计算框架 Spark 快速入门:UpdateStateByKey、Tranform 算子_2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1725


UpdateStateByKey、Tranform 算子_2


内容简介:

一、UpdateStateByKey 相关代码

二、Transform Operation变换操作介绍

三、TraratormOperation 相关代码

 

一、UpdateStateByKey 相关代码

1 package com.snsxt.stuay.streaming;

2

3 import java.util.Arrays;

19

20 public class UpdateStateByKeyWordcount{

21

22 public static void main(string[] args){

23SparkConfconf=newSparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");

24JavaStreamingContextjss= new JavaStreamingContext(conf,Durations.seconds(5));

25jssd.checkpoint(".);

26

27JavaReceiverInputDstream<String> linesmissc.socketTextStream("node24",8888);

28JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,string>()(

29

30private static final long serialVersionUID = 1L;

31

32@Override

33 public Iterable<String> call(string line) throws Exception{

34 return Arrays.asList(line.split(""));

35

36});

37 JavaPairDStream<String, Integer> pairs =words.mapToPair(new PairFunction<String, string, Integer>()(

38

39private static final long serialversionUID=1L;

40

41@Override

42public Tuple2<String, Integer> call(string word)throws Exception{

43return new Tuple2<String, Integer>(word,1);

44}

45);

46 JavaPairDStream<String,Integer>wordcountspairs.updateStateByKey(new Function2<List<Integer>, Optionale

47

48private static final long serialVersionUID= 1L;

49

50   //实际上,对于每个单词,每次 batch 计算的时候,都会调用这个函数,第一个参values 相当于这个 batch 中

51   // 这个 key 对应的新的一组值,可能有多个,可能 2 个 1,(xuruyun,1)(xuruyun,1),那么这个 values 就是(1,1)

52//那么第二个参数表示的是这个 key 之前的状态,我们看类型 Integer 也就知道了,这里是泛型自己指定的

 

二、Transform Operation(变换操作)

The transform operation(along with is variations like transformwith)allows arbitraryRDO-to-RDD functions to be applyed on a DStream. it can be used to apply any ROD operation that is not expdsed in the DStream API. For example,the functionality of joining every batch in a data stream wth another dataset is not directly exposed in the DStream API.However, you can easily use transform to do this. This enables very powerful possibilities.For example,one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

译文:变换操作(类似于 transforn with 的变体)允许将任意的 RDO 到 RDD 函数应用于 DStream 。它可以用于应用任何在 DStream API 中没有展开的 ROD 操作。例如,在数据流中加入每个批处理的功能另一个数据集不会直接暴露在 DS Stream API.不过,您可以轻松地使用 transform 来完成此操作。这非常具有可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能使用 Spark 生成)连接起来,然后根据这些信息进行匹配,从而实现实时数据清理。

 

三、TraratormOperation 相关代码

1  package com.shsxt.study.streaming;

2

3  import java.util.ArrayList;

20

21 public class TransformOperation{

22

23  public static void main(String[] args)(

24 SparkConf conf=newSparkConf().setAppName("TransformOperation").setMaster("local[2]");

25JavaStreamingContex jssc=new JavaStreamingContext(conf,Durations.seconds(20));

26

27 //用户对于网上的广告可以进行点击!点击之后可以进行实时计算,但是有些用户就是刷广告!

28//所以说我们要有一个黑名单机制!只要是黑名单中的用户点击的广告,我们就给过掉!

29

30 // 先来模拟一个名单数据 RDDtrue 代表启用,false 代表不启用!

31 List<Tuple2<String,Boolean>> blacklist =new ArrayList<Tuple2<String, Boolean>>();

32blacklist.add(new Tuple2<String,Boolean>("yasaka", true));

33 blacklist.add(new Tuple2<String,Boolean>("xuruyun",false));

34

35finalJavaPairRDO<String,Boolean>blacklistRDD =jssc.se().parallelizePairs(blacklist);

36

37    // time adId name

38  JavaReceiverInputDStream<String>adsClickLogDStream =jssc.socketTextStream("node24",8888);

39

40  JavaPairDStream<String,String>adsClickLogPairDStream = adsClickLogDStream.mapToPair(new PairFunction<String, String, String>()

41

42  private static final long serialVersionuID=1L;

43

44  @Override

45 public Tuple2<String, String> call(string line)throws Exception(

46 return new Tuple2<String,String>(line.split("")[2],line);

47

48));

49

50JavaDStream<String>normalLogs=adsClickLogPairDStream.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String() {

51

52  private static final long serialVersionuID = 1L;

53

54 @Override

55 public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchrDo)

56  throws Exception{

57

58 JavaPairRDD<String,Tuple2<String,Optional<Boolean>>> joinedRDD=userLogBatchRDD.leftOuterJoin(blacklistRDD);

59

60

61 JavaPairRDD<String,Tuple2<String, Optional<Boolean>>> filteredRDD =

62 joinedRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>,Boolean>()

63

64 private static final long serialVersionUID = 1L;

65

66 @Override

67publicBoolean call(Tuple2<String,Tuple2<String,Optional<Boolean>>> tuple)

68 throws Exception{

69 

70 if(tuple.2.2.isPresent()&&tuple.2.2.get()){

71 return false;

72}

73

74return true;

75}

相关文章
|
4天前
|
云安全 人工智能 自然语言处理
|
8天前
|
人工智能 Java API
Java 正式进入 Agentic AI 时代:Spring AI Alibaba 1.1 发布背后的技术演进
Spring AI Alibaba 1.1 正式发布,提供极简方式构建企业级AI智能体。基于ReactAgent核心,支持多智能体协作、上下文工程与生产级管控,助力开发者快速打造可靠、可扩展的智能应用。
793 17
|
11天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
801 59
Meta SAM3开源:让图像分割,听懂你的话
|
2天前
|
人工智能 安全 小程序
阿里云无影云电脑是什么?最新收费价格个人版、企业版和商业版无影云电脑收费价格
阿里云无影云电脑是运行在云端的虚拟电脑,分企业版和个人版。企业版适用于办公、设计等场景,4核8G配置低至199元/年;个人版适合游戏、娱乐,黄金款14元/月起。支持多端接入,灵活按需使用。
235 164
|
9天前
|
搜索推荐 编译器 Linux
一个可用于企业开发及通用跨平台的Makefile文件
一款适用于企业级开发的通用跨平台Makefile,支持C/C++混合编译、多目标输出(可执行文件、静态/动态库)、Release/Debug版本管理。配置简洁,仅需修改带`MF_CONFIGURE_`前缀的变量,支持脚本化配置与子Makefile管理,具备完善日志、错误提示和跨平台兼容性,附详细文档与示例,便于学习与集成。
334 116
|
2天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
350 3
|
6天前
|
弹性计算 搜索推荐 应用服务中间件
阿里云服务器租用价格:一年、1小时及一个月收费标准及优惠活动参考
阿里云服务器优惠汇总:轻量应用服务器200M带宽38元/年起,ECS云服务器2核2G 99元/年、2核4G 199元/年,4核16G 89元/月,8核32G 160元/月,香港轻量服务器25元/月起,支持按小时计费,新老用户同享,续费同价,限时秒杀低至1折。
406 166

热门文章

最新文章