Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase-阿里云开发者社区

开发者社区> 这是爱国者> 正文

Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase

简介: 环境为CDH5.8,开发工具为IDEA,大数据目前最新的API,送给大家避免踩坑!!
+关注继续查看

亲自摸索,送给大家,原创文章,转载注明哦。



import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
/**
  * Created by sunyulong on 16/9/19.
  */
object OBDSQL extends App{
  //kafka topic
  val topics = List(("aaa",1)).toMap
  //zookeeper
  val zk = "10.1.11.71,10.1.11.72,10.1.11.73"
  val conf = new SparkConf() setMaster "yarn-cluster" setAppName "SparkStreamingETL"
  //create streaming context
  val ssc = new StreamingContext(conf , Seconds(1))
  //get every lines from kafka
  val lines = KafkaUtils.createStream(ssc,zk,"sparkStreaming",topics).map(_._2)
  //get spark context
  val sc = ssc.sparkContext
  //get sql context
  val sqlContext = new SQLContext(sc)
  //process every rdd AND save as HTable
  lines.foreachRDD(rdd => {
    //case class implicits
    import sqlContext.implicits._
    //filter empty rdd
    if (!rdd.isEmpty) {
      //register a temp table
      rdd.map(_.split(",")).map(p => Persion(p(0), p(1).trim.toDouble, p(2).trim.toInt, p(3).trim.toDouble)).toDF.registerTempTable("oldDriver")
      //use spark SQL
      val rs = sqlContext.sql("select count(1) from oldDriver")
      //create hbase conf
      val hconf = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.quorum",zk)
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.defaults.for.version.skip", "true")
      hconf.set(TableOutputFormat.OUTPUT_TABLE, "obd_pv")
      hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
      val jobConf = new JobConf(hconf)
      //convert every line to hbase lines
      rs.rdd.map(line => (System.currentTimeMillis(),line(0))).map(line =>{
        //create hbase put
        val put = new Put(Bytes.toBytes(line._1))
        //add column
        put.addColumn(Bytes.toBytes("pv"),Bytes.toBytes("pv"),Bytes.toBytes(line._2.toString))
        //retuen type
        (new ImmutableBytesWritable,put)
      }).saveAsNewAPIHadoopDataset(jobConf)     //save as HTable
    }
  })
  //streaming start
  ssc start()
  ssc awaitTermination()
}

//the entity of persion for SparkSQL
case class Persion(gender: String, tall: Double, age: Int, driverAge: Double)


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
RocketMQ与MYSQL事务消息整合
rocketmq事务消息与mysql事物整合
8507 0
windows linux 使用python执行系统命令并将结果保存到变量
最近需要用到os.system 发现不能赋值到变量 后查有更新的模块,如下: os.system os.spawn* os.popen* popen2.* commands.* 重新使用content=os.
1089 0
ASP.NET Core轻松入门Bind读取配置文件到C#实例
首先新建一个ASP.NET Core空项目,命名为BindReader 然后 向项目中添加一个名为appsettings.json的json文件,为什么叫appsettings呢?  打开Program.
889 0
asp adodb.stream读取文件和写文件
读取文件操作: '------------------------------------------------- '函数名称:ReadTextFile '作用:利用AdoDb.Stream对象来读取UTF-8格式的文本文件 '-------------------------...
875 0
+关注
这是爱国者
北京车音网大数据开发
1
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载