Spark_Streaming-阿里云开发者社区

开发者社区> 漏船载酒> 正文

Spark_Streaming

简介:
+关注继续查看

练习例子
1。

package com.haiyang

import java.nio.charset.Charset

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder
//flume 数据发送 productor端口 主要用于发送产生的Event
object FlumeMsgSender {
  val client =RpcClientFactory.getDefaultInstance("master",33333)
//客户端通过RPC协议工厂实现默认的主节点 以及端口
  def sendEvent(msg:String)={
//发送event的Body
    val event =EventBuilder.withBody(msg,Charset.forName("UTF-8"))
//追加event
    client.append(event)
  }

  def main(args: Array[String]): Unit = {
//产生时间并发送
    (1 to 100).foreach(x=>{
      sendEvent(s"hello flume--$x")
    })
    client.close()
  }
}

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


object FlumePushStreaming {
//连接Spark 创建Streaming  Secods(5)表示五秒产生一个RDD
  val conf =new SparkConf().setMaster("local[*]").setAppName("flume get")
  val ssc=new StreamingContext(conf,Seconds(5))

  def main(args: Array[String]): Unit = {
//创建FlumeUntils 连接Stream  :监听的IP 和端口
    val flumeDstream =FlumeUtils.createStream(ssc,"192.168.6.168",33333)
    flumeDstream.flatMap(x=>new String(x.event.getBody.array()).split("\\s"))
      .map(x=>(x,1))
      .reduceByKey(_+_)
      .print()
//将得到的数据展平 计算个数
    ssc.start()
    ssc.awaitTermination()
  }

}

//自建的工具类 连接Hbase的配置文件 连接HBase上的表明


package com.haiyang

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtils {


  val conf =HBaseConfiguration.create()
  val connection =ConnectionFactory.createConnection(conf)

  def  getTable(tableName:String)={
    connection.getTable(TableName.valueOf(tableName))
  }

}

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Duration, StreamingContext}


object KafkaDirectWordCount {
//连接Spark 连接Streaming
  val conf =new SparkConf().setMaster("local[*]").setAppName("use direct get wordCount")
  val ssc=new StreamingContext(conf,Duration(3000))

  def main(args: Array[String]): Unit = {
//实现kafka配置需要的信息 bootstrap.server key 以及value的Deserializer
    val kafkaParams =Map(("bootstrap.servers","master:9092,slave1:9092,slave2:9092"),("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
      ,("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"),("group.id","kafkatest"),("enable.auto.commit","false"))

//    val topic =Set("forstreaming")
//    val consumerStrategies =ConsumerStrategies Subscribe[String,String](topic, kafkaParams)
//    val kafkaDStream =KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,consumerStrategies)
//   kafkaDStream.map(x=>x.value())
//        .flatMap(x=>x.split("\\s"))
//              .map(_,1)
//          .reduceByKey(_+_)
//            .print()
//    ssc.start()
//    ssc.awaitTermination()
//发送topic 信息
    val topic =Set("test1")
    //Subscribe后面的两个泛型要与map的kv类型对应
 //consumerStrategies 消费者策略 就是连接到topic 并给定关于kafka参数信息 kafka 
   val 
consumerStrategies=ConsumerStrategies.Subscribe[String,String](topic,kafkaParams)
    val kafkaDstream=KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,consumerStrategies)
    kafkaDstream.map(x=>x.value())
      .flatMap(x=>x.split("\\s"))
      .map((_,1))
      .reduceByKey(_+_)
      .print()
    ssc.start()
    ssc.awaitTermination()

  }

}
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpoolDirectoryWordCount {
//配置信息 连接SparkStreaming
  val conf =new SparkConf().setMaster("local[*]").setAppName("jianting")
  val ssc =new StreamingContext(conf,Seconds(5))


  def monitorDirectory()={
//监听hdfs上的文件夹
    val fileDstream =ssc.textFileStream("/bd17/stream_spark")
    fileDstream.print()
  }

  def main(args: Array[String]): Unit = {
    monitorDirectory()
    ssc.start()
    ssc.awaitTermination()
  }
}


package com.haiyang

import com.haiyang.FlumePushStreaming.ssc
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingPullFlume {
//连接SparkStreaming
  val conf =new SparkConf().setMaster("local[*]").setAppName(" pull data")
  val ssc= new StreamingContext(conf,Seconds(5))

  def main(args: Array[String]): Unit = {
 //从指定节点上拉去数据  
  val flumeDstream =FlumeUtils.createPollingStream(ssc,"master",9999)
    flumeDstream.map(x=>new String(x.event.getBody.array()))
      .flatMap(x=>x.split("\\s"))
      .map((_ ,1))
      .reduceByKey(_+_)
      .print()
//展平 做MR操作
    ssc.start()
    ssc.awaitTermination()

  }

}

