UpdateStateByKey、Tranform 算子_2|学习笔记

简介: 快速学习 UpdateStateByKey、Tranform 算子_2

开发者学堂课程【大数据实时计算框架 Spark 快速入门:UpdateStateByKey、Tranform 算子_2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1725


UpdateStateByKey、Tranform 算子_2


内容简介:

一、UpdateStateByKey 相关代码

二、Transform Operation变换操作介绍

三、TraratormOperation 相关代码

 

一、UpdateStateByKey 相关代码

1 package com.snsxt.stuay.streaming;

2

3 import java.util.Arrays;

19

20 public class UpdateStateByKeyWordcount{

21

22 public static void main(string[] args){

23SparkConfconf=newSparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");

24JavaStreamingContextjss= new JavaStreamingContext(conf,Durations.seconds(5));

25jssd.checkpoint(".);

26

27JavaReceiverInputDstream<String> linesmissc.socketTextStream("node24",8888);

28JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,string>()(

29

30private static final long serialVersionUID = 1L;

31

32@Override

33 public Iterable<String> call(string line) throws Exception{

34 return Arrays.asList(line.split(""));

35

36});

37 JavaPairDStream<String, Integer> pairs =words.mapToPair(new PairFunction<String, string, Integer>()(

38

39private static final long serialversionUID=1L;

40

41@Override

42public Tuple2<String, Integer> call(string word)throws Exception{

43return new Tuple2<String, Integer>(word,1);

44}

45);

46 JavaPairDStream<String,Integer>wordcountspairs.updateStateByKey(new Function2<List<Integer>, Optionale

47

48private static final long serialVersionUID= 1L;

49

50   //实际上,对于每个单词,每次 batch 计算的时候,都会调用这个函数,第一个参values 相当于这个 batch 中

51   // 这个 key 对应的新的一组值,可能有多个,可能 2 个 1,(xuruyun,1)(xuruyun,1),那么这个 values 就是(1,1)

52//那么第二个参数表示的是这个 key 之前的状态,我们看类型 Integer 也就知道了,这里是泛型自己指定的

 

二、Transform Operation(变换操作)

The transform operation(along with is variations like transformwith)allows arbitraryRDO-to-RDD functions to be applyed on a DStream. it can be used to apply any ROD operation that is not expdsed in the DStream API. For example,the functionality of joining every batch in a data stream wth another dataset is not directly exposed in the DStream API.However, you can easily use transform to do this. This enables very powerful possibilities.For example,one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

译文:变换操作(类似于 transforn with 的变体)允许将任意的 RDO 到 RDD 函数应用于 DStream 。它可以用于应用任何在 DStream API 中没有展开的 ROD 操作。例如,在数据流中加入每个批处理的功能另一个数据集不会直接暴露在 DS Stream API.不过,您可以轻松地使用 transform 来完成此操作。这非常具有可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能使用 Spark 生成)连接起来,然后根据这些信息进行匹配,从而实现实时数据清理。

 

三、TraratormOperation 相关代码

1  package com.shsxt.study.streaming;

2

3  import java.util.ArrayList;

20

21 public class TransformOperation{

22

23  public static void main(String[] args)(

24 SparkConf conf=newSparkConf().setAppName("TransformOperation").setMaster("local[2]");

25JavaStreamingContex jssc=new JavaStreamingContext(conf,Durations.seconds(20));

26

27 //用户对于网上的广告可以进行点击!点击之后可以进行实时计算,但是有些用户就是刷广告!

28//所以说我们要有一个黑名单机制!只要是黑名单中的用户点击的广告,我们就给过掉!

29

30 // 先来模拟一个名单数据 RDDtrue 代表启用,false 代表不启用!

31 List<Tuple2<String,Boolean>> blacklist =new ArrayList<Tuple2<String, Boolean>>();

32blacklist.add(new Tuple2<String,Boolean>("yasaka", true));

33 blacklist.add(new Tuple2<String,Boolean>("xuruyun",false));

34

35finalJavaPairRDO<String,Boolean>blacklistRDD =jssc.se().parallelizePairs(blacklist);

36

37    // time adId name

38  JavaReceiverInputDStream<String>adsClickLogDStream =jssc.socketTextStream("node24",8888);

39

40  JavaPairDStream<String,String>adsClickLogPairDStream = adsClickLogDStream.mapToPair(new PairFunction<String, String, String>()

41

42  private static final long serialVersionuID=1L;

43

44  @Override

45 public Tuple2<String, String> call(string line)throws Exception(

46 return new Tuple2<String,String>(line.split("")[2],line);

47

48));

49

50JavaDStream<String>normalLogs=adsClickLogPairDStream.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String() {

51

52  private static final long serialVersionuID = 1L;

53

54 @Override

55 public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchrDo)

56  throws Exception{

57

58 JavaPairRDD<String,Tuple2<String,Optional<Boolean>>> joinedRDD=userLogBatchRDD.leftOuterJoin(blacklistRDD);

59

60

61 JavaPairRDD<String,Tuple2<String, Optional<Boolean>>> filteredRDD =

62 joinedRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>,Boolean>()

63

64 private static final long serialVersionUID = 1L;

65

66 @Override

67publicBoolean call(Tuple2<String,Tuple2<String,Optional<Boolean>>> tuple)

68 throws Exception{

69 

70 if(tuple.2.2.isPresent()&&tuple.2.2.get()){

71 return false;

72}

73

74return true;

75}

相关文章
|
15天前
|
存储 弹性计算 人工智能
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
2025年9月24日,阿里云弹性计算团队多位产品、技术专家及服务器团队技术专家共同在【2025云栖大会】现场带来了《通用计算产品发布与行业实践》的专场论坛,本论坛聚焦弹性计算多款通用算力产品发布。同时,ECS云服务器安全能力、资源售卖模式、计算AI助手等用户体验关键环节也宣布升级,让用云更简单、更智能。海尔三翼鸟云服务负责人刘建锋先生作为特邀嘉宾,莅临现场分享了关于阿里云ECS g9i推动AIoT平台的场景落地实践。
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
|
6天前
|
云安全 人工智能 安全
Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线
阿里云 AI 安全护栏加入Dify平台,打造可信赖的 AI
|
9天前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
877 31
|
9天前
|
机器学习/深度学习 人工智能 搜索推荐
万字长文深度解析最新Deep Research技术:前沿架构、核心技术与未来展望
近期发生了什么自 2025 年 2 月 OpenAI 正式发布Deep Research以来,深度研究/深度搜索(Deep Research / Deep Search)正在成为信息检索与知识工作的全新范式:系统以多步推理驱动大规模联网检索、跨源证据。
624 52
|
2天前
|
监控 BI 数据库
打工人救星!来看看这两家企业如何用Quick BI让业务更高效
Quick BI专业版监控告警助力企业高效运作,通过灵活配置规则与多渠道推送,让数据异常早发现、快响应,推动业务敏捷决策与持续增长。
打工人救星!来看看这两家企业如何用Quick BI让业务更高效
|
8天前
|
人工智能 Java Nacos
基于 Spring AI Alibaba + Nacos 的分布式 Multi-Agent 构建指南
本文将针对 Spring AI Alibaba + Nacos 的分布式多智能体构建方案展开介绍,同时结合 Demo 说明快速开发方法与实际效果。
586 42