开发者社区> 问答> 正文

通过mapreduce向mysql大量写数据时报错Communications link failure during rollback(). Transaction resolution unknown.

小旋风柴进 2016-03-10 16:51:29 6650

报错如下:

AttemptID:attempt_1398735110766_0055_r_000000_0 Timed out after 600 secs

Error: java.io.IOException: Communications link failure during rollback(). Transaction resolution unknown. 
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:110)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550) 
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:416) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

java代码如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
import bbs_post.EJob;
 
/**
 * 将mapreduce的结果数据写入mysql中
 */
public class WriteDataToMysql {
    /**
     * 重写DBWritable TblsWritable需要向mysql中写入数据
     */
    public static class TblsWritable implements Writable, DBWritable {
        String fid, create_time, num;
 
        public TblsWritable(String fid, String create_time, String num) {
            this.fid = fid;
            this.create_time = create_time;
            this.num = num;
        }
 
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.fid);
            statement.setString(2, this.create_time);
            statement.setString(3, this.num);
        }
 
        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.fid = resultSet.getString(1);
            this.create_time = resultSet.getString(2);
            this.num = resultSet.getString(3);
        }
 
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.fid);
            out.writeUTF(this.create_time);
            out.writeUTF(this.num);
        }
 
        @Override
        public void readFields(DataInput in) throws IOException {
            this.fid = in.readUTF();
            this.create_time = in.readUTF();
            this.num = in.readUTF();
        }
 
        // public String toString() {
        // return new String(this.fid + "   " + this.create_time);
        // }
    }
 
    public static class ConnMysqlMapper1 extends
            Mapper<LongWritable, Text, Text, Text>
    // TblsRecord是自定义的类型,也就是上面重写的DBWritable类
    {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // <首字母偏移量,该行内容>接收进来,然后处理value,将abc和x作为map的输出
            // key对于本程序没有太大的意义,没有使用
//          System.out.println("ConnMysqlMapper1");
            String[] line = value.toString().split("    ");
            String val = line[1] + "    " + line[2];
//           System.out.println(val);
            context.write(new Text(line[0]), new Text(val));
        }
    }
 
    public static class ConnMysqlReducer1 extends
            Reducer<Text, Text, TblsWritable, TblsWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 接收到的key value对即为要输入数据库的字段,所以在reduce中:
            // wirte的第一个参数,类型是自定义类型TblsWritable,利用key和value将其组合成TblsWritable,然后等待写入数据库
            // wirte的第二个参数,wirte的第一个参数已经涵盖了要输出的类型,所以第二个类型没有用,设为null
//          System.out.println("ConnMysqlReducer1");
//          System.out.println("key: " + key + " values: " + values.toString());
            String[] line;
            // for (Text val : values) {
            // System.out.println(val.toString());
            // line = val.toString().split("    ");
            // context.write(new TblsWritable(line[1], line[2], line[3]), null);
            // }
            for (Iterator<Text> itr = values.iterator(); itr.hasNext();) {
                // System.out.println(itr.next().toString());
                line = itr.next().toString().split("    ");
                // System.out.println(line[1]+" "+line[2]+" "+line[3]);
                context.write(
                        new TblsWritable(key.toString(), line[0], line[1]),
                        null);
            }
            // System.exit(0);
        }
    }
 
    public static void main(String args[]) throws IOException,
            InterruptedException, ClassNotFoundException {
 
        Configuration conf = new Configuration();
 
        // mysql的jdbc驱动
        DistributedCache.addFileToClassPath(new Path("hdfs://192.168.1.181:9000/data/mysql-connector-java-5.1.18-bin.jar"),conf);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.1.182:3306/meta", "bbs", "123456");
 
        Job job = new Job(conf, "WriteDataToMysql");
        job.setJarByClass(WriteDataToMysql.class);
 
        job.setMapperClass(ConnMysqlMapper1.class);
        job.setReducerClass(ConnMysqlReducer1.class);
 
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
 
        System.out.println("Job start!");
        FileInputFormat.addInputPath(job, new Path("hdfs://<span></span>192.168.1.1891<span></span>:9000/data/bbs_post/computeDir/compute-20140429143053/part-r-00000"));
        DBOutputFormat.setOutput(job, "test_out3", "fid", "create_time", "num");
 
        // System.exit(0);
        if (job.waitForCompletion(true)) {
            System.out.println("Data to mysql ok!");
        } else {
            System.out.println("Error, please check your code or data!");
        }
    }
}

通过这种方式向mysql写数据,如果数据量比较小就不会报错,如果数据量比较大就会报错。

看了下日志,数据量小的话是一台datanode向mysql写数据,如果数据量比较大就有多个datanode向mysql写数据,此时就会报错。麻烦帮我看一下是怎么回事,谢谢!

分布式计算 关系型数据库 MySQL Java 数据库连接 数据库
分享到
取消 提交回答
全部回答(1)
  • 小旋风柴进
    2019-07-17 18:57:56

    hadoop写数据库是通过事务实现的,当事务中的sql有一个出现错误就会报rollback错误。

    导致该错误的情况有一些几种:

    1 文件错误。由于Hdfs该文件的失误操作会导致文件损坏。

    2 数据库问题。

    3 网络问题。

    0 0
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题