1. 使用 Java 读取本地文件数据,并作为生产者将数据写入 Kafka
/** * Created by 王一宁 on 2019/11/6. */ public class kafkaProducer { public static void main(String[] args) throws Exception{ Properties prop = new Properties(); //指定kafka broker地址 prop.put("bootstrap.servers", "hadoop1:9092"); //指定key value的序列化方式 prop.put("key.serializer", StringSerializer.class.getName()); prop.put("value.serializer", StringSerializer.class.getName()); //指定topic名称 String topic = "wang"; //创建producer链接 KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop); //创建Java IO InputStream file = new FileInputStream("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\turbine\\GW20000120160101.txt"); InputStreamReader fileInputStream = new InputStreamReader(file); BufferedReader reader = new BufferedReader(fileInputStream); String line = null; while ((line = reader.readLine()) != null) { //生产消息 producer.send(new ProducerRecord<String, String>(topic,line)); Thread.sleep(1000); } reader.close(); file.close(); fileInputStream.close(); //关闭链接 producer.close(); } }
2.在linux服务器中,直接开启一个消费者,就可以看到生产的数据了,或者手写一个java消费者,消费同一个Topic的数据。
3. 使用 Java 实现 Flink 集成 Kafka 消费者的代码
/** * 消费Kafka中得数据 * @author 王一宁 * @date 2020/1/2 12:12 */ public class StreamingFromKafka { public static void main(String[] args) throws Exception{ //获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置 String topic = "wang"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定 prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); prop.setProperty("auto.offset.reset","latest"); prop.setProperty("group.id","consumer1"); FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop); //获取数据 DataStream<String> text = env.addSource(myConsumer); //打印 text.print().setParallelism(1); //执行 //env.execute("StreamingFormCollection"); env.execute(); } }