The error reported is as follows:
AttemptID:attempt_1398735110766_0055_r_000000_0 Timed out after 600 secs
Error: java.io.IOException: Communications link failure during rollback(). Transaction resolution unknown.
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:110)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
The Java code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import bbs_post.EJob;
/**
 * Write the MapReduce result data into MySQL.
 */
public class WriteDataToMysql {
    /**
     * Custom DBWritable implementation: TblsWritable holds one row to be
     * written into MySQL.
     */
    public static class TblsWritable implements Writable, DBWritable {
        String fid, create_time, num;

        public TblsWritable(String fid, String create_time, String num) {
            this.fid = fid;
            this.create_time = create_time;
            this.num = num;
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.fid);
            statement.setString(2, this.create_time);
            statement.setString(3, this.num);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.fid = resultSet.getString(1);
            this.create_time = resultSet.getString(2);
            this.num = resultSet.getString(3);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.fid);
            out.writeUTF(this.create_time);
            out.writeUTF(this.num);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.fid = in.readUTF();
            this.create_time = in.readUTF();
            this.num = in.readUTF();
        }
    }
    public static class ConnMysqlMapper1 extends
            Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input pair is <byte offset of the line, line content>; the offset
            // key is not used here. Each line is split on spaces: the first token
            // becomes the map output key, the next two tokens the map output value.
            String[] line = value.toString().split(" ");
            String val = line[1] + " " + line[2];
            context.write(new Text(line[0]), new Text(val));
        }
    }
    public static class ConnMysqlReducer1 extends
            Reducer<Text, Text, TblsWritable, TblsWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The key/value pairs received here are exactly the fields to be written
            // to the database, so in reduce:
            // - the first argument of write() is the custom TblsWritable, assembled
            //   from the key and the value, and is what gets written to MySQL;
            // - the second argument of write() is unused, because the first argument
            //   already carries all output fields, so null is passed.
            String[] line;
            for (Iterator<Text> itr = values.iterator(); itr.hasNext();) {
                line = itr.next().toString().split(" ");
                context.write(
                        new TblsWritable(key.toString(), line[0], line[1]),
                        null);
            }
        }
    }
    public static void main(String args[]) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Ship the MySQL JDBC driver to the task classpath.
        DistributedCache.addFileToClassPath(new Path(
                "hdfs://192.168.1.181:9000/data/mysql-connector-java-5.1.18-bin.jar"), conf);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.1.182:3306/meta", "bbs", "123456");
        Job job = new Job(conf, "WriteDataToMysql");
        job.setJarByClass(WriteDataToMysql.class);
        job.setMapperClass(ConnMysqlMapper1.class);
        job.setReducerClass(ConnMysqlReducer1.class);
        // Without explicit setMapOutputKeyClass/setMapOutputValueClass calls,
        // the map output classes default to these job output classes.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        System.out.println("Job start!");
        FileInputFormat.addInputPath(job, new Path(
                "hdfs://192.168.1.1891:9000/data/bbs_post/computeDir/compute-20140429143053/part-r-00000"));
        DBOutputFormat.setOutput(job, "test_out3", "fid", "create_time", "num");
        if (job.waitForCompletion(true)) {
            System.out.println("Data to mysql ok!");
        } else {
            System.out.println("Error, please check your code or data!");
        }
    }
}
When writing data to MySQL this way, the job succeeds if the amount of data is small, but fails with the error above once the amount of data gets larger.
Looking at the logs, with a small amount of data only one datanode writes to MySQL; with a larger amount of data several datanodes write to MySQL at the same time, and that is when the error appears. Could you please help me figure out what is going on? Thanks!
Hadoop writes to the database inside a transaction; as soon as one SQL statement in that transaction fails, the rollback error above is reported.
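Roughly speaking, the DBOutputFormat record writer only adds rows to a JDBC batch while the task runs and pushes the whole batch in a single commit when the task closes; if the commit fails it tries to roll back, and if the connection is already gone that rollback itself fails with the "Communications link failure during rollback(). Transaction resolution unknown." message in the stack trace above. The class below is a simplified, hypothetical paraphrase of that pattern (illustrative names, not the actual Hadoop source):

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative stand-in for the internal record writer of
// org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; details are simplified.
public class BatchedDbWriter {
    private final Connection connection;
    private final PreparedStatement statement;

    public BatchedDbWriter(Connection connection, PreparedStatement statement)
            throws SQLException {
        this.connection = connection;
        this.statement = statement;
        connection.setAutoCommit(false); // all rows go into one transaction
    }

    // Each reduce output row is only added to the JDBC batch here ...
    public void write(String fid, String createTime, String num) throws SQLException {
        statement.setString(1, fid);
        statement.setString(2, createTime);
        statement.setString(3, num);
        statement.addBatch();
    }

    // ... and the whole batch is executed and committed only when the task closes.
    // A failure here (dropped connection, timeout, constraint violation, ...)
    // triggers a rollback; if the connection is already dead, the rollback fails too.
    public void close() throws IOException {
        try {
            statement.executeBatch();
            connection.commit();
        } catch (SQLException e) {
            try {
                connection.rollback();
            } catch (SQLException rollbackFailure) {
                throw new IOException(rollbackFailure);
            }
            throw new IOException(e);
        } finally {
            try {
                statement.close();
                connection.close();
            } catch (SQLException ignored) {
                // best-effort cleanup
            }
        }
    }
}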
This error is typically caused by one of the following:
1. File problems: a mistaken operation on the file in HDFS can leave it corrupted.
2. Database problems, for example the connections opened concurrently by many reduce tasks exceeding MySQL's max_connections, or the server closing a connection (wait_timeout) while a large batch is still being committed.
3. Network problems between the task nodes and the MySQL server.
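Given that the job only fails once several reduce tasks write to MySQL at the same time, and that the task also hits the 600-second timeout, the database and network causes above are the first things to check. A few commonly tried mitigations are sketched below; this is only a sketch of the relevant lines in main(), the property name is the standard Hadoop 2.x task-timeout key, and the concrete values are examples rather than recommendations:

// 1. If the single big commit in close() can legitimately take longer than the
//    default 600 s, raise the task timeout (in milliseconds) before creating the Job.
conf.setLong("mapreduce.task.timeout", 1800000L); // 30 minutes, example value

// 2. Limit how many tasks open MySQL connections at the same time. A single
//    reducer removes the concurrency entirely; a small number (2-4) is a
//    compromise between MySQL load and job runtime.
job.setNumReduceTasks(1);

// 3. On the MySQL side, check the server limits that concurrent writers and
//    long commits run into:
//      SHOW VARIABLES LIKE 'max_connections';
//      SHOW VARIABLES LIKE 'wait_timeout';

Raising the timeout only hides a slow commit; limiting the number of concurrent writers and checking the MySQL connection limits address the communications link failure itself.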