import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SparkSession;
import cn.netcommander.config.Config;
import scala.Tuple2;
public class Work {

    public static void work() throws Exception {
        SparkSession spark = SparkSession.builder().appName("").getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        String inputPath = Config.input;        // input path
        String zkQuorum = Config.zkQuorum;      // ZooKeeper quorum
        String zkPort = Config.zkPort;          // ZooKeeper client port
        String tableName = Config.tableName;    // HBase table name
        final String family = Config.family;    // column family
        String outputPath = Config.output;      // output path
        JavaRDD<String> rdd = sc.textFile(inputPath);
        JavaPairRDD<ImmutableBytesWritable, KeyValue> flatMapToPair = rdd.flatMapToPair(
                new PairFlatMapFunction<String, ImmutableBytesWritable, KeyValue>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String line) throws Exception {
                List<Tuple2<ImmutableBytesWritable, KeyValue>> kvs = new ArrayList<>();
                String[] split = line.split("\\|", -1);
                byte[] rowkey = (split[0] + "|" + split[2]).getBytes(); // plate number + passing time
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "hphm".getBytes(), split[0].getBytes())));  // plate number
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "hpzl".getBytes(), split[1].getBytes())));  // plate type
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "jgsj".getBytes(), split[2].getBytes())));  // passing time
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "clsd".getBytes(), split[3].getBytes())));  // vehicle speed
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "zgxs".getBytes(), split[4].getBytes())));  // maximum speed limit
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "dzxs".getBytes(), split[5].getBytes())));  // minimum speed limit
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "hpys".getBytes(), split[6].getBytes())));  // plate color
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "cllx".getBytes(), split[7].getBytes())));  // vehicle type
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "sbbh".getBytes(), split[8].getBytes())));  // device number
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "cdbh".getBytes(), split[9].getBytes())));  // lane number
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "fxbh".getBytes(), split[10].getBytes()))); // direction number
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "csys".getBytes(), split[11].getBytes()))); // body color
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "fxmc".getBytes(), split[12].getBytes()))); // direction name
                kvs.add(new Tuple2<>(new ImmutableBytesWritable(rowkey), new KeyValue(rowkey, family.getBytes(), "xrsj".getBytes(), split[13].getBytes()))); // write time
                return kvs.iterator();
            }
        });
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", zkPort);
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Job job = Job.getInstance(conf);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HRegionLocator regionLocator = new HRegionLocator(TableName.valueOf(tableName), (ClusterConnection) connection);
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); // HFileOutputFormat2.configureIncrementalLoadMap(job, table);
        flatMapToPair.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(outputPath), connection.getAdmin(), table, regionLocator);
        sc.close();
    }
}
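
The code above reads its settings from a cn.netcommander.config.Config class that is not shown. A minimal, hypothetical sketch of what it might look like is below; the field names come from the usages above, and every value is a placeholder to replace with your own environment:

package cn.netcommander.config;

// Hypothetical stand-in for the Config class referenced by Work; only the
// fields actually used in Work.work() are included, with placeholder values.
public class Config {
    public static String input = "hdfs:///data/pass_records";   // placeholder input path
    public static String output = "hdfs:///tmp/hfile_staging";  // placeholder HFile output path
    public static String zkQuorum = "zk1,zk2,zk3";               // placeholder ZooKeeper quorum
    public static String zkPort = "2181";                        // ZooKeeper client port
    public static String tableName = "pass_record";              // placeholder HBase table name
    public static String family = "info";                        // placeholder column family
}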
The POM.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.netcommander.wf</groupId>
    <artifactId>CLJS_ETL</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <fastjson.version>1.2.32</fastjson.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hbase.version>1.2.6</hbase.version>
        <jdk.version>1.7</jdk.version>
        <kafka.version>0.10.2.1</kafka.version>
        <mysql.version>5.1.43</mysql.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <quartz.version>2.3.0</quartz.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.2</spark.version>
    </properties>
    <dependencies>
        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- Hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- HBase dependencies -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!--<dependency> -->
        <!--<groupId>cn.netcommander.kpiengine</groupId> -->
        <!--<artifactId>template</artifactId> -->
        <!--<version>1.0</version> -->
        <!--</dependency> -->
        <!-- Quartz scheduling jar -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.1.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- Plugin for exporting dependency jars -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.0.1</version>
                <configuration>
                    <outputDirectory>${project.build.directory}/libs</outputDirectory>
                    <!-- Whether to exclude transitive dependencies -->
                    <excludeTransitive>false</excludeTransitive>
                    <!-- Whether to strip version numbers from jar names -->
                    <stripVersion>false</stripVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
You're probably missing the hbase-common jar.
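
If that is the case, declaring it explicitly in the dependencies section should pull it in; a sketch of the extra dependency, reusing the ${hbase.version} property already defined in the POM above:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>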