Update StateByKey 算子. Tranform算子_ 1|学习笔记

简介: 快速学习 Update StateByKey 算子. Tranform 算子_ 1

开发者学堂课程【大数据实时计算框架 Spark 快速入门:Update StateByKey  算子. Tranform 算子_ 1】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1724


Update StateByKey  算子. Tranform 算子_ 1


具体操作如下:

package com. shsxt. study,streaming;

import java. sql.  Connection ;

public class  PersistMySQLWordcount {

public static void main(String[]args){

Sparkconf conf=new SparkConf().  settlaster ("local[1]"). setAppName ("Persist# ySQIWordcoun   JavaStreamingContextjsc =new  JavaStreamingContext (conf, Durations,seconds(5));

JavaDStream <String>lines=jssc. textFileStream ("hdfs://node21:8020/wordcount _ dir");  JavaDStream <String>words=lines,flatMap(new  FlatMapFunction <String, Stringy()(

private static final long serial  VersionUID =1L;

@Override

public Iterable<String>call(String line) throws Exception{

return Arrays. astist(line, split("“));

}

});  

JavaPairDStream <String,Integer>pairs=words.mapToPair(new  Pairfunction <String, string.

private static final long se  rialVersionUID =1L;

@Override

publicTuple2 <String, Integer>call(String word) throws Exception{

wordpress. print();

wordcounts . foreachRDD (new VoidFunction < 3avaPairRDD <String, Integer>>(){

private static final long serial  VersionUID =1L;

@Override public void call( JavaPairRDD <String,Integer> wordcountsRDD ) throws Exception{word countsRDD. foreach Partition(new Void Function<Iterator<Tuple2<String, Integer>>>(){

private static final long serial  VersionUID =1L;

@Override public void call(Iterator<Tuple2<String, Integer> wordcounts ) throws Exception{ Connection  conn= ConnectionPool . getConnection ();

Tuple2<String, Integer>wordcount=null;

while( wordcounts .hasNext()}{

wordpress= wordcounts . next();

String sql="insert into wordcount(word, count)"

+"values('"+wordcount._1+"',"+wordcount._2+")";

Statement stmt=conn. createStatement ();

stmt  executeUpdate (sql);

Conne ctionPool. returnConne  ction(conn);

}

});

jssc. start();

jssc. awaitTermination ();

jssc. close();

}

相关文章
|
7月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
54 0
|
7月前
|
机器学习/深度学习 分布式计算 数据库连接
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子
165 2
|
存储 缓存 分布式计算
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
157 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
|
分布式计算 大数据 开发者
RDD 算子_转换 join | 学习笔记
快速学习 RDD 算子_转换 join
107 0
RDD 算子_转换 join | 学习笔记
|
缓存 分布式计算 Scala
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(上)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(上)
250 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(上)
|
缓存 分布式计算 大数据
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)
222 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)
|
分布式计算 Spark
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)2
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)2
160 0
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)2
|
分布式计算 算法 Hadoop
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)1
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)1
461 0
【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)1
|
分布式计算 大数据 数据库
Spark 算子操作剖析 3
快速学习 Spark 算子操作剖析 3
110 0
Spark 算子操作剖析 3
|
分布式计算 大数据 开发者
UpdateStateByKey、Tranform 算子_3|学习笔记
快速学习 UpdateStateByKey、Tranform 算子_3