开发者社区> 问答> 正文

通过mapreduce向mysql大量写数据时报错

通过mapreduce向mysql大量写数据时报错Communications link failure during rollback(). Transaction resolution unknown.:报错

报错如下:
AttemptID:attempt_1398735110766_0055_r_000000_0 Timed out after 600 secs Error: java.io.IOException: Communications link failure during rollback(). Transaction resolution unknown.

    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:110)

    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)

    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)

    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)

    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)

    at java.security.AccessController.doPrivileged(Native Method)

    at javax.security.auth.Subject.doAs(Subject.java:416)

    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)

    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
java代码如下:

import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import bbs_post.EJob;

/** * 将mapreduce的结果数据写入mysql中 / public class WriteDataToMysql { /* * 重写DBWritable TblsWritable需要向mysql中写入数据 */ public static class TblsWritable implements Writable, DBWritable { String fid, create_time, num;

	public TblsWritable(String fid, String create_time, String num) {
		this.fid = fid;
		this.create_time = create_time;
		this.num = num;
	}

	@Override
	public void write(PreparedStatement statement) throws SQLException {
		statement.setString(1, this.fid);
		statement.setString(2, this.create_time);
		statement.setString(3, this.num);
	}

	@Override
	public void readFields(ResultSet resultSet) throws SQLException {
		this.fid = resultSet.getString(1);
		this.create_time = resultSet.getString(2);
		this.num = resultSet.getString(3);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.fid);
		out.writeUTF(this.create_time);
		out.writeUTF(this.num);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.fid = in.readUTF();
		this.create_time = in.readUTF();
		this.num = in.readUTF();
	}

	// public String toString() {
	// return new String(this.fid + "	" + this.create_time);
	// }
}

public static class ConnMysqlMapper1 extends
		Mapper<LongWritable, Text, Text, Text>
// TblsRecord是自定义的类型,也就是上面重写的DBWritable类
{
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// <首字母偏移量,该行内容>接收进来,然后处理value,将abc和x作为map的输出
		// key对于本程序没有太大的意义,没有使用

// System.out.println("ConnMysqlMapper1"); String[] line = value.toString().split(" "); String val = line[1] + " " + line[2]; // System.out.println(val); context.write(new Text(line[0]), new Text(val)); } }

public static class ConnMysqlReducer1 extends
		Reducer<Text, Text, TblsWritable, TblsWritable> {
	public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// 接收到的key value对即为要输入数据库的字段,所以在reduce中:
		// wirte的第一个参数,类型是自定义类型TblsWritable,利用key和value将其组合成TblsWritable,然后等待写入数据库
		// wirte的第二个参数,wirte的第一个参数已经涵盖了要输出的类型,所以第二个类型没有用,设为null

// System.out.println("ConnMysqlReducer1"); // System.out.println("key: " + key + " values: " + values.toString()); String[] line; // for (Text val : values) { // System.out.println(val.toString()); // line = val.toString().split(" "); // context.write(new TblsWritable(line[1], line[2], line[3]), null); // } for (Iterator<Text> itr = values.iterator(); itr.hasNext();) { // System.out.println(itr.next().toString()); line = itr.next().toString().split(" "); // System.out.println(line[1]+" "+line[2]+" "+line[3]); context.write( new TblsWritable(key.toString(), line[0], line[1]), null); } // System.exit(0); } }

public static void main(String args[]) throws IOException,
		InterruptedException, ClassNotFoundException {

	Configuration conf = new Configuration();

	// mysql的jdbc驱动
	DistributedCache.addFileToClassPath(new Path("hdfs://192.168.1.181:9000/data/mysql-connector-java-5.1.18-bin.jar"),conf);
	DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.1.182:3306/meta", "bbs", "123456");

	Job job = new Job(conf, "WriteDataToMysql");
	job.setJarByClass(WriteDataToMysql.class);

	job.setMapperClass(ConnMysqlMapper1.class);
	job.setReducerClass(ConnMysqlReducer1.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(DBOutputFormat.class);

	System.out.println("Job start!");
	FileInputFormat.addInputPath(job, new Path("hdfs://192.168.1.1891:9000/data/bbs_post/computeDir/compute-20140429143053/part-r-00000"));
	DBOutputFormat.setOutput(job, "test_out3", "fid", "create_time", "num");

	// System.exit(0);
	if (job.waitForCompletion(true)) {
		System.out.println("Data to mysql ok!");
	} else {
		System.out.println("Error, please check your code or data!");
	}
}

}

通过这种方式向mysql写数据,如果数据量比较小就不会报错,如果数据量比较大就会报错。
看了下日志,数据量小的话是一台datanode向mysql写数据,如果数据量比较大就有多个datanode向mysql写数据,此时就会报错。麻烦帮我看一下是怎么回事,谢谢!

展开
收起
kun坤 2020-06-09 12:18:20 722 0
1 条回答
写回答
取消 提交回答
  • hadoop写数据库是通过事务实现的,当事务中的sql有一个出现错误就会报rollback错误。
    导致该错误的情况有一些几种:
    1 文件错误。由于Hdfs该文件的失误操作会导致文件损坏。
    2 数据库问题。
    3 网络问题。

    2020-06-09 12:18:28
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像