Spark导入数据到HBase问题(BulkLoad)-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Spark导入数据到HBase问题(BulkLoad)

hbase小能手 2018-11-08 11:26:29 2336

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import cn.netcommander.config.Config;
import scala.Tuple2;

/**
 * Spark job that turns '|'-delimited vehicle-passing records into HBase HFiles and
 * bulk-loads them into the table named by {@link Config#tableName}.
 *
 * <p>Each input line must contain at least 14 fields. The row key is
 * {@code field[0] + "|" + field[2]} (plate number + passing time), and each of the first
 * 14 fields is written as one cell in the configured column family.
 */
public class Work {

    /** Column qualifiers, indexed by the position of their field in an input line. */
    private static final String[] QUALIFIERS = {
        "hphm", // 0: plate number
        "hpzl", // 1: plate type
        "jgsj", // 2: passing time
        "clsd", // 3: vehicle speed
        "zgxs", // 4: maximum speed limit
        "dzxs", // 5: minimum speed limit
        "hpys", // 6: plate colour
        "cllx", // 7: vehicle type
        "sbbh", // 8: device id
        "cdbh", // 9: lane id
        "fxbh", // 10: direction id
        "csys", // 11: body colour
        "fxmc", // 12: direction name
        "xrsj"  // 13: write time
    };

    /**
     * Field indexes ordered by their qualifier. The HFile writer requires cells in strictly
     * sorted KeyValue order. Within one row, cells must therefore be emitted in qualifier
     * order (cdbh, cllx, clsd, ...), not in input-field order.
     */
    private static final int[] FIELDS_IN_QUALIFIER_ORDER = fieldsInQualifierOrder();

    /** Computes {@link #FIELDS_IN_QUALIFIER_ORDER}. */
    private static int[] fieldsInQualifierOrder() {
        // The qualifiers are ASCII, so String order equals HBase's unsigned byte order.
        TreeMap<String, Integer> byQualifier = new TreeMap<>();
        for (int i = 0; i < QUALIFIERS.length; i++) {
            byQualifier.put(QUALIFIERS[i], i);
        }
        int[] order = new int[byQualifier.size()];
        int next = 0;
        for (Integer field : byQualifier.values()) {
            order[next++] = field;
        }
        return order;
    }

    /**
     * Reads {@link Config#input}, writes sorted HFiles to {@link Config#output} and
     * bulk-loads them into {@link Config#tableName}.
     *
     * @throws IllegalArgumentException (inside a Spark task) if a line has fewer than 14 fields
     * @throws Exception propagated from Spark, HBase or HDFS
     */
    public static void work() throws Exception {
        SparkSession spark = SparkSession.builder().appName("").getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        try {
            String inputPath = Config.input;      // input path
            String zkQuorum = Config.zkQuorum;    // ZooKeeper quorum
            String zkPort = Config.zkPort;        // ZooKeeper client port
            String tableName = Config.tableName;  // target table name
            String family = Config.family;        // column family
            String outputPath = Config.output;    // HFile output directory

            // HFiles must be written in row-key order. Sort by the raw row-key bytes first,
            // then expand each record into its cells. The expansion is a narrow
            // transformation, so it keeps the sort order.
            JavaRDD<String> lines = sc.textFile(inputPath);
            JavaPairRDD<ImmutableBytesWritable, KeyValue> cells = lines
                    .mapToPair(new KeyByRowKey())
                    .sortByKey(new UnsignedBytesComparator(), true)
                    .flatMapToPair(new ToCells(family.getBytes(StandardCharsets.UTF_8)));

            // HBaseConfiguration.create() returns a plain Configuration. Casting it to
            // HBaseConfiguration would throw ClassCastException.
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zkQuorum);
            conf.set("hbase.zookeeper.property.clientPort", zkPort);
            conf.set("hbase.defaults.for.version.skip", "true");
            conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

            TableName name = TableName.valueOf(tableName);
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(name);
                 RegionLocator regionLocator = connection.getRegionLocator(name);
                 Admin admin = connection.getAdmin()) {
                Job job = Job.getInstance(conf);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(KeyValue.class);
                // Among other things, this stores the table's per-family HFile settings
                // (compression, bloom filter, block size) in the job configuration.
                HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

                // Pass the job's configuration, not the bare one, so those settings are
                // visible to HFileOutputFormat2.
                cells.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class,
                        KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(outputPath), admin, table, regionLocator);
            }
        } finally {
            sc.close();
        }
    }

    /** Keys each input line by its UTF-8 row key ({@code plate number + "|" + passing time}). */
    private static final class KeyByRowKey implements PairFunction<String, byte[], String[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<byte[], String[]> call(String line) {
            String[] fields = line.split("\\|", -1);
            if (fields.length < QUALIFIERS.length) {
                throw new IllegalArgumentException("Expected at least " + QUALIFIERS.length
                        + " '|'-separated fields but found " + fields.length + ": " + line);
            }
            byte[] rowKey = (fields[0] + "|" + fields[2]).getBytes(StandardCharsets.UTF_8);
            return new Tuple2<>(rowKey, fields);
        }
    }

    /** Expands one keyed record into one cell per field, emitted in qualifier order. */
    private static final class ToCells implements
            PairFlatMapFunction<Tuple2<byte[], String[]>, ImmutableBytesWritable, KeyValue> {
        private static final long serialVersionUID = 1L;

        /** Column family bytes (UTF-8). */
        private final byte[] family;

        ToCells(byte[] family) {
            this.family = family;
        }

        @Override
        public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
                Tuple2<byte[], String[]> record) {
            byte[] rowKey = record._1();
            String[] fields = record._2();
            ImmutableBytesWritable key = new ImmutableBytesWritable(rowKey);
            List<Tuple2<ImmutableBytesWritable, KeyValue>> cells =
                    new ArrayList<>(FIELDS_IN_QUALIFIER_ORDER.length);
            for (int field : FIELDS_IN_QUALIFIER_ORDER) {
                byte[] qualifier = QUALIFIERS[field].getBytes(StandardCharsets.UTF_8);
                byte[] value = fields[field].getBytes(StandardCharsets.UTF_8);
                cells.add(new Tuple2<>(key, new KeyValue(rowKey, family, qualifier, value)));
            }
            return cells.iterator();
        }
    }

    /**
     * Orders byte arrays lexicographically, treating bytes as unsigned, which matches the
     * HBase row-key order. Serializable so Spark can ship it with {@code sortByKey}.
     */
    private static final class UnsignedBytesComparator implements Comparator<byte[]>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(byte[] left, byte[] right) {
            int common = Math.min(left.length, right.length);
            for (int i = 0; i < common; i++) {
                int diff = (left[i] & 0xff) - (right[i] & 0xff);
                if (diff != 0) {
                    return diff;
                }
            }
            return left.length - right.length;
        }
    }
}
POM.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>

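<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">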
<modelVersion>4.0.0</modelVersion>

<groupId>cn.netcommander.wf</groupId>
<artifactId>CLJS_ETL</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
    <fastjson.version>1.2.32</fastjson.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hbase.version>1.2.6</hbase.version>
    <jdk.version>1.7</jdk.version>
    <kafka.version>0.10.2.1</kafka.version>
    <mysql.version>5.1.43</mysql.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <quartz.version>2.3.0</quartz.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.2</spark.version>
</properties>

<dependencies>

    <!-- Kafka依赖 -->
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
</dependency>

    <!-- Hadoop依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase依赖 -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
        <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    
    <!-- Spark依赖 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!--<dependency> -->
    <!--<groupId>cn.netcommander.kpiengine</groupId> -->
    <!--<artifactId>template</artifactId> -->
    <!--<version>1.0</version> -->
    <!--</dependency> -->
    <!-- 定时jar包 -->

    <dependency>
            <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.1.7</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5</version>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
                <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
        </plugin>
        <!-- 依赖包导出插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.0.1</version>
            <configuration>
                <outputDirectory>${project.build.directory}\libs</outputDirectory>
                <!-- 排除关联的依赖 -->
                <excludeTransitive>false</excludeTransitive>
                <!-- 除去版本号 -->
                <stripVersion>false</stripVersion>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

XML 消息中间件 分布式计算 Java Hadoop Kafka 分布式数据库 Spark 数据格式 Hbase
分享到
取消 提交回答
全部回答(1)
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题