Spark + HBase
Spark 接入 HBase
下面的示例演示了 Spark 如何向 HBase 写入数据。需要注意的是，计算集群必须与 HBase 集群处于同一个安全组内，否则两者之间的网络无法连通。在 E-MapReduce 中创建集群时，请选择 HBase 集群所在的安全组。
/**
 * Holder for a single shared HBase connection.
 *
 * Because this is a Scala `object`, its fields are initialized once per JVM
 * (class loader) on first access. On Spark executors this means each executor
 * JVM creates its own connection the first time `getDefaultConn` is called.
 */
object ConnectionUtil extends Serializable {
  // HBase client configuration; HBase clients locate the cluster through ZooKeeper.
  private val conf = HBaseConfiguration.create()
  // ZooKeeper quorum hosts of the HBase cluster; replace with your own hosts.
  // (The original listed "ecs1" twice, which silently dropped the third host.)
  conf.set(HConstants.ZOOKEEPER_QUORUM, "ecs1,ecs2,ecs3")
  // Parent znode under which HBase registers itself in ZooKeeper.
  conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")

  private val connection = ConnectionFactory.createConnection(conf)

  // The connection is shared for the JVM's lifetime, so release it when the JVM exits.
  sys.addShutdownHook {
    connection.close()
  }

  /**
   * Returns the connection shared by every caller in this JVM.
   * Callers should close the Tables they obtain from it, not the connection itself.
   */
  def getDefaultConn: Connection = connection
}
// Needed to pass a Scala collection to HBase's Table.put(java.util.List[Put]).
import scala.collection.JavaConverters._

// Create the input DStream `unionStreams` here (creation omitted in this snippet).
// NOTE(review): `unionStreams`, `ssc`, `tname`, COLUMN_FAMILY_BYTES and
// COLUMN_QUALIFIER_BYTES are defined elsewhere in the full example.

// For every micro-batch: count the words, then write one Put per distinct word
// to HBase. The writes are done per partition, so each partition opens one Table.
unionStreams.foreachRDD(rdd => {
  rdd
    // Decode explicitly instead of relying on the JVM's default charset.
    // Assumes the producer sends UTF-8 text; confirm against the source.
    .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
    .flatMap(line => line.split(" "))
    // Repeated or leading spaces produce empty tokens; they are not words.
    .filter(_.nonEmpty)
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    // Use foreachPartition for side effects instead of mapPartitions + count().
    .foreachPartition { words =>
      // Skip opening a Table for empty partitions.
      if (words.hasNext) {
        val conn = ConnectionUtil.getDefaultConn
        val table = conn.getTable(TableName.valueOf(tname))
        try {
          // Send Puts to HBase in batches of at most 100.
          words.grouped(100).foreach { batch =>
            val puts = batch.map { word =>
              println(s"word: $word")
              // Read the clock once so the row key and cell timestamp agree.
              val ts = System.currentTimeMillis()
              // Row key: the word followed by the timestamp in decimal.
              val put = new Put(Bytes.toBytes(word._1 + ts))
              // Cell value: the word's count in this batch, as 4-byte Int bytes.
              put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES,
                ts, Bytes.toBytes(word._2))
              put
            }
            table.put(puts.asJava)
          }
        } finally {
          // Close only the Table; the Connection is shared across the JVM.
          table.close()
        }
      }
    }
})

ssc.start()
ssc.awaitTermination()
附录
完整示例代码请参见：