public class StreamWordCount { public static void main(String[] args) throws Exception { //获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置 String topic = "superman"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "192.168.52.200:9092");//多个的话可以指定 prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("auto.offset.reset", "earliest"); prop.setProperty("group.id", "consumer1"); FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop); //获取数据 DataStream<String> text = env.addSource(myConsumer); //打印 text.print(); //执行 env.execute("KafkaFlink"); } }