当HBase遇上MapReduce头歌答案

简介: 当HBase遇上MapReduce头歌答案

第1关:HBase的MapReduce快速入门

package com.processdata;
import java.io.IOException;
import java.util.List;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;
import com.util.HBaseUtil;
import com.vdurmont.emoji.EmojiParser;
/**
 * 词频统计
 *
 */
public class WorldCountMapReduce extends Configured implements Tool {
    private static class MyMapper extends TableMapper<Text, IntWritable> {
        private static byte[] family = "comment_info".getBytes();
        private static byte[] column = "content".getBytes();
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context) {
            try {
                byte[] value = result.getValue(family, column);
                String content = new String(value, "utf-8");
                String[] split = content.split(" ");
                for (String str : split) {
                    Text text = new Text(str);
                    IntWritable v = new IntWritable(1);
                    context.write(text, v);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            try {
                context.write(null, put);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        // 配置Job
        /********** Begin *********/
        // 创建Conf对象
        Configuration conf = HBaseConfiguration.create(getConf());
        String tablename = args[0]; // 表名
        String targetTable = args[1]; // 目标表
        // 获取到Job对象
        Job job = Job.getInstance(conf);
        // 创建Scan对象
        Scan scan = new Scan();
        // 通过Hbase工具类提交数据
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        // 开始提交数据
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}


第2关:HBase的MapReduce使用

package com.processdata;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
/**
 * 词频统计
 *
 */
public class WorldCountMapReduce2 extends Configured implements Tool {
    private static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object object, Text value, Context context) throws IOException, InterruptedException {
            /********** Begin *********/
            // 根据题意,我们需要根据空格对指定数据进行拆分
            String[] split = value.toString().split(" ");
            // 循环数组,对值进行分类
            for (String str : split) {
                Text text = new Text(str.getBytes());
                IntWritable v = new IntWritable(1);
                context.write(text, v);
            }
            /********** End *********/
        }
    }
    private static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            int sum = 0; // 用于统计
             //  循环Map中分类的值,求和
            for (IntWritable value : values) {
                sum += value.get();
            }
             //  将key和value进行聚和
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
             //  通过文件方式将其输出
            context.write(null, put);
            /********** End *********/
        }
    }
    @Override
    public int run(String[] args) throws Exception {
         // 配置Job
        /********** Begin *********/
         // 配置
        Configuration conf = HBaseConfiguration.create(getConf());
        String file = args[0]; //  输入文件
        String targetTable = args[1]; //  输出表
        Job job = Job.getInstance(conf);
         //  Map的Key的输入类型
        job.setMapOutputKeyClass(Text.class);
         //  Map的Value的输入类型
        job.setMapOutputValueClass(IntWritable.class);
         //  需要执行的MapReduce类
        job.setJarByClass(WorldCountMapReduce2.class);
         //  文件输入格式
        FileInputFormat.addInputPath(job, new Path(file));
         //  设置Mapper类
        job.setMapperClass(MyMapper.class);
         //  开始执行任务
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        job.waitForCompletion(true);
        return 0;
        /********** End *********/
    }
}
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
50 1
|
3月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
41 0
|
6月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
51 0
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
86 0
|
存储 分布式计算 Hadoop
HBase MapReduce_3 | 学习笔记
快速学习 HBase MapReduce_3
115 0
HBase MapReduce_3 | 学习笔记
|
存储 缓存 分布式计算
HBase MapReduce_4 | 学习笔记
快速学习 HBase MapReduce_4
101 0
|
6月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
75 1
|
23天前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
75 3
|
5月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
59 1
|
5月前
|
数据采集 SQL 分布式计算