开发者学堂课程【大数据实时计算框架 Spark 快速入门:UpdateStateByKey 算子、Transform 算子_1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/100/detail/1724
UpdateStateByKey 算子、Transform 算子_1
具体操作如下:
package com.shsxt.study.streaming;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;
public class PersistMySQLWordcount {
public static void main(String[]args){
Sparkconf conf=new SparkConf(). settlaster ("local[1]"). setAppName ("Persist# ySQIWordcoun JavaStreamingContextjsc =new JavaStreamingContext (conf, Durations,seconds(5));
JavaDStream <String>lines=jssc. textFileStream ("hdfs://node21:8020/wordcount _ dir"); JavaDStream <String>words=lines,flatMap(new FlatMapFunction <String, Stringy()(
private static final long serial VersionUID =1L;
@Override
public Iterable<String>call(String line) throws Exception{
return Arrays. astist(line, split("“));
}
});
JavaPairDStream <String,Integer>pairs=words.mapToPair(new Pairfunction <String, string.
private static final long se rialVersionUID =1L;
@Override
publicTuple2 <String, Integer>call(String word) throws Exception{
wordpress. print();
wordcounts . foreachRDD (new VoidFunction < 3avaPairRDD <String, Integer>>(){
private static final long serial VersionUID =1L;
@Override public void call( JavaPairRDD <String,Integer> wordcountsRDD ) throws Exception{word countsRDD. foreach Partition(new Void Function<Iterator<Tuple2<String, Integer>>>(){
private static final long serial VersionUID =1L;
@Override public void call(Iterator<Tuple2<String, Integer> wordcounts ) throws Exception{ Connection conn= ConnectionPool . getConnection ();
Tuple2<String, Integer>wordcount=null;
while( wordcounts .hasNext()}{
wordpress= wordcounts . next();
String sql="insert into wordcount(word, count)"
+"values('"+wordcount._1+"',"+wordcount._2+")";
Statement stmt=conn. createStatement ();
stmt executeUpdate (sql);
Conne ctionPool. returnConne ction(conn);
}
});
jssc. start();
jssc. awaitTermination ();
jssc. close();
}