Java code (a small Cascading demo: read a tab-separated text file, split each line into num and value fields, and write the result back out):
package com.xunjie.dmsp.olduser;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
/**
* test.txt:
* 1 a
* 2 b
* 3 c
*
* /data/hadoop/hadoop/bin/hadoop jar
* dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar
* hdfs:/user/hadoop/test/lky/test.txt
* file:///data/hadoop/test/lky/output
*/
public class Test2 {
    public static void main(String[] args) {
        // Input file
        String sourcePath = args[0];
        // Output directory
        String sinkPath = args[1];

        // Fields to read
        Fields inputFields = new Fields("num", "value");
        // Splitting regex; defaults to \t
        RegexSplitter splitter = new RegexSplitter(inputFields);

        // Define the pipe
        Pipe p1 = new Pipe("test");
        // Pipe assembly:
        // split each line of the source file into the declared fields
        p1 = new Each(p1, new Fields("line"), splitter);

        // Source and sink taps, using the generic Hfs tap
        Tap source = new Hfs(new TextLine(), sourcePath);
        Tap sink = new Hfs(new TextLine(), sinkPath);

        // Job configuration
        Properties properties = new Properties();
        properties.setProperty("hadoop.job.ugi", "hadoop,hadoop");
        FlowConnector.setApplicationJarClass(properties, Test2.class);
        FlowConnector flowConnector = new FlowConnector(properties);

        Flow importFlow = flowConnector.connect("import flow", source, sink, p1);
        importFlow.start();
        importFlow.complete();
    }
}
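For comparison, here is a hedged sketch of how the same pipe assembly could be extended with a grouping and counting step, using the same Cascading 1.x API as above (GroupBy, Every, and the Count aggregator are standard Cascading classes; the class name Test2Count, the pipe name "count", and the "count flow" label are my own illustrative choices, not from the original post):

package com.xunjie.dmsp.olduser;

import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * Hypothetical variation on Test2: after splitting each line into
 * "num" and "value", group by "num" and count the rows per key.
 */
public class Test2Count {
    public static void main(String[] args) {
        String sourcePath = args[0];
        String sinkPath = args[1];

        Pipe pipe = new Pipe("count");
        // Split each input line on \t into the two declared fields
        pipe = new Each(pipe, new Fields("line"),
                new RegexSplitter(new Fields("num", "value")));
        // Group on "num", then count the tuples in each group
        pipe = new GroupBy(pipe, new Fields("num"));
        pipe = new Every(pipe, new Count());

        Tap source = new Hfs(new TextLine(), sourcePath);
        Tap sink = new Hfs(new TextLine(), sinkPath);

        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, Test2Count.class);
        Flow flow = new FlowConnector(properties).connect(
                "count flow", source, sink, pipe);
        flow.complete();
    }
}

It would be run the same way as Test2, with the second argument pointing at a fresh output directory, since Hadoop refuses to write into one that already exists.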
This article was reposted from Liu Kaiyi (刘凯毅)'s blog on cnblogs (博客园); original post: hadoop cascading demo. To reprint, please contact the original author.