D:\HADOOP_HOME\hadoop-2.6.4\share\hadoop\mapreduce\sources\hadoop-mapreduce-examples 2.6.0-source.jar
2.1 Mapper
package cf; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MovieMapper1 extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { String[] values = ivalue.toString().split(","); if (values.length!=2) { return ; } String userID = values[0]; String itemID = values[1]; context.write(new Text(userID), new Text(itemID)); } }
2.2 Reducer
package cf; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MovieReduce1 extends Reducer<Text, Text, Text, Text> { public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // process values StringBuffer sb = new StringBuffer(); for (Text val : values) { sb.append(val.toString()); sb.append(","); } //value不能直接用StringBuffer 必须转换为String context.write(_key,new Text(sb.toString())); } }
2.3 Main
package cf; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class UserItemSetMapReduce { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = new Job(conf, "CFItemSet"); job.setJarByClass(UserItemSetMapReduce.class); job.setMapperClass(MovieMapper1.class); //job.setCombinerClass(cls); // job.setCombinerClass(MovieReduce1.class); job.setReducerClass(MovieReduce1.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path("hdfs://")); //InputPath(job, new Path(otherArgs[0])); //直接写到cf会提示已存在cf,我写成uIO.ttx,以为内容会写入到txt,然没有,默认他是文件夹 FileOutputFormat.setOutputPath(job,new Path("hdfs://")); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
3.1 输入
3.2 输出
DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:765) INFO - Counters: 38 File System Counters FILE: Number of bytes read=538 FILE: Number of bytes written=509366 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=106 HDFS: Number of bytes written=37 HDFS: Number of read operations=13 HDFS: Number of large read operations=0 HDFS: Number of write operations=4 Map-Reduce Framework Map input records=11 Map output records=11 Map output bytes=44 Map output materialized bytes=72 Input split bytes=107 Combine input records=0 Combine output records=0 Reduce input groups=5 Reduce shuffle bytes=72 Reduce input records=11 Reduce output records=5 Spilled Records=22 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=3 CPU time spent (ms)=0 Physical memory (bytes) snapshot=0 Virtual memory (bytes) snapshot=0 Total committed heap usage (bytes)=462422016 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=53 File Output Format Counters Bytes Written=37 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323) DEBUG - stopping client from cache: org.apache.hadoop.ipc.Client@37afeb11 DEBUG - removing client from cache: org.apache.hadoop.ipc.Client@37afeb11 DEBUG - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@37afeb11 DEBUG - Stopping client DEBUG - IPC Client (521081105) connection to / from hxsyl: closed DEBUG - IPC Client (521081105) connection to / from hxsyl: stopped, remaining connections 0