1.pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.6</version>
</dependency>
2.main
package com.jd.xq;

import com.jd.xq.mapper.LineMapper;
import com.jd.xq.sink.SinkTest;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.FileNotFoundException;
import java.io.InputStream;

/**
 * Entry point of the demo job: reads text lines from a local socket, passes them through
 * {@link LineMapper} and hands the result to {@link SinkTest}. The contents of
 * {@code config.properties} are registered as global job parameters so that rich functions
 * can read them through their runtime context.
 *
 * @author
 * @Date 2019-08-16 15:45
 **/
public class StartJob {

    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the configuration cannot be loaded or the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Run everything single-threaded rather than in parallel. This must be set BEFORE the
        // operators are created: each transformation captures the environment's parallelism
        // at construction time, so changing it afterwards would not affect them.
        env.setParallelism(1);

        // Very important: checkpointing must be enabled!
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Make the properties available to every operator via ExecutionConfig.
        env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

        DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");
        DataStream<String> stream = text.flatMap(new LineMapper());
        stream.addSink(new SinkTest());

        env.execute("WordCount from Kafka data");
    }

    /**
     * Loads a properties file from the classpath into a {@link ParameterTool}.
     *
     * @param configFileName name of the classpath resource to load
     * @return the parameters read from the resource
     * @throws FileNotFoundException if the resource is not present on the classpath
     * @throws Exception if the resource cannot be read
     */
    public static ParameterTool loadConfig(String configFileName) throws Exception {
        InputStream resource = StartJob.class.getClassLoader().getResourceAsStream(configFileName);
        if (resource == null) {
            // getResourceAsStream signals "not found" with null; fail with a clear message
            // instead of letting a NullPointerException surface from the properties loader.
            throw new FileNotFoundException("Classpath resource not found: " + configFileName);
        }
        try (InputStream is = resource) {
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}
3.mapper
package com.jd.xq.mapper; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** * @author duanxiaoqiu * @Date 2019-09-10 20:50 **/ public class LineMapper extends RichFlatMapFunction { @Override public void open(Configuration parameters) { ExecutionConfig executionConfig = getRuntimeContext().getExecutionConfig(); ParameterTool params = (ParameterTool) executionConfig.getGlobalJobParameters(); System.out.println(params.getProperties().getProperty("xq.name")); } public void flatMap(String s, Collector collector) { System.out.println(s); } }
4.sink
package com.jd.xq.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Placeholder sink for {@code String} records. Both lifecycle hooks are intentionally empty.
 *
 * @author duanxiaoqiu
 * @Date 2019-09-11 09:36
 **/
public class SinkTest extends RichSinkFunction<String> {

    /**
     * Initialization hook; currently does nothing.
     *
     * @param parameters the configuration passed by the runtime (unused)
     * @throws Exception declared by the overridden method; never thrown here
     */
    @Override
    public void open(Configuration parameters) throws Exception {
    }

    /**
     * Called for each record; currently does nothing.
     *
     * @param value the incoming record
     * @param context additional context about the record (unused)
     * @throws Exception declared by the overridden method; never thrown here
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
    }
}
5.配置文件 config.properties
xq.name=test
6.Flink
接数据、处理、写数据到其他数据库
如果需要在 open 方法里实例化类,最好使用单例的方式,避免重复创建实例。