Java模拟读取本地数据到Flink集成的Kafka并消费数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Java模拟读取本地数据到Flink集成的Kafka并消费数据

1.java实现读取本地数据到kafka生产数据


/**
 * Created by 王一宁 on 2019/11/6.
 */
public class kafkaProducer {
    public static void main(String[] args) throws Exception{
        Properties prop = new Properties();
        //指定kafka broker地址
        prop.put("bootstrap.servers", "hadoop1:9092");
        //指定key value的序列化方式
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        //指定topic名称
        String topic = "wang";
        //创建producer链接
        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
        //创建Java IO
        InputStream file = new FileInputStream("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\turbine\\GW20000120160101.txt");
        InputStreamReader fileInputStream = new InputStreamReader(file);
        BufferedReader reader = new BufferedReader(fileInputStream);
        String line = null;
        while ((line = reader.readLine()) != null) {
            //生产消息
            producer.send(new ProducerRecord<String, String>(topic,line));
            Thread.sleep(1000);
        }
        reader.close();
        file.close();
        fileInputStream.close();
        //关闭链接
        producer.close();
    }
}

2.在linux服务器中,直接开启一个消费者,就可以看到生产的数据了,或者手写一个java消费者,消费同一个Topic的数据。

3.java实现flink集成kafka消费者的实现代码


/**
 * 消费Kafka中得数据
 * @author 王一宁
 * @date 2020/1/2 12:12
 */
public class StreamingFromKafka {
    public static void main(String[] args) throws Exception{
        //获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "wang";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","latest");
        prop.setProperty("group.id","consumer1");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop);
        //获取数据
        DataStream<String> text = env.addSource(myConsumer);
        //打印
        text.print().setParallelism(1);
        //执行
        //env.execute("StreamingFormCollection");
        env.execute();
    }
}


目录
相关文章
|
2天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
13 2
|
10天前
|
消息中间件 Kafka 数据处理
实时计算 Flink版产品使用问题之如何处理数据并记录每条数据的变更
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之同步时,上游批量删除大量数据(如20万条),如何提高删除效率
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
17 0