package com.zhiyou100.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Map-only MapReduce job that reads every cell from the HBase table
 * {@code bd17:fromjava} and writes one text record per cell to the HDFS
 * directory {@code /fromhbase}.
 *
 * <p>Output format per record: rowkey TAB
 * {@code columnFamily:<cf>, columnQualify:<q>, columnValue:<v>}.
 */
public class MRFromHBase {
    /**
     * Mapper that expands each scanned {@link Result} into one output record
     * per {@link Cell}, keyed by the cell's row key.
     */
    public static class MrFromHBaseMap extends TableMapper<Text, Text> {
        // Reused across map() invocations to avoid a per-record allocation
        // (standard Hadoop Writable-reuse idiom).
        private final Text outputKey = new Text();
        private final Text outputValue = new Text();

        /**
         * Emits (rowKey, "columnFamily:..., columnQualify:..., columnValue:...")
         * for every cell contained in {@code value}.
         *
         * @param key     the row key of the scanned row (unused directly; the row
         *                key is re-derived per cell so multi-row Results are safe)
         * @param value   the scan Result holding the row's cells
         * @param context MapReduce context used to emit output pairs
         * @throws IOException          on HBase/HDFS I/O failure
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Walk every cell in the Result and emit one record per cell.
            CellScanner scanner = value.cellScanner();
            while (scanner.advance()) {
                Cell cell = scanner.current();
                String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                String columnQualify = Bytes.toString(CellUtil.cloneQualifier(cell));
                String columnValue = Bytes.toString(CellUtil.cloneValue(cell));
                outputKey.set(rowKey);
                // BUGFIX: the original concatenated the three fields with no
                // separator, yielding unparseable output such as
                // "columnFamily:cfcolumnQualify:q...". Add ", " delimiters.
                outputValue.set("columnFamily:" + columnFamily
                        + ", columnQualify:" + columnQualify
                        + ", columnValue:" + columnValue);
                context.write(outputKey, outputValue);
            }
        }
    }

    /**
     * Job driver: configures and submits the map-only scan job.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MRFromHBase.class);
        job.setJobName("mapreduce 从hbase中读取数据");
        // Map-only job: no reducer is needed.
        job.setNumReduceTasks(0);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob("bd17:fromjava", scan, MrFromHBaseMap.class, Text.class, Text.class, job);
        // Delete any previous run's output so FileOutputFormat does not abort
        // on an existing directory, then set the output path.
        Path outputDir = new Path("/fromhbase");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);
        // BUGFIX: the exit status was inverted (1 on success, 0 on failure).
        // Unix convention — and every standard Hadoop driver — exits 0 on success.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}