Practice examples
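The examples below rely on several external connectors (Flume, Kafka 0.10, HBase, MySQL JDBC). A rough build.sbt sketch is given here for orientation; the artifact coordinates are the standard ones, but every version number is an assumption and has to be matched to the actual Spark, Scala, HBase and MySQL versions in use:

// Sketch only: all versions below are assumptions, adjust to your cluster
scalaVersion := "2.11.12"

val sparkVersion = "2.4.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume"      % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.flume" %  "flume-ng-sdk"               % "1.9.0",
  "org.apache.hbase" %  "hbase-client"               % "2.1.10",
  "mysql"            %  "mysql-connector-java"       % "5.1.49"
)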
1.
package com.haiyang

import java.nio.charset.Charset

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

// Flume sender (producer side): builds Events and pushes them to an Avro endpoint
object FlumeMsgSender {
  // Default RPC client from the factory, pointing at the receiving host and port
  val client = RpcClientFactory.getDefaultInstance("master", 33333)

  def sendEvent(msg: String) = {
    // Wrap the message in an event body (UTF-8)
    val event = EventBuilder.withBody(msg, Charset.forName("UTF-8"))
    // Append the event, i.e. send it to the remote endpoint
    client.append(event)
  }

  def main(args: Array[String]): Unit = {
    // Generate 100 events, send them, then close the client
    (1 to 100).foreach(x => {
      sendEvent(s"hello flume--$x")
    })
    client.close()
  }
}

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Push-based Flume receiver: events are pushed over Avro to this listener
object FlumePushStreaming {
  // Set up Spark and the StreamingContext; Seconds(5) means one micro-batch (RDD) every five seconds
  val conf = new SparkConf().setMaster("local[*]").setAppName("flume get")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Create the Flume stream with FlumeUtils: the IP and port this receiver listens on
    val flumeDstream = FlumeUtils.createStream(ssc, "192.168.6.168", 33333)
    // Flatten each event body into words and count the occurrences
    flumeDstream.flatMap(x => new String(x.event.getBody.array()).split("\\s"))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}

// Self-written utility: loads the HBase configuration, opens a connection, and looks up tables by name
package com.haiyang

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtils {
  // HBaseConfiguration.create() picks up hbase-site.xml from the classpath
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)

  def getTable(tableName: String) = {
    connection.getTable(TableName.valueOf(tableName))
  }
}
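If no hbase-site.xml is available on the classpath, the ZooKeeper quorum can be set in code instead. A minimal sketch, where the hostnames and the object name HbaseUtilsInline are assumptions rather than part of the original project:

package com.haiyang

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch only: hostnames are assumptions, adjust to the actual ZooKeeper quorum
object HbaseUtilsInline {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val connection = ConnectionFactory.createConnection(conf)

  def getTable(tableName: String) = connection.getTable(TableName.valueOf(tableName))
}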
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Duration, StreamingContext}

// Direct (receiver-less) Kafka word count
object KafkaDirectWordCount {
  // Set up Spark and the StreamingContext with a 3-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("use direct get wordCount")
  val ssc = new StreamingContext(conf, Duration(3000))

  def main(args: Array[String]): Unit = {
    // Kafka parameters: bootstrap.servers, key/value deserializers, consumer group, and no auto commit
    val kafkaParams = Map(
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "kafkatest",
      "enable.auto.commit" -> "false")
    // The topic(s) to subscribe to
    val topic = Set("test1")
    // The two type parameters of Subscribe must match the key/value types of the records
    // consumerStrategies: the consumer strategy, i.e. subscribe to the topic(s) with the Kafka parameters above
    val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
    val kafkaDstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, consumerStrategies)
    kafkaDstream.map(x => x.value())
      .flatMap(x => x.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
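Because enable.auto.commit is set to false, the job above never commits its offsets anywhere, so after a restart the group simply falls back to auto.offset.reset. A sketch of the usual commitAsync pattern from the spark-streaming-kafka-0-10 integration; the object name and the simple per-batch word count inside foreachRDD are illustrative assumptions:

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Duration, StreamingContext}

// Sketch: direct stream that commits offsets back to Kafka after each processed batch
object KafkaDirectCommitOffsets {
  val conf = new SparkConf().setMaster("local[*]").setAppName("direct with manual commit")
  val ssc = new StreamingContext(conf, Duration(3000))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092,slave1:9092,slave2:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "kafkatest",
      "enable.auto.commit" -> "false")
    val stream = KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test1"), kafkaParams))
    stream.foreachRDD { rdd =>
      // Capture the offset ranges of this batch before any further transformation
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(r => (r.value(), 1)).reduceByKey(_ + _).collect().foreach(println)
      // Commit the offsets asynchronously once the batch has been processed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}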
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Watch a directory for newly added files and print their contents
object SpoolDirectoryWordCount {
  // Configuration and StreamingContext with a 5-second batch interval
  val conf = new SparkConf().setMaster("local[*]").setAppName("jianting")
  val ssc = new StreamingContext(conf, Seconds(5))

  def monitorDirectory() = {
    // Monitor a directory on HDFS for new files
    val fileDstream = ssc.textFileStream("/bd17/stream_spark")
    fileDstream.print()
  }

  def main(args: Array[String]): Unit = {
    monitorDirectory()
    ssc.start()
    ssc.awaitTermination()
  }
}

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Pull-based Flume integration: Spark polls a Spark sink running inside the Flume agent
object StreamingPullFlume {
  // Set up the StreamingContext
  val conf = new SparkConf().setMaster("local[*]").setAppName(" pull data")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Pull data from the sink on the given host and port
    val flumeDstream = FlumeUtils.createPollingStream(ssc, "master", 9999)
    // Decode the event bodies, flatten into words, and count each word
    flumeDstream.map(x => new String(x.event.getBody.array()))
      .flatMap(x => x.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}

package com.haiyang

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Write streaming word counts into MySQL
object StreamingSaveToMysql {
  // Set up the StreamingContext
  val conf = new SparkConf().setMaster("local[*]").setAppName("save to mysql")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Read text from a socket on the given host and port
    val dstream = ssc.socketTextStream("master", 9999)
    val result = dstream.flatMap(x => x.split("\\s"))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
    // For each RDD (together with its batch time), iterate over the partitions and open one
    // MySQL connection per partition; note that the MySQL JDBC driver must be on the classpath
    result.foreachRDD((rdd, time) => {
      val timestamp = time.milliseconds
      rdd.foreachPartition(wcs => {
        // Load the JDBC driver and connect to MySQL
        Class.forName("com.mysql.jdbc.Driver")
        val connection = DriverManager.getConnection("jdbc:mysql://master:3306/test", "ocean", "BG@%pim%hGF1K5FY")
        // Insert the word counts received in this batch
        val sql = "insert into streaming_wc (ts,word,count) values(?,?,?)"
        val prepareStatement = connection.prepareStatement(sql)
        for (record <- wcs) {
          prepareStatement.setLong(1, timestamp)
          prepareStatement.setString(2, record._1)
          prepareStatement.setInt(3, record._2)
          prepareStatement.addBatch()
        }
        prepareStatement.executeBatch()
        // connection.commit()
        connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
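The insert statement above assumes that a streaming_wc table already exists in the test database. A hypothetical one-off helper for creating it; the column types are assumptions inferred from the setLong/setString/setInt calls:

package com.haiyang

import java.sql.DriverManager

// Sketch only: creates the streaming_wc table used by StreamingSaveToMysql
object CreateStreamingWcTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val connection = DriverManager.getConnection("jdbc:mysql://master:3306/test", "ocean", "BG@%pim%hGF1K5FY")
    val statement = connection.createStatement()
    // Column types are assumptions: ts is a millisecond timestamp, count a plain integer
    statement.execute("create table if not exists streaming_wc (ts bigint, word varchar(255), `count` int)")
    statement.close()
    connection.close()
  }
}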
package com.haiyang

import org.apache.hadoop.hbase.client.Put

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Write streaming word counts into HBase
object StreamingToHbase {
  // Set up the StreamingContext
  val conf = new SparkConf().setAppName("streaming to hbase").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(5))

  def main(args: Array[String]): Unit = {
    // Read data from a socket
    val dstream = ssc.socketTextStream("master", 9999)
    val result = dstream.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
    // For each RDD use the batch timestamp as the row key and each word as a column qualifier
    result.foreachRDD((rdd, time) => {
      val timestamp = time.milliseconds.toString
      rdd.foreachPartition(wcs => {
        val table = HbaseUtils.getTable("streaming_wc")
        val putList = new java.util.ArrayList[Put]()
        for (wc <- wcs) {
          val put = new Put(timestamp.getBytes())
          put.addColumn("i".getBytes(), wc._1.getBytes(), wc._2.toString.getBytes())
          putList.add(put)
        }
        table.put(putList)
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
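Likewise, the HBase table streaming_wc with the column family "i" has to exist before the job runs. A sketch of a one-off helper using the HBase 2.x admin API (with a 1.x client, HTableDescriptor/HColumnDescriptor would be used instead); the object name is an assumption:

package com.haiyang

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

// Sketch only: creates the streaming_wc table with the "i" column family via HbaseUtils
object CreateHbaseWcTable {
  def main(args: Array[String]): Unit = {
    val admin = HbaseUtils.connection.getAdmin
    val tableName = TableName.valueOf("streaming_wc")
    if (!admin.tableExists(tableName)) {
      val descriptor = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("i"))
        .build()
      admin.createTable(descriptor)
    }
    admin.close()
  }
}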
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

// Windowed word count
object WindowWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("chuang kou caozuo")
  val ssc = new StreamingContext(conf, Duration(3000))

  def wordCount() = {
    val dstream = ssc.socketTextStream("master", 9999)
    val transformation = dstream.flatMap(_.split("\\s"))
      .map((_, 1))
    // Window of 12 s sliding every 6 s; both must be multiples of the 3 s batch interval.
    // If the slide duration is omitted it defaults to the batch interval.
    val result = transformation.reduceByKeyAndWindow((x1: Int, x2: Int) => x1 + x2, Duration(12000), Duration(6000))
    result.print()
  }

  def main(args: Array[String]): Unit = {
    wordCount()
    ssc.start()
    ssc.awaitTermination()
  }
}
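reduceByKeyAndWindow also has an incremental form that additionally takes an inverse function: each slide only adds the batches entering the window and subtracts the ones leaving it, at the cost of requiring a checkpoint directory. A sketch of the same word count in that form; the object name and checkpoint path are illustrative assumptions:

package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

// Sketch: incremental windowed word count with reduce and inverse-reduce functions
object IncrementalWindowWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("incremental window")
  val ssc = new StreamingContext(conf, Duration(3000))
  // The incremental form needs checkpointing (path is an assumption)
  ssc.checkpoint("/temp/streamingcheckpoint")

  def main(args: Array[String]): Unit = {
    val pairs = ssc.socketTextStream("master", 9999)
      .flatMap(_.split("\\s"))
      .map((_, 1))
    // Add counts entering the window, subtract counts leaving it
    val result = pairs.reduceByKeyAndWindow(_ + _, _ - _, Duration(12000), Duration(6000))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}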
package com.haiyang

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Read data from a network socket and count how often each word appears
object WordCount {
  // When running locally use at least local[2]: with a single thread, that thread is permanently
  // busy receiving data and no thread is left to do the computation
  val sparkconf = new SparkConf().setMaster("local[*]").setAppName("wc streaming")
  val ssc = new StreamingContext(sparkconf, Seconds(3))

  def main(args: Array[String]): Unit = {
    val dsStream = ssc.socketTextStream("master", 9999)
    val result = dsStream.flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
    result.print(20)
    ssc.start()
    ssc.awaitTermination()
  }
}

Stateful update computation
package com.haiyang.statue

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

// Running (cumulative) word count across batches with updateStateByKey
object SteteWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("lei ji caozuo")
  val ssc = new StreamingContext(conf, Duration(3000))
  // updateStateByKey needs a checkpoint directory to store the state
  ssc.checkpoint("/temp/streamingcheckpoint")

  def allSumWordCount() = {
    val dstream = ssc.socketTextStream("master", 9999)
    val result = dstream.flatMap(_.split("\\s"))
      .map((_, 1))
      .reduceByKey(_ + _)
    // For every key, combine the counts of the current batch with the previous state
    val state = result.updateStateByKey[Int]((nowBat: Seq[Int], s: Option[Int]) => {
      s match {
        case Some(value) => Some(value + nowBat.sum)
        case None => Some(nowBat.sum)
      }
    })
    state.print()
  }

  def main(args: Array[String]): Unit = {
    allSumWordCount()
    ssc.start()
    ssc.awaitTermination()
  }
}
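Since Spark 1.6 the same running total can also be written with mapWithState, which only touches the keys that appear in the current batch instead of every key ever seen. A sketch under that API; the object name is an assumption:

package com.haiyang.statue

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, State, StateSpec, StreamingContext}

// Sketch: running word count with mapWithState instead of updateStateByKey
object MapWithStateWordCount {
  val conf = new SparkConf().setMaster("local[*]").setAppName("mapWithState wordcount")
  val ssc = new StreamingContext(conf, Duration(3000))
  ssc.checkpoint("/temp/streamingcheckpoint")

  def main(args: Array[String]): Unit = {
    val pairs = ssc.socketTextStream("master", 9999)
      .flatMap(_.split("\\s"))
      .map((_, 1))
    // For each key: add the new count (if any) to the stored state and emit the total
    val spec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    })
    pairs.mapWithState(spec).print()
    ssc.start()
    ssc.awaitTermination()
  }
}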