Mapreduce构建hbase二级索引

简介: import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.Set;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;imp
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

public class IndexBuilder {

    private class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private Map<byte[], ImmutableBytesWritable> indexes = new HashMap<byte[], ImmutableBytesWritable>();
        private String columnFamily;

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {

            Set<byte[]> keys = indexes.keySet();
            for (byte[] k : keys) {

                ImmutableBytesWritable indexTableName = indexes.get(k);
                byte[] val = value.getValue(Bytes.toBytes(columnFamily), k);
                Put put = new Put(val);// 索引表的rowkey为原始表的值
                put.add(Bytes.toBytes("f1"), Bytes.toBytes("id"), key.get());// 索引表的内容为原始表的rowkey
                context.write(indexTableName, put);

            }

        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            String tableName = conf.get("tableName");
            columnFamily = conf.get("columnFamily");
            String[] qualifiers = conf.getStrings("qualifiers");
            // indexes的key为列名,value为索引表名
            for (String q : qualifiers) {
                indexes.put(
                        Bytes.toBytes(q),
                        new ImmutableBytesWritable(Bytes.toBytes(tableName
                                + "-" + q)));

            }

        }

    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        String[] otherargs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();// 去除掉没有用的命令行参数
        // 输入参数:表名,列族名,列名
        if (otherargs.length < 3) {
            System.exit(-1);
        }
        String tableName = otherargs[0];
        String columnFamily = otherargs[1];

        conf.set("tableName", tableName);
        conf.set("columnFamily", columnFamily);

        String[] qualifiers = new String[otherargs.length - 2];
        for (int i = 0; i < qualifiers.length; i++) {

            qualifiers[i] = otherargs[i + 2];
        }
        conf.setStrings("qualifiers", qualifiers);
        Job job = new Job(conf, tableName);
        job.setJarByClass(IndexBuilder.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TableInputFormat.class);
        // 可以输出多张表
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        Scan scan = new Scan();
        scan.setCaching(1000);
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        job.waitForCompletion(true);

    }

}


本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1699774

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
存储 分布式计算 分布式数据库
深入理解Apache HBase:构建大数据时代的基石
在大数据时代,数据的存储和管理成为了企业面临的一大挑战。随着数据量的急剧增长和数据结构的多样化,传统的关系型数据库(如RDBMS)逐渐显现出局限性。
494 12
|
4月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
78 1
|
4月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
50 0
|
5月前
|
存储 Java 分布式数据库
HBase构建图片视频数据的统一存储检索
HBase构建图片视频数据的统一存储检索
|
分布式计算 分布式数据库 Docker
docker 构建 hbase 容器
docker 构建 hbase 容器
297 1
|
7月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
61 0
|
7月前
|
存储 NoSQL 分布式数据库
Hbase的三种索引_全局索引,覆盖索引,本地索引(七)
Hbase的三种索引_全局索引,覆盖索引,本地索引(七)
207 0
|
7月前
|
SQL 分布式数据库 HIVE
Hbase二级索引_Hive on Hbase 及phoenix详解
Hbase二级索引_Hive on Hbase 及phoenix详解
85 0
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
104 0
|
SQL 分布式数据库 Apache

热门文章

最新文章