开发者社区> 问答> 正文

如何用spark将mysql的数据写入hbase?

如何用spark将mysql的数据写入hbase?

展开
收起
程序猿003 2019-08-27 16:51:04 1649 0
1 条回答
写回答
取消 提交回答
  • 你好,可以参考下面的示例,使用 Spark 将数据写入 HBase(示例中的数据是从文本文件读取的;如果数据源是 MySQL,可以先通过 Spark 的 JDBC 数据源读取,再按同样的方式写入 HBase):

    --------------组装xml并捕获异常-------------------

    package wondersgroup_0628.com

    import java.io.{IOException, PrintWriter, StringReader, StringWriter}
    import java.util.Base64
    import javax.xml.parsers.DocumentBuilderFactory

    import com.wonders.TXmltmp
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.mapred.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}
    import org.xml.sax.{InputSource, SAXException}

    /**
     * Spark job that reads `||`-delimited text records and writes one row per record into the
     * HBase table `hbaseTest`.
     *
     * Each input line must contain at least three `||`-separated fields:
     *   1. a Base64-encoded string passed as the first argument to `TXmltmp.load`,
     *   2. a Base64-encoded string passed as the second argument to `TXmltmp.load`,
     *   3. the HBase row key.
     *
     * The text returned by `TXmltmp.load` is parsed as XML purely as a well-formedness check.
     * On success it is stored in `cf2:age`; if parsing fails with a `SAXException` or an
     * `IOException`, the stack trace is stored in `cf2:name` for the same row key instead.
     *
     * Usage: the first program argument is the input path handed to `SparkContext.textFile`.
     */
    object TestTest_3 {

      /** Name of the HBase table every partition writes to. */
      private val TableName = "hbaseTest"

      /** Column family that receives both the XML payload and the error text. */
      private val ColumnFamily = "cf2"

      /**
       * Client-side write buffer size (5 MiB). The published snippet showed `510241024`, which is
       * `5*1024*1024` with the `*` characters stripped by the page's markup.
       */
      private val WriteBufferBytes: Long = 5L * 1024 * 1024

      def main(args: Array[String]): Unit = {
        require(args.nonEmpty, "usage: TestTest_3 <input-path>")

        val saprkConf = new SparkConf().setAppName("TextTeset_3")
        val sc = new SparkContext(saprkConf)
        try {
          // val dataText = "/user/hdfs/test/rdd_1000000.dat"
          val rdd = sc.textFile(args(0))
          // '|' is a regex metacharacter, so each one has to be escaped for String.split.
          val data = rdd.map(_.split("\\|\\|")).map(x => (x(0), x(1), x(2)))
          data.foreachPartition { records => writePartition(records) }
        } finally {
          // Stop the context even when the job fails so the application exits cleanly.
          sc.stop()
        }
      }

      /**
       * Writes every record of one partition to HBase over a single `HTable` handle.
       *
       * Puts are buffered on the client and flushed once at the end of the partition (and
       * whenever the write buffer fills up). Failures raised by HBase itself are not caught
       * here: they propagate and fail the task.
       *
       * @param records tuples of (first Base64 field, second Base64 field, row key)
       */
      private def writePartition(records: Iterator[(String, String, String)]): Unit = {
        val conf = HBaseConfiguration.create()
        // NOTE(review): COLUMN_LIST is a TableInputFormat (read-side) setting and is given the
        // table name here; kept as in the original, but it looks unnecessary for writes — confirm.
        conf.set(TableInputFormat.COLUMN_LIST, "hbaseTest")
        conf.set("hbase.zookeeper.quorum", "qsmaster,qsslave1,qsslave2")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        // conf.addResource("/home/hadoop/data/lib/hbase-site.xml")

        val table = new HTable(conf, TableName)
        try {
          table.setAutoFlush(false, false)
          table.setWriteBufferSize(WriteBufferBytes)

          // One parser per partition: building the factory and builder per record is wasted work.
          val builder = DocumentBuilderFactory.newInstance.newDocumentBuilder

          records.foreach { case (encodedFirst, encodedSecond, rowKey) =>
            val put = new Put(Bytes.toBytes(rowKey))
            try {
              val tmp = new TXmltmp
              // NOTE(review): decoding uses the JVM default charset, as the original did —
              // confirm which encoding the producers of these fields use.
              val j1 = new String(Base64.getDecoder.decode(encodedFirst))
              val j2 = new String(Base64.getDecoder.decode(encodedSecond))
              val xml = tmp.load(j1, j2)

              // Parsed only to validate the XML; the resulting document is not used.
              builder.parse(new InputSource(new StringReader(xml)))
              put.addColumn(Bytes.toBytes(ColumnFamily), Bytes.toBytes("age"), Bytes.toBytes(xml))
            } catch {
              // Scala cases do not fall through: the two exception types must share one
              // handler, otherwise SAX failures would be dropped without writing anything.
              case ex @ (_: SAXException | _: IOException) =>
                println("found a unknown exception" + ex)
                put.addColumn(
                  Bytes.toBytes(ColumnFamily),
                  Bytes.toBytes("name"),
                  Bytes.toBytes(stackTraceOf(ex)))
            }
            table.put(put)
          }

          table.flushCommits()
        } finally {
          // Always release the table handle, even when a record or a flush fails.
          table.close()
        }
      }

      /** Renders the full stack trace of `error` as a string. */
      private def stackTraceOf(error: Throwable): String = {
        val buffer = new StringWriter()
        val writer = new PrintWriter(buffer)
        try error.printStackTrace(writer) finally writer.close()
        buffer.toString
      }
    }

    ————————————————

    版权声明:本文为CSDN博主「zhaokunpeng1593」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/u012719230/article/details/81699987

    阿里云帮助文档 https://help.aliyun.com

    希望对您有帮助!

    2019-12-12 22:21:27
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像