你好,下面的示例使用 Spark 将数据写入 HBase。
--------------组装xml并捕获异常-------------------
package wondersgroup_0628.com
import java.io.{IOException, PrintWriter, StringReader, StringWriter}
import java.util.Base64
import javax.xml.parsers.DocumentBuilderFactory

import com.wonders.TXmltmp
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.mapred.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.xml.sax.{InputSource, SAXException}
/**
 * Spark job that reads `||`-delimited text records and writes one HBase row per record.
 *
 * Each input line is split on the literal `||`; the first two fields are Base64-decoded
 * and passed to `TXmltmp.load`, whose result is parsed as XML to check it is well-formed.
 * The third field is used as the HBase row key in table `hbaseTest`:
 *
 *  - on success the XML text is stored in column `cf2:age`;
 *  - if parsing raises `SAXException` or `IOException`, the stack trace is stored in
 *    column `cf2:name` instead.
 */
object TestTest_3 {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the input path handed to `SparkContext.textFile`
   * @throws IllegalArgumentException if no input path is supplied
   */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: TestTest_3 <input-path>")

    val sparkConf = new SparkConf().setAppName("TextTeset_3")
    val sc = new SparkContext(sparkConf)
    try {
      // val dataText = "/user/hdfs/test/rdd_1000000.dat"
      val records = sc.textFile(args(0))
        // String.split takes a regex, so each '|' must be escaped; in a normal Scala
        // string literal that requires a doubled backslash.
        .map(_.split("\\|\\|"))
        .map(fields => (fields(0), fields(1), fields(2)))

      records.foreachPartition(rows => writePartition(rows))
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }

  /**
   * Writes every record of one partition to HBase through a single `HTable`.
   *
   * Puts are buffered (auto-flush is off) and flushed once after the last record;
   * the table is always closed afterwards. An `IOException` raised by the HBase write
   * itself is not treated as an XML error and propagates to the caller.
   *
   * @param rows records as (Base64 field 1, Base64 field 2, row key)
   */
  private def writePartition(rows: Iterator[(String, String, String)]): Unit = {
    val conf = HBaseConfiguration.create()
    // NOTE(review): COLUMN_LIST is a TableInputFormat (read-side) setting and is given
    // the table name here; it looks unnecessary for writes -- kept as in the original,
    // confirm before removing.
    conf.set(TableInputFormat.COLUMN_LIST, "hbaseTest")
    conf.set("hbase.zookeeper.quorum", "qsmaster,qsslave1,qsslave2")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // conf.addResource("/home/hadoop/data/lib/hbase-site.xml")

    val table = new HTable(conf, "hbaseTest")
    try {
      table.setAutoFlush(false, false)
      // NOTE(review): the published snippet had the literal 510241024, which reads as
      // "5*1024*1024" with the multiplication signs stripped; 5 MiB assumed -- confirm.
      table.setWriteBufferSize(5L * 1024 * 1024)

      val family = Bytes.toBytes("cf2")
      // Factory lookup is loop-invariant; a fresh builder is still created per record.
      val factory = DocumentBuilderFactory.newInstance

      rows.foreach { case (encodedFirst, encodedSecond, rowKey) =>
        val put = new Put(Bytes.toBytes(rowKey))
        try {
          val tmp = new TXmltmp
          val first = new String(Base64.getDecoder.decode(encodedFirst))
          val second = new String(Base64.getDecoder.decode(encodedSecond))
          val xml = tmp.load(first, second)

          // Parse only to validate; the resulting document is not needed.
          factory.newDocumentBuilder.parse(new InputSource(new StringReader(xml)))
          put.addColumn(family, Bytes.toBytes("age"), Bytes.toBytes(xml))
        } catch {
          // Both exception types share one handler. Written as two separate cases,
          // the SAXException case had an empty body and the record was silently lost.
          case ex @ (_: SAXException | _: IOException) =>
            println("found a unknown exception" + ex)
            put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(stackTraceOf(ex)))
        }
        table.put(put)
      }

      // One flush per partition instead of one per record.
      table.flushCommits()
    } finally {
      table.close()
    }
  }

  /** Renders the full stack trace of `ex` as a string. */
  private def stackTraceOf(ex: Throwable): String = {
    val buffer = new StringWriter()
    val writer = new PrintWriter(buffer)
    ex.printStackTrace(writer)
    writer.close()
    buffer.toString
  }
}
————————————————
版权声明:本文为CSDN博主「zhaokunpeng1593」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/u012719230/article/details/81699987
阿里云帮助文档 https://help.aliyun.com
希望对您有帮助!
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。