6 指定转换函数
大多数转换都需要用户自定义的函数。 本节列出了如何指定它们的不同方法
6.1 Java版本
6.1.1 实现接口
最基本的方法是实现一个提供的接口:
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction());
6.1.2 匿名类
可以将函数作为匿名类传递:
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
6.1.3 Java 8 Lambdas
Flink还支持Java API中的Java 8 Lambdas。
data.filter(s -> s.startsWith("http://")); data.reduce((i1,i2) -> i1 + i2);
6.1.4 增强函数
所有需要用户定义函数的转换都可以将增强函数作为参数。 例如,与其写成
class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
不如写成
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };
并像往常一样将函数传递给map转换:
data.map(new MyMapFunction());
也可以定义为匿名类:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
除了用户定义的函数(map,reduce等)之外,Rich函数还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。
这些用于参数化函数(请参阅将参数传递给函数),创建和完成本地状态,访问广播变量以及访问运行时信息(如累加器和计数器)