MapReduce InputFormat——DBInputFormat

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 一、背景      为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过 DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

一、背景

     为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过

DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

二、技术细节

1、DBInputFormat(Mysql为例),先创建表:

CREATE TABLE studentinfo (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(32) NOT NULL);
2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。
3、DBInputFormat用法如下:
[java] view plain copy
  1. public class DBInput {  
  2.    // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;  
  3.    // CREATE TABLE studentinfo (  
  4.    // id INTEGER NOT NULL PRIMARY KEY,  
  5.    // name VARCHAR(32) NOT NULL);  
  6.   
  7.    public static class StudentinfoRecord implements Writable, DBWritable {  
  8.      int id;  
  9.      String name;  
  10.      public StudentinfoRecord() {  
  11.   
  12.      }  
  13.      public void readFields(DataInput in) throws IOException {  
  14.         this.id = in.readInt();  
  15.         this.name = Text.readString(in);  
  16.      }  
  17.      public void write(DataOutput out) throws IOException {  
  18.         out.writeInt(this.id);  
  19.         Text.writeString(out, this.name);  
  20.      }  
  21.      public void readFields(ResultSet result) throws SQLException {  
  22.         this.id = result.getInt(1);  
  23.         this.name = result.getString(2);  
  24.      }  
  25.      public void write(PreparedStatement stmt) throws SQLException {  
  26.         stmt.setInt(1this.id);  
  27.         stmt.setString(2this.name);  
  28.      }  
  29.      public String toString() {  
  30.         return new String(this.id + " " + this.name);  
  31.      }  
  32.    }  
  33.    public class DBInputMapper extends MapReduceBase implements  
  34.         Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {  
  35.      public void map(LongWritable key, StudentinfoRecord value,  
  36.           OutputCollector<LongWritable, Text> collector, Reporter reporter)  
  37.           throws IOException {  
  38.         collector.collect(new LongWritable(value.id), new Text(value  
  39.              .toString()));  
  40.      }  
  41.    }  
  42.    public static void main(String[] args) throws IOException {  
  43.      JobConf conf = new JobConf(DBInput.class);  
  44.      DistributedCache.addFileToClassPath(new Path(  
  45.           "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);  
  46.        
  47.      conf.setMapperClass(DBInputMapper.class);  
  48.      conf.setReducerClass(IdentityReducer.class);  
  49.   
  50.      conf.setMapOutputKeyClass(LongWritable.class);  
  51.      conf.setMapOutputValueClass(Text.class);  
  52.      conf.setOutputKeyClass(LongWritable.class);  
  53.      conf.setOutputValueClass(Text.class);  
  54.        
  55.      conf.setInputFormat(DBInputFormat.class);  
  56.      FileOutputFormat.setOutputPath(conf, new Path("/hua01"));  
  57.      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  58.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  59.      String[] fields = { "id""name" };  
  60.      DBInputFormat.setInput(conf, StudentinfoRecord.class"studentinfo",  
  61.  null"id", fields);  
  62.   
  63.      JobClient.runJob(conf);  
  64.    }  
  65. }  

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

实现Writable的方法:

[java]  view plain copy
  1. public void readFields(DataInput in) throws IOException {  
  2.        this.id = in.readInt();  
  3.        this.name = Text.readString(in);  
  4.     }  
  5.     public void write(DataOutput out) throws IOException {  
  6.        out.writeInt(this.id);  
  7.        Text.writeString(out, this.name);  
  8.     }  

实现DBWritable的方法:

[java]  view plain copy
  1. public void readFields(ResultSet result) throws SQLException {  
  2.         this.id = result.getInt(1);  
  3.         this.name = result.getString(2);  
  4.      }  
  5.      public void write(PreparedStatement stmt) throws SQLException {  
  6.         stmt.setInt(1this.id);  
  7.         stmt.setString(2this.name);  
  8.      }  

b)读入Mapper的value类型是StudnetinfoRecord。

c)配置如何连入数据库,读出表studentinfo数据。

[java]  view plain copy
  1. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  2.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  3.      String[] fields = { "id""name" };  
  4.      DBInputFormat.setInput(conf, StudentinfoRecord.class"studentinfo",  null"id", fields);  


