Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中

思路



两种方式,一种可优化(foreachRDD后,直接创建连接Mysql),一种在(foreachRDD后通过foreachPartition,通过分区获取)



代码实现


import java.sql.DriverManager
import Spark.UpdateStateByKey.workds
import Spark.WordCount.ssc
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object MysqlByKey extends App{
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf,Seconds(10))
  // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  // 内存数据丢失的时候,可以从checkpoint中恢复数据
  // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  ssc.checkpoint("E:/test")
  // 实现基础的wordcount逻辑
  val lines = ssc.socketTextStream("hadoop2", 9999)
  //val lines = ssc.textFileStream("E:/test")
  val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
  //将结果写入MySql
  words.foreachRDD(rdd => rdd.foreachPartition(line => {
    Class.forName("com.mysql.jdbc.Driver")
    //获取mysql连接
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.57.101:3306/test", "root", "1234")
    //把数据写入mysql
    try {
      for (row <- line) {
        val sql = "insert into wordcount(word,wordcount)values('" + row._1 + "','" + row._2 + "')"
        conn.prepareStatement(sql).executeUpdate()
      }
    } finally {
      conn.close()
    }
  }))
   /*方法二
words.foreachRDD(rdd=>{
     rdd.foreachPartition(partionOfRecords=>{
       if(partionOfRecords.size>0){
         val connection = createConnection()
         partionOfRecords.foreach(record=>{
           val sql = "insert into wordcount(word,wordcount) values("+record._1+","+record._2+")"
           connection.createStatement().execute(sql)
         })
         connection.close()
       }
     })
   })
  //获取通过jdbc连接数据库
  def createConnection()={
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop2:3306/test","root","1234")
  }*/
  words.print()
  ssc.start()
  ssc.awaitTermination()
}



文章知识点与官方知识档案匹配,可进一步学


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4月前
|
缓存 关系型数据库 数据库
关系型数据库高效查询和统计
【5月更文挑战第8天】关系型数据库高效查询和统计
68 7
|
2月前
|
SQL 数据库 关系型数据库
MySQL设计规约问题之为什么统计表中记录数时推荐使用COUNT(*)而不是COUNT(primary_key)或COUNT(1)
MySQL设计规约问题之为什么统计表中记录数时推荐使用COUNT(*)而不是COUNT(primary_key)或COUNT(1)
|
3月前
|
Oracle 关系型数据库 MySQL
深入了解 Linux 命令 `db_stat`:数据库统计信息的获取
本文模拟了 Linux 环境下使用 `db_stat` 命令获取数据库统计信息的场景,实际上`db_stat`是特定数据库系统的自定义工具。文章通过示例展示了如何针对Oracle、PostgreSQL和MySQL使用各自内置命令收集统计信息,强调了权限、性能影响和数据实时性等因素,并指出这些信息对优化数据库管理至关重要。
|
3月前
|
SQL 安全 API
Python基础教程(第3版)中文版 第13章 数据库支持(笔记)
Python基础教程(第3版)中文版 第13章 数据库支持(笔记)
|
2月前
数据库系统工程师考点笔记
数据库系统工程师考点笔记
111 0
|
2月前
|
编解码 算法 vr&ar
软考中级之数据库系统工程师笔记总结(六)多媒体基础
软考中级之数据库系统工程师笔记总结(六)多媒体基础
18 0
|
2月前
|
网络协议 安全 网络安全
软考中级之数据库系统工程师笔记总结(五)网络基础
软考中级之数据库系统工程师笔记总结(五)网络基础
16 0
|
2月前
|
人工智能 数据管理 Java
软考中级之数据库系统工程师笔记总结(四)程序设计基础
软考中级之数据库系统工程师笔记总结(四)程序设计基础
19 0
|
2月前
|
存储 算法 Unix
软考中级之数据库系统工程师笔记总结(三)操作系统
软考中级之数据库系统工程师笔记总结(三)操作系统
22 0
|
2月前
|
存储 算法 C语言
软考中级之数据库系统工程师笔记总结(二)数据结构与算法
软考中级之数据库系统工程师笔记总结(二)数据结构与算法
20 0

热门文章

最新文章

下一篇
云函数