Java模拟读取本地数据到Flink集成的Kafka并消费数据

简介: Java模拟读取本地数据到Flink集成的Kafka并消费数据

1.java实现读取本地数据到kafka生产数据


/**
 * Created by 王一宁 on 2019/11/6.
 */
public class kafkaProducer {
    public static void main(String[] args) throws Exception{
        Properties prop = new Properties();
        //指定kafka broker地址
        prop.put("bootstrap.servers", "hadoop1:9092");
        //指定key value的序列化方式
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        //指定topic名称
        String topic = "wang";
        //创建producer链接
        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
        //创建Java IO
        InputStream file = new FileInputStream("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\turbine\\GW20000120160101.txt");
        InputStreamReader fileInputStream = new InputStreamReader(file);
        BufferedReader reader = new BufferedReader(fileInputStream);
        String line = null;
        while ((line = reader.readLine()) != null) {
            //生产消息
            producer.send(new ProducerRecord<String, String>(topic,line));
            Thread.sleep(1000);
        }
        reader.close();
        file.close();
        fileInputStream.close();
        //关闭链接
        producer.close();
    }
}

2.在linux服务器中,直接开启一个消费者,就可以看到生产的数据了,或者手写一个java消费者,消费同一个Topic的数据。

3.java实现flink集成kafka消费者的实现代码


/**
 * 消费Kafka中得数据
 * @author 王一宁
 * @date 2020/1/2 12:12
 */
public class StreamingFromKafka {
    public static void main(String[] args) throws Exception{
        //获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "wang";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","latest");
        prop.setProperty("group.id","consumer1");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop);
        //获取数据
        DataStream<String> text = env.addSource(myConsumer);
        //打印
        text.print().setParallelism(1);
        //执行
        //env.execute("StreamingFormCollection");
        env.execute();
    }
}


目录
相关文章
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
916 43
|
4月前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
279 3
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
377 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
299 7
|
5月前
|
数据采集 JSON Java
Java爬虫获取1688店铺所有商品接口数据实战指南
本文介绍如何使用Java爬虫技术高效获取1688店铺商品信息,涵盖环境搭建、API调用、签名生成及数据抓取全流程,并附完整代码示例,助力市场分析与选品决策。
|
5月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
4月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
777 12
|
4月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
380 16
|
5月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2404 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
5月前
|
机器学习/深度学习 SQL 大数据
什么是数据集成?和数据融合有什么区别?
在大数据领域,“数据集成”与“数据融合”常被混淆。数据集成关注数据的物理集中,解决“数据从哪来”的问题;数据融合则侧重逻辑协同,解决“数据怎么用”的问题。两者相辅相成,集成是基础,融合是价值提升的关键。理解其差异,有助于企业释放数据潜力,避免“数据堆积”或“盲目融合”的误区,实现数据从成本到生产力的转变。
什么是数据集成?和数据融合有什么区别?