4、DBOutputFormat用法如下:
[java]  view plain copy
  1. public class DBOutput {  
  2.   
  3.    public static class StudentinfoRecord implements Writable,  DBWritable {  
  4.      int id;  
  5.      String name;  
  6.      public StudentinfoRecord() {  
  7.   
  8.      }  
  9.      public void readFields(DataInput in) throws IOException {  
  10.         this.id = in.readInt();  
  11.         this.name = Text.readString(in);  
  12.      }  
  13.      public void write(DataOutput out) throws IOException {  
  14.         out.writeInt(this.id);  
  15.         Text.writeString(out, this.name);  
  16.      }  
  17.      public void readFields(ResultSet result) throws SQLException {  
  18.         this.id = result.getInt(1);  
  19.         this.name = result.getString(2);  
  20.      }  
  21.      public void write(PreparedStatement stmt) throws SQLException {  
  22.         stmt.setInt(1this.id);  
  23.         stmt.setString(2this.name);  
  24.      }  
  25.      public String toString() {  
  26.         return new String(this.id + " " + this.name);  
  27.      }  
  28.    }  
  29.      
  30.    public static class MyReducer extends MapReduceBase implements  
  31.         Reducer<LongWritable, Text, StudentinfoRecord, Text> {  
  32.      public void reduce(LongWritable key, Iterator<Text> values,  
  33.           OutputCollector<StudentinfoRecord, Text> output, Reporter  reporter)  
  34.           throws IOException {  
  35.         String[] splits = values.next().toString().split("/t");  
  36.         StudentinfoRecord r = new StudentinfoRecord();  
  37.         r.id = Integer.parseInt(splits[0]);  
  38.         r.name = splits[1];  
  39.         output.collect(r, new Text(r.name));  
  40.      }  
  41.    }  
  42.   
  43.    public static void main(String[] args) throws IOException {  
  44.      JobConf conf = new JobConf(DBOutput.class);  
  45.      conf.setInputFormat(TextInputFormat.class);  
  46.      conf.setOutputFormat(DBOutputFormat.class);  
  47.   
  48.      FileInputFormat.setInputPaths(conf, new Path("/hua/hua.bcp"));  
  49.      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  50.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  51.      DBOutputFormat.setOutput(conf, "studentinfo""id""name");  
  52.   
  53.   conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);  
  54.      conf.setReducerClass(MyReducer.class);  
  55.   
  56.      JobClient.runJob(conf);  
  57.    }  
  58.   
  59. }  

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口,同.DBInputFormat的StudnetinfoRecord类。

b)输出Reducer的key/value类型是StudnetinfoRecord。

c)配置如何连入数据库,输出结果到表studentinfo。

[java]  view plain copy
  1. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  2.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  3.      DBOutputFormat.setOutput(conf, "studentinfo""id""name");  

三、总结

      运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个

tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

1.在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

2.a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib

b)在mr程序提交job前,添加语句:istributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);

3、虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
6月前
|
分布式计算
MapReduce【自定义InputFormat】
MapReduce【自定义InputFormat】
|
数据采集 分布式计算
34 MAPREDUCE自定义inputFormat
34 MAPREDUCE自定义inputFormat
43 0
|
存储 分布式计算 资源调度
MapReduce框架--InputFormat数据输入--切片优化(11)
MapReduce框架--InputFormat数据输入--切片优化(11)
300 0
MapReduce框架--InputFormat数据输入--切片优化(11)
|
存储 分布式计算
MapReduce源码分析之InputFormat
        InputFormat描述了一个Map-Reduce作业中的输入规范。Map-Reduce框架依靠作业的InputFormat实现以下内容:         1、校验作业的输入规范;         2、分割输入文件(可能为多个),生成逻辑输入分片InputSplit(往往为多个),每个输入分片InputSplit接着被分配给单独的Mapper;         3、提供记录读取器RecordReader的实现,RecordReader被用于从逻辑输入分片InputSplit收集输入记录,这些输入记录会被交由Mapper处理。
1070 0
|
存储 分布式计算 算法
MapReduce InputFormat之FileInputFormat
一:简单认识InputFormat类 InputFormat主要用于描述输入数据的格式,提供了以下两个功能:          1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task;         2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。
1070 0
|
6月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
81 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
98 3
|
5月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
64 1
|
5月前
|
数据采集 SQL 分布式计算
下一篇
无影云桌面