Spark_Streaming


Practice examples

package com.haiyang

import java.nio.charset.Charset

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder
// Flume producer side: sends the generated Events to the Flume agent over RPC
object FlumeMsgSender {
  // The RPC client factory builds a default client for the given Flume agent host and port
  val client = RpcClientFactory.getDefaultInstance("master", 33333)

  def sendEvent(msg: String) = {
    // Build an event whose body is the UTF-8 encoded message
    val event = EventBuilder.withBody(msg, Charset.forName("UTF-8"))
    // Append (send) the event to the agent
    client.append(event)
  }

  def main(args: Array[String]): Unit = {
    // Generate 100 events and send them
    (1 to 100).foreach(x => sendEvent(s"hello flume--$x"))
    client.close()
  }
}
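
A minimal sketch of a batching variant (FlumeBatchSender is a hypothetical name, not part of the original example): the Flume RpcClient also exposes appendBatch, which ships a whole list of events in a single RPC call instead of one call per event.

package com.haiyang

import java.nio.charset.Charset

import scala.collection.JavaConverters._

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

object FlumeBatchSender {
  def main(args: Array[String]): Unit = {
    val client = RpcClientFactory.getDefaultInstance("master", 33333)
    // Build all events first, then send them as one batch
    val events = (1 to 100).map(x =>
      EventBuilder.withBody(s"hello flume--$x", Charset.forName("UTF-8"))).asJava
    client.appendBatch(events) // one RPC call instead of 100
    client.close()
  }
}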

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


object FlumePushStreaming {
  // Create the SparkConf and StreamingContext; Seconds(5) means one RDD (micro-batch) every five seconds
  val conf = new SparkConf().setMaster("local[*]").setAppName("flume get")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // FlumeUtils creates a push-based receiver stream listening on the given IP and port
    val flumeDstream = FlumeUtils.createStream(ssc, "192.168.6.168", 33333)
    // Decode each event body, flatten it into words, and count the occurrences of each word
    flumeDstream.flatMap(x => new String(x.event.getBody.array()).split("\\s"))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }

}

// Hand-rolled utility class: loads the HBase configuration, opens a connection, and returns a table handle by table name


package com.haiyang

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtils {

  // HBaseConfiguration.create() picks up hbase-site.xml from the classpath
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)

  def getTable(tableName: String) = {
    connection.getTable(TableName.valueOf(tableName))
  }

}
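
HBaseConfiguration.create() relies on hbase-site.xml being on the classpath. As a minimal sketch of an alternative (HbaseUtilsExplicitConf is a hypothetical name), the ZooKeeper quorum can also be set in code; the host names and client port below are assumptions based on the nodes used elsewhere in these examples.

package com.haiyang

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtilsExplicitConf {

  // Same idea as HbaseUtils, but the connection parameters are set programmatically
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "master,slave1,slave2") // assumed quorum hosts
  conf.set("hbase.zookeeper.property.clientPort", "2181")    // assumed default port

  val connection = ConnectionFactory.createConnection(conf)

  def getTable(tableName: String) = connection.getTable(TableName.valueOf(tableName))
}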

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Duration, StreamingContext}


object KafkaDirectWordCount {
  // Create the SparkConf and StreamingContext with a 3-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("use direct get wordCount")
  val ssc = new StreamingContext(conf, Duration(3000))

  def main(args: Array[String]): Unit = {
    // Kafka consumer configuration: bootstrap servers, key/value deserializers, consumer group id, and manual offset commits
    val kafkaParams = Map(
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "kafkatest",
      "enable.auto.commit" -> "false")

    // Topics to subscribe to
    val topic = Set("test1")
    // The two type parameters of Subscribe must match the key/value deserializer types configured above.
    // The consumer strategy subscribes to the topics and carries the Kafka parameters.
    val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
    val kafkaDstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, consumerStrategies)
    kafkaDstream.map(x => x.value())
      .flatMap(x => x.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()

  }

}
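
Because enable.auto.commit is set to false above, the consumed offsets are never stored unless they are committed explicitly. Below is a minimal sketch (KafkaDirectManualCommit is a hypothetical name) of committing offsets back to Kafka after each batch, using the HasOffsetRanges and CanCommitOffsets interfaces of the direct stream; the topic and Kafka parameters are reused from the example above.

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Duration, StreamingContext}

object KafkaDirectManualCommit {
  val conf = new SparkConf().setMaster("local[*]").setAppName("direct stream with manual commit")
  val ssc = new StreamingContext(conf, Duration(3000))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map(
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "kafkatest",
      "enable.auto.commit" -> "false")
    val stream = KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test1"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDDs of the original direct stream
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(_.value())
        .flatMap(_.split("\\s"))
        .map((_, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)
      // Commit the consumed offsets back to Kafka once the batch has been processed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
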
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpoolDirectoryWordCount {
  // Create the SparkConf and StreamingContext with a 5-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("jianting")
  val ssc = new StreamingContext(conf, Seconds(5))


  def monitorDirectory() = {
    // Monitor a directory on HDFS: each new file moved into it is read as part of the next batch
    val fileDstream = ssc.textFileStream("/bd17/stream_spark")
    fileDstream.print()
  }

  def main(args: Array[String]): Unit = {
    monitorDirectory()
    ssc.start()
    ssc.awaitTermination()
  }
}
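
textFileStream only picks up files that are moved or renamed into the monitored directory after the job starts. A minimal sketch (FilteredDirectoryWordCount is a hypothetical name) of the more general fileStream API, which additionally takes a path filter and a newFilesOnly flag.

package com.haiyang

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FilteredDirectoryWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("filtered directory monitor")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // fileStream is the general form behind textFileStream: it exposes a path filter
    // and a newFilesOnly flag in addition to the directory to monitor
    val fileDstream = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/bd17/stream_spark",
      (path: Path) => !path.getName.startsWith("."), // ignore hidden/in-progress files
      newFilesOnly = true)
    fileDstream.map(_._2.toString).print()
    ssc.start()
    ssc.awaitTermination()
  }
}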


package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingPullFlume {
  // Create the SparkConf and StreamingContext with a 5-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName(" pull data")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Pull data from the Spark sink running inside the Flume agent on the given host and port
    val flumeDstream = FlumeUtils.createPollingStream(ssc, "master", 9999)
    // Decode each event body, flatten it into words, and count the occurrences of each word
    flumeDstream.map(x => new String(x.event.getBody.array()))
      .flatMap(x => x.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()

  }

}
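
The pull approach requires each Flume agent to run the Spark sink (org.apache.spark.streaming.flume.sink.SparkSink); createPollingStream can then poll several agents in a single stream. A minimal sketch assuming a second agent on slave1 with the same port (both the second host and the name StreamingPullMultiFlume are assumptions).

package com.haiyang

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingPullMultiFlume {
  val conf = new SparkConf().setMaster("local[*]").setAppName("pull data from several agents")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Poll several Flume agents (each running the Spark sink) in one stream
    val addresses = Seq(new InetSocketAddress("master", 9999), new InetSocketAddress("slave1", 9999))
    val flumeDstream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)
    flumeDstream.map(x => new String(x.event.getBody.array()))
      .flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}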

package com.haiyang

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSaveToMysql {

  // Create the SparkConf and StreamingContext with a 5-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("save to mysql")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Read text lines from a socket on the given host and port
    val dstream = ssc.socketTextStream("master", 9999)
    val result = dstream.flatMap(x => x.split("\\s"))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
    // Iterate over each RDD together with its batch timestamp, then over each partition,
    // opening one MySQL connection per partition. The MySQL JDBC driver must be on the classpath.
    result.foreachRDD((rdd,time)=>{
      val timestamp =time.milliseconds
      rdd.foreachPartition(wcs=>{
        // Load the JDBC driver and connect to MySQL
        Class.forName("com.mysql.jdbc.Driver")
        val connection = DriverManager.getConnection("jdbc:mysql://master:3306/test", "ocean", "BG@%pim%hGF1K5FY")
        // Insert the word counts of this batch together with the batch timestamp
        val sql = "insert into streaming_wc (ts,word,count) values(?,?,?)"
        val prepareStatement = connection.prepareStatement(sql)
        for(record<-wcs){
          prepareStatement.setLong(1,timestamp)
          prepareStatement.setString(2,record._1)
          prepareStatement.setInt(3,record._2)
          prepareStatement.addBatch()
        }
        prepareStatement.executeBatch()
//        connection.commit()
        connection.close()
      })

    })
    ssc.start()
    ssc.awaitTermination()
  }

}
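
The insert statement above assumes a streaming_wc table already exists in the test database. Below is a minimal sketch of creating it over the same JDBC connection (CreateStreamingWcTable is a hypothetical helper, and the column types are assumptions chosen to match the setLong/setString/setInt calls).

package com.haiyang

import java.sql.DriverManager

object CreateStreamingWcTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val connection = DriverManager.getConnection("jdbc:mysql://master:3306/test", "ocean", "BG@%pim%hGF1K5FY")
    // Column names match the insert statement: ts (batch timestamp), word, count
    val ddl =
      """create table if not exists streaming_wc (
        |  ts bigint,
        |  word varchar(200),
        |  `count` int
        |)""".stripMargin
    connection.createStatement().execute(ddl)
    connection.close()
  }
}
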
package com.haiyang


import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingToHbase {
  // Create the SparkConf and StreamingContext with a 5-second batch interval
  val conf = new SparkConf().setAppName("streaming to hbase").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Read text lines from a socket and count the words
    val dstream = ssc.socketTextStream("master", 9999)
    val result = dstream.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Iterate over each RDD together with its batch time; the timestamp becomes the HBase row key
    result.foreachRDD((rdd, time) => {
      val timestamp = time.milliseconds.toString
      rdd.foreachPartition(wcs => {
        val table = HbaseUtils.getTable("streaming_wc")
        // One Put per word: row key = batch timestamp, column family "i", qualifier = word, value = count
        val putList = new java.util.ArrayList[Put]()
        for (wc <- wcs) {
          val put = new Put(timestamp.getBytes())
          put.addColumn("i".getBytes(), wc._1.getBytes(), wc._2.toString.getBytes())
          putList.add(put)
        }
        table.put(putList)
      })
    })
    ssc.start()
    ssc.awaitTermination()

  }


}
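
The Puts above target a streaming_wc table with a column family named "i", which has to exist beforehand. A minimal sketch of creating it through the same connection, using the older HTableDescriptor/HColumnDescriptor API that is still available in HBase 1.x/2.x clients (CreateHbaseTable is a hypothetical helper, not part of the original example).

package com.haiyang

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

object CreateHbaseTable {
  def main(args: Array[String]): Unit = {
    val admin = HbaseUtils.connection.getAdmin
    val name = TableName.valueOf("streaming_wc")
    if (!admin.tableExists(name)) {
      // Table with a single column family "i", matching the Puts built above
      val descriptor = new HTableDescriptor(name)
      descriptor.addFamily(new HColumnDescriptor("i"))
      admin.createTable(descriptor)
    }
    admin.close()
  }
}
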
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}


object WindowWordCount {

  // Create the SparkConf and StreamingContext with a 3-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("chuang kou caozuo")
  val ssc = new StreamingContext(conf, Duration(3000))

  def wordCount() = {
    val dstream = ssc.socketTextStream("master", 9999)
    val transformation = dstream.flatMap(_.split("\\s"))
      .map((_, 1))

    // 12-second window sliding every 6 seconds; if no slide duration is given it defaults to the batch interval
    val result = transformation.reduceByKeyAndWindow((x1: Int, x2: Int) => x1 + x2, Duration(12000), Duration(6000))

    result.print()
  }
    def main(args: Array[String]): Unit = {
      wordCount()
      ssc.start()
      ssc.awaitTermination()
    }

}
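
reduceByKeyAndWindow also has an overload that takes an inverse reduce function, so each window is computed incrementally from the previous one instead of re-reducing all the data in the window; this variant requires checkpointing. A minimal sketch (IncrementalWindowWordCount is a hypothetical name, and the checkpoint path is an assumption).

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

object IncrementalWindowWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("incremental window")
  val ssc = new StreamingContext(conf, Duration(3000))
  // The inverse-reduce variant keeps the previous window's result, so checkpointing is required
  ssc.checkpoint("/temp/streamingcheckpoint")

  def main(args: Array[String]): Unit = {
    val counts = ssc.socketTextStream("master", 9999)
      .flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, // add counts entering the window
        (a: Int, b: Int) => a - b, // subtract counts leaving the window
        Duration(12000), Duration(6000))
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}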

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


//从网络端口获得数据 并且统计每个单词出现的次数
object WordCount {
  // When running locally use at least local[2]: with a single thread, that thread is permanently busy receiving data and nothing is left to process it
  val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wc streaming")
  val ssc = new StreamingContext(sparkconf, Seconds(3))


  def main(args: Array[String]): Unit = {
    val dsStream = ssc.socketTextStream("master", 9999)

    val result = dsStream.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
    result.print(20)
    ssc.start()
    ssc.awaitTermination()
  }




}

Stateful update computation

package com.haiyang.statue

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

object StateWordCount {

  val conf = new SparkConf().setMaster("local[*]").setAppName("lei ji caozuo")
  val ssc = new StreamingContext(conf, Duration(3000))

  // updateStateByKey needs a checkpoint directory to persist the state between batches
  ssc.checkpoint("/temp/streamingcheckpoint")

  def allSumWordCount() = {
    val dstream = ssc.socketTextStream("master", 9999)

    val result = dstream.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
    // Fetch the previous state of each key and add the counts of the current batch
    val state = result.updateStateByKey[Int]((nowBat: Seq[Int], s: Option[Int]) => {
      s match {
        case Some(value) => Some(value + nowBat.sum)
        case None => Some(nowBat.sum)
      }
    })
    state.print()
  }

  def main(args: Array[String]): Unit = {
    allSumWordCount()
    ssc.start()
    ssc.awaitTermination()
  }
}
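
updateStateByKey recomputes the state of every key in every batch. A minimal sketch of the mapWithState alternative from the same API, which only touches the keys that appear in the current batch (MapWithStateWordCount is a hypothetical name; the checkpoint path is reused from the example above).

package com.haiyang.statue

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, State, StateSpec, StreamingContext}

object MapWithStateWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("mapWithState word count")
  val ssc = new StreamingContext(conf, Duration(3000))
  ssc.checkpoint("/temp/streamingcheckpoint")

  def main(args: Array[String]): Unit = {
    val counts = ssc.socketTextStream("master", 9999)
      .flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)

    // For every key, add the batch count to the stored running total and emit (word, total)
    val spec = StateSpec.function((word: String, count: Option[Int], state: State[Int]) => {
      val total = count.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(total)
      (word, total)
    })
    counts.mapWithState(spec).print()
    ssc.start()
    ssc.awaitTermination()
  }
}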