1. pom.xml dependencies
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.8.0</version> </dependency>
2. Consuming messages from Kafka
public static FlinkKafkaConsumer010<String> createConsumers(ParameterTool params, String topic) { Properties props = new Properties(); String brokers = params.getProperties().getProperty("kafka.consumer.brokers"); System.out.println(brokers); String groupid = params.getProperties().getProperty("kafka.channel.consumer.group.id"); System.out.println(groupid); //String topic = params.getProperties().getProperty("kafka.web.info.topic"); System.out.println(topic); props.setProperty("bootstrap.servers", brokers); props.setProperty("group.id", groupid); FlinkKafkaConsumer010<String> consumer010 = new FlinkKafkaConsumer010<>(topic, new org.apache.flink.api.common.serialization.SimpleStringSchema(), props); return consumer010; }
3. Configuration loader
package com.vince.xq.common;

import org.apache.flink.api.java.utils.ParameterTool;

import java.io.InputStream;

/**
 * Loads job configuration from a properties file on the classpath.
 */
public class Configs {

    private Configs() {
        // Utility class — not meant to be instantiated.
    }

    /**
     * Loads the given properties file from the classpath into a {@link ParameterTool}.
     *
     * @param configFileName resource name, e.g. {@code "config-test.properties"}
     * @return the parsed configuration
     * @throws IllegalArgumentException if the resource is not on the classpath
     *         (without this check, a missing file surfaces as a confusing NPE)
     * @throws Exception if the properties file cannot be parsed
     */
    public static ParameterTool loadConfig(String configFileName) throws Exception {
        try (InputStream is = Configs.class.getClassLoader().getResourceAsStream(configFileName)) {
            if (is == null) {
                throw new IllegalArgumentException(
                        "Config file not found on classpath: " + configFileName);
            }
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}
4. Main program
public static void main(String[] args) throws Exception { /*if (args == null || args.length == 0) { throw new RuntimeException("config file name must be config, config is args[0]"); }*/ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //DataStream<String> userStream = env.addSource(new RandomUserSource()); ParameterTool parameterTool = Configs.loadConfig("config-test.properties"); env.getConfig().setGlobalJobParameters(parameterTool); //String topic = parameterTool.getProperties().getProperty("kafka.user.info.topic"); String topic="test1115"; DataStream<String> userStream = env.addSource(Utils.createConsumers(parameterTool, topic)); DataStream<String> userMapperStream = userStream.flatMap(new UserMapper()); String index = "sys_user_test"; String type = "user"; userMapperStream.addSink(Utils.createEsProducer(parameterTool, index, type)); env.execute("user-to-es"); }
参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html