【HBase】(九)MapReduce 操作 Hbase

简介: 【HBase】(九)MapReduce 操作 Hbase

文章目录


一、MapReduce从HDFS读取数据存储到HBase中

二、MapReduce从HBase读取数据计算平均年龄并存储到HDFS中


一、MapReduce从HDFS读取数据存储到HBase中


现有HDFS中有一个student.txt文件,格式如下

95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA


将HDFS上的这个文件里面的数据写入到HBase数据块中


MapReduce实现代码如下:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReadHDFSDataToHbaseMR extends Configured implements Tool{
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args);
        System.exit(run);
    }
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
//        conf.addResource("config/core-site.xml");
//        conf.addResource("config/hdfs-site.xml");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHDFSDataToHbaseMR.class);
        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job,null,null,null,null,false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);
        Path inputPath = new Path("/student/input/");
        Path outputPath = new Path("/student/output/");
        if(fs.exists(outputPath)) {
            fs.delete(outputPath,true);
        }
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {    
            context.write(value, NullWritable.get());
        }
    }
    /**
     * 95015,王君,男,18,MA
     * */
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {
            String[] split = key.toString().split(",");
            Put put = new Put(split[0].getBytes());
            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());
            context.write(NullWritable.get(), put);
        }
    }
}



二、MapReduce从HBase读取数据计算平均年龄并存储到HDFS中


import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ReadHbaseDataToHDFS extends Configured implements Tool{
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args);
        System.exit(run);
    }
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
//        conf.addResource("config/core-site.xml");
//        conf.addResource("config/hdfs-site.xml");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHbaseDataToHDFS.class);
        // 取对业务有用的数据 info,age
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());
        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(), // 指定表名
                scan, // 指定扫描数据的条件
                HbaseToHDFSMapper.class, // 指定mapper class
                Text.class,     // outputKeyClass mapper阶段的输出的key的类型
                IntWritable.class, // outputValueClass mapper阶段的输出的value的类型
                job, // job对象
                false
                );
        job.setReducerClass(HbaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        Path outputPath = new Path("/student/avg/");
        if(fs.exists(outputPath)) {
            fs.delete(outputPath,true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
    public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable>{
        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();
        // key是hbase中的行键
        // value是hbase中的所行键的所有数据
        @Override
        protected void map(ImmutableBytesWritable key, Result value,Context context)
                throws IOException, InterruptedException {
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if(isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                System.out.println("listCells:\t"+listCells);
                Cell cell = listCells.get(0);
                System.out.println("cells:\t"+cell);
                byte[] cloneValue = CellUtil.cloneValue(cell);
                String ageValue = Bytes.toString(cloneValue);
                outValue.set(Integer.parseInt(ageValue));
                context.write(outKey,outValue);
            }
        }
    }
    public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{
        DoubleWritable outValue = new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                throws IOException, InterruptedException {
            int count = 0;
            int sum = 0;
            for(IntWritable value : values) {
                count++;
                sum += value.get();
            }
            double avgAge = sum * 1.0 / count;
            outValue.set(avgAge);
            context.write(key, outValue);
        }
    }
}
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
分布式计算 资源调度 Hadoop
HBase表数据的读、写操作与综合操作
HBase表数据的读、写操作与综合操作
99 0
|
7月前
|
分布式计算 Hadoop Shell
熟悉常用的HBase操作
熟悉常用的HBase操作
170 3
熟悉常用的HBase操作
|
4月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
83 1
|
4月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
56 0
|
5月前
|
DataWorks 数据管理 大数据
DataWorks操作报错合集之在连接HBase时出现超时问题,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
Java 大数据 API
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
156 0
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
151 0
|
7月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
105 0
|
7月前
|
分布式计算 Java Hadoop
MapReduce编程:检索特定群体搜索记录和定义分片操作
MapReduce编程:检索特定群体搜索记录和定义分片操作
74 0
|
7月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
129 0