从 MySQL 读取数据时可能会遇到一些问题,
比如:虚拟机上无法 ping 通本地 IP。
另外,还需要在作业类中加入下面这行代码:
job.addFileToClassPath(new Path("/mysql-connector-java-5.1.43-bin.jar"));
其中用到的这个 jar 包需要先放到 HDFS 中,
同时也要放到解压安装的 Hadoop 目录下的 share/hadoop/common/lib/ 中,
并且每个子节点都要这样配置一遍,
以免出错。
/**
 * Map-only MapReduce job that reads rows of the {@code word_count} table through
 * {@link DBInputFormat} and emits each row's {@code toString()} form as an output key.
 *
 * <p>The job output is written under {@code /ReadFormD}; any existing directory at that
 * path is deleted before the job is submitted.
 */
public class ReadFormDB {

    // JDBC connection settings handed to DBConfiguration.configureDB.
    // Alternative URL when MySQL is reachable as localhost: "jdbc:mysql://localhost/test"
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String JDBC_URL =
            "jdbc:mysql://192.168.111.1:3306/test?useSSL=false&autoReconnect=true&failOverReadOnly=false";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "123456";

    // Path of the MySQL connector jar that is added to the task classpath.
    private static final String DRIVER_JAR = "/mysql-connector-java-5.1.43-bin.jar";

    // Directory that receives the job output.
    private static final String OUTPUT_DIR = "/ReadFormD";

    /**
     * Mapper that turns every database record into one {@link Text} key (the record's
     * {@code toString()}), paired with a {@link NullWritable} value.
     */
    public static class ReadFromDBMap extends Mapper<LongWritable, WordCountDBWritable, Text, NullWritable> {

        // Reused across map() calls so no new Text is allocated per record.
        private Text line = new Text();
        private final NullWritable nothing = NullWritable.get();

        /**
         * Writes the textual form of {@code value} to the context.
         *
         * @param key     key supplied by the input format (not used here)
         * @param value   record read from the database
         * @param context task context that receives the output pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, WordCountDBWritable value, Context context)
                throws IOException, InterruptedException {
            String row = value.toString();
            line.set(row);
            context.write(line, nothing);
        }
    }

    // NOTE(review): presumably this statement is run on the MySQL server so that root can
    // connect from other hosts -- confirm:
    // GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'IDENTIFIED BY '123456' WITH GRANT OPTION;

    /**
     * Configures and runs the job, then exits the JVM with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (not used)
     * @throws IOException            if job setup or the output-directory cleanup fails
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException   propagated from {@code Job.waitForCompletion}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, JDBC_DRIVER, JDBC_URL, DB_USER, DB_PASSWORD);

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadFormDB.class);
        job.setJobName("读取从mysql中");

        // Map-only job: no reducers are scheduled.
        job.setMapperClass(ReadFromDBMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.addFileToClassPath(new Path(DRIVER_JAR));

        // Arguments after the record class: table name, conditions, order-by, field names.
        // NOTE(review): the conditions argument is the bare column name "wc_count";
        // confirm that this is the intended WHERE clause.
        DBInputFormat.setInput(job, WordCountDBWritable.class, "word_count", "wc_count", "wc_count asc", "*");

        // Remove any previous output directory before pointing the job at it.
        Path outDir = new Path(OUTPUT_DIR);
        outDir.getFileSystem(conf).delete(outDir, true);
        FileOutputFormat.setOutputPath(job, outDir);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
把 WordCount 的结果写入到 MySQL 中:
public class WriteToDB {
/**
 * One record of the {@code word_count} table, holding the {@code wc_word} and
 * {@code wc_count} columns.
 *
 * <p>Original note on the table definition: {@code create table(wc_word varchar(255) )}.
 *
 * <p>The class implements both {@code DBWritable} (JDBC read/write) and {@code Writable}
 * (binary read/write through {@link DataInput}/{@link DataOutput}).
 */
public static class WordCountDBWritable implements DBWritable, Writable {
    private String word;
    private int count;

    /** @return the word, or {@code null} if none has been set or read yet */
    public String getWord() {
        return word;
    }

    /** @param word the word to store */
    public void setWord(String word) {
        this.word = word;
    }

    /** @return the count stored for the word */
    public int getCount() {
        return count;
    }

    /** @param count the count to store */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the record formatted as {@code word,count} */
    @Override
    public String toString() {
        return word + "," + count;
    }

    /**
     * Binds this record to the parameters of an insert statement.
     *
     * <p>Expected statement shape: {@code insert into word_count(wc_word,wc_count)value(?,?)}.
     *
     * @param statement statement whose parameter 1 receives the word and parameter 2 the count
     * @throws SQLException if a parameter cannot be set
     */
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.word);
        statement.setInt(2, this.count);
    }

    /**
     * Loads this record from the current row of a result set.
     *
     * @param resultSet result set positioned on a row with {@code wc_word} and {@code wc_count} columns
     * @throws SQLException if a column cannot be read
     */
    public void readFields(ResultSet resultSet) throws SQLException {
        this.word = resultSet.getString("wc_word");
        this.count = resultSet.getInt("wc_count");
    }

    /**
     * Serializes this record as a UTF string followed by an int.
     *
     * <p>{@link DataOutput#writeUTF(String)} throws {@link NullPointerException} for a
     * {@code null} argument, and {@code word} is {@code null} on a fresh instance or after
     * reading a SQL NULL column. A {@code null} word is therefore written as the empty
     * string and will be read back as {@code ""}.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.word == null ? "" : this.word);
        out.writeInt(this.count);
    }

    /**
     * Deserializes this record in the order used by {@link #write(DataOutput)}:
     * UTF string first, then int.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }
}
public static class WriteToDBMap extends Mapper<LongWritable, Text, Text, IntWritable> {
private final IntWritable ONE = new IntWritable(1);
private Text oKey = new Text();
private String[] infos;
以上是读取文件的方式。
下面介绍如何把结果写入到本地 MySQL 中。