Spark导入数据到HBase问题(BulkLoad)-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Spark导入数据到HBase问题(BulkLoad)

2018-11-08 11:26:29 4431 1

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import cn.netcommander.config.Config;
import scala.Tuple2;

public class Work {

    /**
     * Column qualifiers in the same order as the fields of an input line: field {@code i} of a
     * '|'-separated line is stored under {@code QUALIFIERS[i]}.
     */
    private static final String[] QUALIFIERS = {
        "hphm", // plate number
        "hpzl", // plate type
        "jgsj", // pass time
        "clsd", // vehicle speed
        "zgxs", // maximum speed limit
        "dzxs", // minimum speed limit
        "hpys", // plate colour
        "cllx", // vehicle type
        "sbbh", // device number
        "cdbh", // lane number
        "fxbh", // direction number
        "csys", // body colour
        "fxmc", // direction name
        "xrsj"  // write time
    };

    /**
     * Separates the row key from the qualifier inside a sort key. It is the smallest char value,
     * so a row key that is a prefix of another row key still sorts first, as it does in HBase.
     */
    private static final char SORT_KEY_SEPARATOR = '\0';

    /**
     * Reads '|'-separated text from {@code Config.input}, writes it as HFiles under
     * {@code Config.output}, and bulk-loads those HFiles into {@code Config.tableName}.
     *
     * @throws Exception if connecting to HBase, writing the HFiles, or the bulk load fails
     */
    public static void work() throws Exception {
        SparkSession spark = SparkSession.builder().appName("").getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        try {
            String inputPath = Config.input;      // input path
            String zkQuorum = Config.zkQuorum;    // ZooKeeper quorum
            String zkPort = Config.zkPort;        // ZooKeeper client port
            String tableName = Config.tableName;  // target table
            String family = Config.family;        // column family
            String outputPath = Config.output;    // HFile output directory

            // HBaseConfiguration.create() returns a plain Configuration; casting the result to
            // HBaseConfiguration (as before) throws ClassCastException.
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zkQuorum);
            conf.set("hbase.zookeeper.property.clientPort", zkPort);
            conf.set("hbase.defaults.for.version.skip", "true");
            conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

            TableName hbaseTable = TableName.valueOf(tableName);
            try (Connection connection = ConnectionFactory.createConnection(conf);
                    Table table = connection.getTable(hbaseTable);
                    RegionLocator regionLocator = connection.getRegionLocator(hbaseTable);
                    Admin admin = connection.getAdmin()) {
                Job job = Job.getInstance(conf);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(KeyValue.class);
                // Stores the table's per-family HFile settings in job.getConfiguration() (a copy
                // of conf), so that copy, not conf, must be handed to the HFile writer below.
                HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

                JavaPairRDD<ImmutableBytesWritable, KeyValue> cells =
                        toSortedCells(sc.textFile(inputPath), family);
                cells.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class,
                        KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(outputPath), admin, table, regionLocator);
            }
        } finally {
            sc.close();
        }
    }

    /**
     * Converts input lines into HFile cells in the order HFileOutputFormat2 requires: by row key,
     * then by qualifier, both compared as unsigned bytes. Writing unsorted cells makes the HFile
     * writer fail with "Added a key not lexically larger than previous".
     *
     * <p>The shuffle for the sort carries only Strings, so HBase's non-serializable
     * ImmutableBytesWritable/KeyValue objects are created after the sort.
     *
     * @param lines  '|'-separated records with at least {@code QUALIFIERS.length} fields
     * @param family column family the cells are written to
     * @return cells sorted by (row key, qualifier)
     * @throws IllegalArgumentException (at execution time) if a line has too few fields
     */
    private static JavaPairRDD<ImmutableBytesWritable, KeyValue> toSortedCells(
            JavaRDD<String> lines, final String family) {
        JavaPairRDD<String, String> valuesBySortKey = lines.flatMapToPair(
                new PairFlatMapFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<Tuple2<String, String>> call(String line) {
                        String[] fields = line.split("\\|", -1);
                        if (fields.length < QUALIFIERS.length) {
                            throw new IllegalArgumentException("Expected at least "
                                    + QUALIFIERS.length + " '|'-separated fields but got "
                                    + fields.length + ": " + line);
                        }
                        // Row key = plate number + "|" + pass time.
                        String row = toSortable(fields[0] + "|" + fields[2]);
                        List<Tuple2<String, String>> cells = new ArrayList<>(QUALIFIERS.length);
                        for (int i = 0; i < QUALIFIERS.length; i++) {
                            // Qualifiers are ASCII, so their chars already sort like their bytes.
                            cells.add(new Tuple2<String, String>(
                                    row + SORT_KEY_SEPARATOR + QUALIFIERS[i], fields[i]));
                        }
                        return cells.iterator();
                    }
                });

        return valuesBySortKey.sortByKey().mapToPair(
                new PairFunction<Tuple2<String, String>, ImmutableBytesWritable, KeyValue>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
                            Tuple2<String, String> cell) {
                        String sortKey = cell._1();
                        // Qualifiers never contain the separator, so the last one splits the key.
                        int separator = sortKey.lastIndexOf(SORT_KEY_SEPARATOR);
                        byte[] row = fromSortable(sortKey.substring(0, separator));
                        byte[] qualifier = fromSortable(sortKey.substring(separator + 1));
                        byte[] value = cell._2().getBytes(StandardCharsets.UTF_8);
                        KeyValue kv = new KeyValue(row, family.getBytes(StandardCharsets.UTF_8),
                                qualifier, value);
                        return new Tuple2<ImmutableBytesWritable, KeyValue>(
                                new ImmutableBytesWritable(row), kv);
                    }
                });
    }

    /**
     * Encodes text as UTF-8 and maps every byte to one char (ISO-8859-1). The result's
     * {@code String.compareTo} order equals unsigned lexicographic byte order, which is the
     * order HBase uses for row keys and qualifiers.
     */
    private static String toSortable(String text) {
        return new String(text.getBytes(StandardCharsets.UTF_8), StandardCharsets.ISO_8859_1);
    }

    /** Inverse of {@link #toSortable}: recovers the original bytes from a sortable string. */
    private static byte[] fromSortable(String sortable) {
        return sortable.getBytes(StandardCharsets.ISO_8859_1);
    }
}
POM.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>

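<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">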
<modelVersion>4.0.0</modelVersion>

<groupId>cn.netcommander.wf</groupId>
<artifactId>CLJS_ETL</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
    <fastjson.version>1.2.32</fastjson.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
    <jdk.version>1.7</jdk.version>
    <kafka.version>0.10.2.1</kafka.version>
    <mysql.version>5.1.43</mysql.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <quartz.version>2.3.0</quartz.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.2</spark.version>
</properties>

<dependencies>

    <!-- Kafka依赖 -->
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
</dependency>

    <!-- Hadoop依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
        <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    
    <!-- Spark依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!--<dependency> -->
    <!--<groupId>cn.netcommander.kpiengine</groupId> -->
    <!--<artifactId>template</artifactId> -->
    <!--<version>1.0</version> -->
    <!--</dependency> -->
    <!-- 定时jar包 -->

    <dependency>
            <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.1.7</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5</version>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
                <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
        </plugin>
        <!-- 依赖包导出插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.0.1</version>
            <configuration>
                <outputDirectory>${project.build.directory}\libs</outputDirectory>
                <!-- 排除关联的依赖 -->
                <excludeTransitive>false</excludeTransitive>
                <!-- 除去版本号 -->
                <stripVersion>false</stripVersion>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

取消 提交回答
全部回答(1)
相关问答

40

回答

[@徐雷frank][¥20]什么是JAVA的平台无关性

大河人家 2018-10-29 23:55:20 144698浏览量 回答数 40

17

回答

【大咖问答】对话PostgreSQL 中国社区发起人之一,阿里云数据库高级专家 德哥

阿里ACE 彭飞 2019-07-10 09:36:10 1034717浏览量 回答数 17

162

回答

惊喜翻倍:免费ECS+免费环境配置~!(ECS免费体验6个月活动3月31日结束)

豆妹 2014-10-29 17:52:21 226087浏览量 回答数 162

8

回答

OceanBase 使用动画(持续更新)

mq4096 2019-02-20 17:16:36 336973浏览量 回答数 8

13

回答

[@饭娱咖啡][¥20]我想知道 Java 关于引用那一块的知识

心意乱 2018-10-31 18:44:12 142447浏览量 回答数 13

110

回答

OSS存储服务-客户端工具

newegg11 2012-05-17 15:37:18 295471浏览量 回答数 110

22

回答

爬虫数据管理【问答合集】

我是管理员 2018-08-10 16:37:41 147213浏览量 回答数 22

18

回答

阿里云开放端口权限

xcxx 2016-07-20 15:03:33 646722浏览量 回答数 18

31

回答

[@倚贤][¥20]刚学完html/css/js的新手学习servlet、jsp需要注意哪些问题?

弗洛伊德6 2018-10-27 21:52:43 146022浏览量 回答数 31

24

回答

【精品问答】python技术1000问(1)

问问小秘 2019-11-15 13:25:00 475472浏览量 回答数 24
+关注
hbase小能手
HBase是一个分布式的、面向列的开源数据库,一个结构化数据的分布式存储系统。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。阿里云HBase技术团队共同探讨HBase及其生态的问题。
112
文章
338
问答
问答排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载