
Error "Communications link failure" when writing a large amount of data to MySQL via MapReduce

The error is as follows:

AttemptID:attempt_1398735110766_0055_r_000000_0 Timed out after 600 secs

Error: java.io.IOException: Communications link failure during rollback(). Transaction resolution unknown.
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:110)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

The Java code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import bbs_post.EJob;

/**
 * Writes the MapReduce result data into MySQL.
 */
public class WriteDataToMysql {
	/**
	 * TblsWritable implements Writable and DBWritable so records can be written to MySQL.
	 */
	public static class TblsWritable implements Writable, DBWritable {
		String fid, create_time, num;

		public TblsWritable(String fid, String create_time, String num) {
			this.fid = fid;
			this.create_time = create_time;
			this.num = num;
		}
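
		// A no-argument constructor is good practice for Writable types, since Hadoop can
		// instantiate them reflectively during deserialization. It is not strictly required
		// here (this class is only used as an output key), but it avoids surprises later.
		public TblsWritable() {
		}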

		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setString(1, this.fid);
			statement.setString(2, this.create_time);
			statement.setString(3, this.num);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.fid = resultSet.getString(1);
			this.create_time = resultSet.getString(2);
			this.num = resultSet.getString(3);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeUTF(this.fid);
			out.writeUTF(this.create_time);
			out.writeUTF(this.num);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.fid = in.readUTF();
			this.create_time = in.readUTF();
			this.num = in.readUTF();
		}

		// public String toString() {
		// return new String(this.fid + "	" + this.create_time);
		// }
	}

	public static class ConnMysqlMapper1 extends
			Mapper<LongWritable, Text, Text, Text>
	// TblsWritable is the custom type defined above that implements DBWritable
	{
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// The input pair is <offset of the line's first character, line contents>;
			// the key is not meaningful for this program, so only the value is processed.
//			System.out.println("ConnMysqlMapper1");
			String[] line = value.toString().split("\t");
			String val = line[1] + "\t" + line[2];
//			 System.out.println(val);
			context.write(new Text(line[0]), new Text(val));
		}
	}

	public static class ConnMysqlReducer1 extends
			Reducer<Text, Text, TblsWritable, TblsWritable> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// The key/value pairs received here are exactly the fields to write to the
			// database, so in reduce():
			// the first argument of write() is the custom TblsWritable type, built from
			// the key and the value, and is what gets written to the database;
			// the second argument is unused because the first already carries everything
			// to be output, so it is set to null.
//			System.out.println("ConnMysqlReducer1");
//			System.out.println("key: " + key + " values: " + values.toString());
			String[] line;
			// for (Text val : values) {
			// System.out.println(val.toString());
			// line = val.toString().split("\t");
			// context.write(new TblsWritable(line[1], line[2], line[3]), null);
			// }
			for (Iterator<Text> itr = values.iterator(); itr.hasNext();) {
				// System.out.println(itr.next().toString());
				line = itr.next().toString().split("\t");
				// System.out.println(line[1] + "\t" + line[2] + "\t" + line[3]);
				context.write(
						new TblsWritable(key.toString(), line[0], line[1]),
						null);
			}
			// System.exit(0);
		}
	}

	public static void main(String args[]) throws IOException,
			InterruptedException, ClassNotFoundException {

		Configuration conf = new Configuration();

		// Add the MySQL JDBC driver to the job classpath
		DistributedCache.addFileToClassPath(new Path("hdfs://192.168.1.181:9000/data/mysql-connector-java-5.1.18-bin.jar"),conf);
		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.1.182:3306/meta", "bbs", "123456");

		Job job = new Job(conf, "WriteDataToMysql");
		job.setJarByClass(WriteDataToMysql.class);

		job.setMapperClass(ConnMysqlMapper1.class);
		job.setReducerClass(ConnMysqlReducer1.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(DBOutputFormat.class);

		System.out.println("Job start!");
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.1.1891:9000/data/bbs_post/computeDir/compute-20140429143053/part-r-00000"));
		DBOutputFormat.setOutput(job, "test_out3", "fid", "create_time", "num");

		// System.exit(0);
		if (job.waitForCompletion(true)) {
			System.out.println("Data to mysql ok!");
		} else {
			System.out.println("Error, please check your code or data!");
		}
	}
}

Writing data to MySQL this way works fine when the amount of data is small, but fails with the error above when the amount of data is large.

Looking at the logs: with a small amount of data only one datanode writes to MySQL, but with a large amount of data several datanodes write to MySQL at the same time, and that is when the error appears. Could someone help me figure out what is going on? Thanks!



爱吃鱼的程序员 2020-06-20 19:26:34
1 answer
  • https://developer.aliyun.com/profile/5yerqm5bn5yqg?spm=a2c6h.12873639.0.0.6eae304abcjaIB

    Hadoop writes to the database inside a transaction; if any SQL statement in the transaction fails, the whole transaction is rolled back and this rollback error is reported.

    This error is typically caused by one of the following (a mitigation sketch follows the list):

    1. File errors: mishandling the file on HDFS can leave it corrupted.

    2. Database problems.

    3. Network problems.
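
    The log also shows "Timed out after 600 secs" alongside the rollback failure, which suggests the reduce tasks' bulk inserts are slow and the concurrent MySQL connections are being dropped. Below is a minimal sketch of settings worth trying; the class name WriteDataToMysqlTuned, the timeout value, and the JDBC URL parameters are illustrative assumptions, not a confirmed fix.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

    public class WriteDataToMysqlTuned {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Raise the MapReduce task timeout (the log shows the default 600 s being hit)
            // so a slow bulk insert is not killed in the middle of its transaction.
            conf.set("mapreduce.task.timeout", "1800000");

            // Connector/J 5.1 URL options: rewriteBatchedStatements batches the INSERTs that
            // DBOutputFormat adds with addBatch(); autoReconnect retries a dropped connection.
            // Whether these settings help on this particular cluster is an assumption to verify.
            DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.1.182:3306/meta?rewriteBatchedStatements=true&autoReconnect=true",
                    "bbs", "123456");

            Job job = new Job(conf, "WriteDataToMysql");
            job.setJarByClass(WriteDataToMysql.class);

            // With a single reducer only one task holds a MySQL connection at a time,
            // avoiding the many-datanodes-writing-at-once situation described in the question.
            job.setNumReduceTasks(1);

            // ... the rest of the mapper/reducer/input/output format setup stays as in the original code ...
        }
    }

    If a single reducer turns out to be too slow, an alternative is to keep several reducers and instead raise max_connections and wait_timeout on the MySQL server so that more concurrent writers can be tolerated.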

    2020-06-20 19:26:52