1. FlinkKafkaToRedis
Consider a question: when a Flink program runs, in what ways can we pass values into it?
1. Hard-code the values in the program
2. Pass them in dynamically through args[0], for example:
ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
String groupId = parameters.get("group.id", "consumer1");
String topics = parameters.getRequired("topics");
3. Read them dynamically from a configuration file (options 2 and 3 are combined in the sketch after this list)
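Flink's ParameterTool supports both styles at once: load defaults from a properties file, then let command-line flags override them. A minimal sketch of the idea; ParamsDemo is a hypothetical class, not part of this project:

import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Arrays;

public class ParamsDemo {
    public static void main(String[] args) throws Exception {
        // Option 3: defaults from a properties file whose path arrives as args[0]
        ParameterTool fromFile = ParameterTool.fromPropertiesFile(args[0]);
        // Option 2: the remaining arguments as --key value overrides
        ParameterTool fromArgs = ParameterTool.fromArgs(Arrays.copyOfRange(args, 1, args.length));
        // Command-line values win over file values
        ParameterTool parameters = fromFile.mergeWith(fromArgs);
        System.out.println(parameters.getRequired("topics"));
    }
}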
1.1 pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
1.2 config.properties
# Multiple topics may be passed in, separated by commas
topics=wang
group.id=consumer1
bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
# alternative: earliest
auto.offset.reset=latest
# Kafka does not commit offsets; Flink manages them through checkpoints
enable.auto.commit=false
# Checkpoint every 30s. The default state backend is memory; since I did not
# specify a checkpoint directory, checkpoints are kept in the JobManager's memory.
# You can point them at HDFS instead, for example:
# env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));
# env.setStateBackend(new FsStateBackend("file:///D://APP//IDEA//workplace//FlinkTurbineFaultDiagnosis//checkpoint"));
checkpoint.interval=30000
# redis
redis.host=127.0.0.1
#redis.pwd=123456
redis.db=1
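The two commented-out setStateBackend calls belong in the job code, not in the properties file. As a sketch of the durable variant (assumption: a flink-statebackend-rocksdb_2.11 dependency has been added, which the pom in 1.1 does not include):

// Keep state in RocksDB and checkpoint it to HDFS (true = incremental checkpoints),
// so checkpoints survive a JobManager restart instead of living in its memory.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);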
1.3 FlinkUtils.java
public class FlinkUtils {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        // 1. Make the parameters available globally (e.g. to MyRedisSink.open)
        env.getConfig().setGlobalJobParameters(parameters);
        // 2. Checkpoint configuration
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE);
        // 3. Keep the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 4. Kafka configuration
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", parameters.getRequired("bootstrap.servers"));
        prop.setProperty("group.id", parameters.getRequired("group.id"));
        prop.setProperty("auto.offset.reset", parameters.get("auto.offset.reset", "earliest"));
        // 5. Do not auto-commit offsets; Flink's checkpoints manage them
        prop.setProperty("enable.auto.commit", parameters.get("enable.auto.commit", "false"));

        String topics = parameters.getRequired("topics");
        List<String> topicList = Arrays.asList(topics.split(","));

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                prop);
        return env.addSource(kafkaConsumer);
    }

    // Expose the execution environment
    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
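Because of RETAIN_ON_CANCELLATION, the externalized checkpoint survives a cancel, and the job can later be resumed with flink run -s <checkpoint-path>. While the job runs, failures trigger restarts; with checkpointing enabled Flink defaults to a fixed-delay strategy, and you can make it explicit. A sketch of lines you could add after enableCheckpointing (my addition, not part of the original FlinkUtils):

// Assumes: org.apache.flink.api.common.restartstrategy.RestartStrategies,
// org.apache.flink.api.common.time.Time, java.util.concurrent.TimeUnit
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                            // give up after 3 restart attempts
        Time.of(5, TimeUnit.SECONDS)  // wait 5s between attempts
));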
1.4 MyRedisSink.java
public class MyRedisSink extends RichSinkFunction<Turbine> {

    // Jedis connection, created in open(); transient so it is not serialized
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = params.getRequired("redis.host");
        //String password = params.getRequired("redis.pwd");
        int db = params.getInt("redis.db", 0);
        jedis = new Jedis(host, 6379, 5000);
        //jedis.auth(password);
        jedis.select(db);
    }

    @Override
    public void invoke(Turbine value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // Write to Redis: hash = word, field = province, value = counts
        jedis.hset(value.word, value.province, String.valueOf(value.counts));
    }

    @Override
    public void close() throws Exception {
        super.close();
        jedis.close();
    }
}
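For comparison, the Bahir flink-connector-redis dependency already in the pom ships a ready-made RedisSink. A sketch of what using it would look like; note that RedisCommandDescription fixes the hash name when the sink is built ("turbine" below is a made-up name), while the custom sink above derives the hash name from each record's word field, which this connector cannot do:

import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost("127.0.0.1")
        .setPort(6379)
        .setDatabase(1)
        .build();

map.addSink(new RedisSink<>(conf, new RedisMapper<Turbine>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // Every record goes into one fixed hash named "turbine"
        return new RedisCommandDescription(RedisCommand.HSET, "turbine");
    }

    @Override
    public String getKeyFromData(Turbine data) {
        return data.province;                  // hash field
    }

    @Override
    public String getValueFromData(Turbine data) {
        return String.valueOf(data.counts);    // hash value
    }
}));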
1.5 KafkaToRedis.java
public class KafkaToRedis {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\config.properties");
        DataStream<String> lines = FlinkUtils.createKafkaStream(parameters, SimpleStringSchema.class);
        lines.print();

        // The input is a String like "word province count"; map it to a Turbine object
        SingleOutputStreamOperator<Turbine> map = lines.map(new MapFunction<String, Turbine>() {
            @Override
            public Turbine map(String value) throws Exception {
                String[] fields = value.split(" ");
                String word = fields[0];
                String province = fields[1];
                long counts = Long.parseLong(fields[2]);
                return Turbine.of(word, province, counts);
            }
        });
        map.addSink(new MyRedisSink());

        // Run the job
        FlinkUtils.getEnv().execute();
    }
}
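The Turbine POJO is used throughout this section but never shown. A minimal reconstruction consistent with how it is used here (public fields plus a static of factory, in the style of Flink's Tuple2.of; the real class in the project may differ):

public class Turbine {

    public String word;
    public String province;
    public long counts;

    // Flink POJOs need a public no-arg constructor
    public Turbine() {
    }

    public Turbine(String word, String province, long counts) {
        this.word = word;
        this.province = province;
        this.counts = counts;
    }

    public static Turbine of(String word, String province, long counts) {
        return new Turbine(word, province, counts);
    }

    @Override
    public String toString() {
        return "Turbine{word='" + word + "', province='" + province + "', counts=" + counts + "}";
    }
}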