MapReduce on Hbase

简介: org.apache.hadoop.hbase.mapreduceTableMapper  TableReducer一个region对应一个mapimport java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfigura


org.apache.hadoop.hbase.mapreduce


TableMapper  TableReducer


一个region对应一个map

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HbaseMR {

    public class MyMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // key代表rowkey
            Text k = new Text(Bytes.toString(key.get()));
            Text v = new Text(Bytes.toString(value.getValue(
                    "basicinfo".getBytes(), "age".getBytes())));

            context.write(v, k);

        }

    }

    public class MyReducer extends TableReducer<Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(Bytes.toBytes(key.toString()));
            for (Text value : values) {
                put.add(Bytes.toBytes("f1"), Bytes.toBytes(value.toString()),
                        Bytes.toBytes(value.toString()));
            }
            context.write(null, put);
        }

    }

    public static void main(String[] args) {
        Configuration conf=    HBaseConfiguration.create();
        try {
            Job job=new Job(conf, "mapreduce on hbase");
            job.setJarByClass(HbaseMR.class);
            Scan scan=new Scan();
            scan.setCaching(1000);//
            TableMapReduceUtil.initTableMapperJob("students", scan, MyMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.initTableReducerJob("student-age",  MyReducer.class,  job);
            job.waitForCompletion(true);
        } catch (Exception e) {
            
            e.printStackTrace();
        }
    }

}


本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1699284

相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
29 0
|
6月前
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
43 0
|
分布式计算 分布式数据库 Hbase
当HBase遇上MapReduce头歌答案
当HBase遇上MapReduce头歌答案
521 0
|
存储 分布式计算 分布式数据库
【HBase】(九)MapReduce 操作 Hbase
【HBase】(九)MapReduce 操作 Hbase
374 0
|
分布式计算 分布式数据库 Apache
|
分布式计算 分布式数据库 Hbase
hbase 学习(十二)非mapreduce生成Hfile,然后导入hbase当中
最近一个群友的boss让研究hbase,让hbase的入库速度达到5w+/s,这可愁死了,4台个人电脑组成的集群,多线程入库调了好久,速度也才1w左右,于是想到“非mapreduce生成Hfile,然后导入hbase当中”。
1798 0