package com.haiyang

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingSaveToMysql {

//创建连接SparkStreaming
  val conf =new SparkConf().setMaster("local[*]").setAppName("save to mysql")
  val ssc =new StreamingContext(conf,Seconds(5))

  def main(args: Array[String]): Unit = {
//通过socket连接指定主机的网络端口
    val dstream =ssc.socketTextStream("master",9999)
    val result =dstream.flatMap(x=>x.split("\\s"))
                    .map(x=>(x,1))
                      .reduceByKey(_+_)
//对ds数据践行遍历为RDD 键入元组 一个为声明时间戳 一个为遍历每个分区 让每个分区都连接到mysql上 在这里值得一提的是需要mysql依赖 
    result.foreachRDD((rdd,time)=>{
      val timestamp =time.milliseconds
      rdd.foreachPartition(wcs=>{
//启动驱动  连接mysql 
        Class.forName("com.mysql.jdbc.Driver")
        val connection =DriverManager.getConnection("jdbc:mysql://master:3306/test","ocean","BG@%pim%hGF1K5FY")
//sql语句 插入之前通过网络端口拉去到的数据
        val sql ="insert into streaming_wc (ts,word,count) values(?,?,?)"
        val prepareStatement =connection.prepareStatement(sql)
        for(record<-wcs){
          prepareStatement.setLong(1,timestamp)
          prepareStatement.setString(2,record._1)
          prepareStatement.setInt(3,record._2)
          prepareStatement.addBatch()
        }
        prepareStatement.executeBatch()
//        connection.commit()
        connection.close()
      })

    })
    ssc.start()
    ssc.awaitTermination()
  }

}
package com.haiyang


import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object StreamingToHbase {
//创建连接SparkStreaming
  val conf =new SparkConf().setAppName("streaming to hbase").setMaster("local[*]")
  val ssc =new StreamingContext(conf,Seconds(5))

  def main(args: Array[String]): Unit = {
//通过网端监听获得数据
val dstream =ssc.socketTextStream("master",9999)
    val result =dstream.flatMap(_.split("\\s"))
                          .map((_,1))
                            .reduceByKey(_+_)

//遍历为RDD 键入元组 两个参数 一个是时间戳 一个是word
    result.foreachRDD((rdd,time)=>{
      val timestamp =time.milliseconds.toString
      rdd.foreachPartition(wcs=>{
      val table =HbaseUtils.getTable("streaming_wc")

        val putList = new java.util.ArrayList[Put]()
        for(wc<-wcs){
          val put =new Put(timestamp.getBytes())
          put.addColumn("i".getBytes(),wc._1.getBytes(),wc._2.toString.getBytes())
          putList.add(put)
        }
        import  scala.collection.JavaConversions
        table.put(putList)
    })
    })
    ssc.start()
    ssc.awaitTermination()

  }


}
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}


object WindowWordCount {

  val conf =new SparkConf().setMaster("local[*]").setAppName("chuang kou caozuo")
  val ssc =new StreamingContext(conf,Duration(3000))

  def wordCount()= {
    val dstream = ssc.socketTextStream("master", 9999)
    val transaformation = dstream.flatMap(_.split("\\s"))
      .map((_, 1))

    //不指定滑动宽度  默认会以微批次的宽度计算的时间间隔
    val result = transaformation.reduceByKeyAndWindow((x1:Int,x2:Int)=>x1+x2, Duration(12000),Duration(6000))

    result.print()

  }
    def main(args: Array[String]): Unit = {
      wordCount()
      ssc.start()
      ssc.awaitTermination()
    }

}

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


//从网络端口获得数据 并且统计每个单词出现的次数
object WordCount {
//如果使用local的话至少是local[2] 如果只有一个现成 那么这个现成始终再处理接收数据 计算数据的过程
val sparkconf =new SparkConf().setMaster("local[*]").setAppName("wc streaming")
val ssc=new StreamingContext(sparkconf,Seconds(3))


  def main(args: Array[String]): Unit = {
    val dsStream =ssc.socketTextStream("master",9999)

  val result =dsStream.flatMap(x=>x.split(" "))
            .map(x=>(x,1))
              .reduceByKey(_+_)
    result.print(20)
    ssc.start()
    ssc.awaitTermination()
  }




}

状态更新计算

package com.haiyang.statue

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

object SteteWordCount {

  val conf =new SparkConf().setMaster("local[*]").setAppName("lei ji caozuo")
  val ssc=new StreamingContext(conf,Duration(3000))


  ssc.checkpoint("/temp/streamingcheckpoint")

  def allSumWordCount()={
    val dsream =ssc.socketTextStream("master",9999)

    val result =dsream.flatMap(_.split("\\s"))
                        .map((_,1))
                          .reduceByKey(_+_)
    //获取并更新状态
    val state =result.updateStateByKey[Int]((nowBat:Seq[Int],s:Option[Int])=>{
      s match{
        case Some(value)=>Some(value + nowBat.sum)
        case None=>Some(nowBat.sum)
      }
    })
    state.print()
  }

  def main(args: Array[String]): Unit = {
    allSumWordCount()
    ssc.start()
    ssc.awaitTermination()
  }
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
9482 0
阿里云服务器ECS远程登录用户名密码查询方法
阿里云服务器ECS远程连接登录输入用户名和密码,阿里云没有默认密码,如果购买时没设置需要先重置实例密码,Windows用户名是administrator,Linux账号是root,阿小云来详细说下阿里云服务器远程登录连接用户名和密码查询方法
11191 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13166 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
9048 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
6885 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,云吞铺子总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系统盘、创建快照、配置安全组等操作如何登录ECS云服务器控制台? 1、先登录到阿里云ECS服务器控制台 2、点击顶部的“控制台” 3、通过左侧栏,切换到“云服务器ECS”即可,如下图所示 通过ECS控制台的远程连接来登录到云服务器 阿里云ECS云服务器自带远程连接功能,使用该功能可以登录到云服务器,简单且方便,如下图:点击“远程连接”,第一次连接会自动生成6位数字密码,输入密码即可登录到云服务器上。
21893 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
4008 0
+关注
漏船载酒
我想:这该是一条多么长的路啊.
48
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载