Java实现Flink集成Kafka消费数据

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Java实现Flink集成Kafka消费数据

1.引入相关的依赖


2.代码如下


/**
 * 消费Kafka中得数据
 * @author 王一宁
 * @date 2020/1/2 12:12
 */
public class StreamingFromKafka {
    public static void main(String[] args) throws Exception{
        //获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "wang";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","latest");
        prop.setProperty("group.id","consumer1");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop);
        //获取数据
        DataStream<String> text = env.addSource(myConsumer);
        //打印
        text.print().setParallelism(1);
        //执行
        //env.execute("StreamingFormCollection");
        env.execute();
    }
}


目录
相关文章
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
234 7
|
2月前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
463 12
|
3月前
|
人工智能 自然语言处理 分布式计算
AI 驱动传统 Java 应用集成的关键技术与实战应用指南
本文探讨了如何将AI技术与传统Java应用集成,助力企业实现数字化转型。内容涵盖DJL、Deeplearning4j等主流AI框架选择,技术融合方案,模型部署策略,以及智能客服、财务审核、设备诊断等实战应用案例,全面解析Java系统如何通过AI实现智能化升级与效率提升。
330 0
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
337 0
|
5月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
934 0
|
8月前
|
SQL Java 中间件
【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常
在BeetISQL 2.13.8版本中,客户使用batch insert向yashandb表插入数据并尝试获取自动生成的sequence id时,出现类型转换异常。原因是beetlsql在prepareStatement时未指定返回列,导致yashan JDBC驱动返回rowid(字符串),与Java Bean中的数字类型tid不匹配。此问题影响业务流程,使无法正确获取sequence id。解决方法包括:1) 在batchInsert时不返回自动生成的sequence id;2) 升级至BeetISQL 3,其已修正该问题。
【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常
|
8月前
|
SQL druid Oracle
【YashanDB知识库】yasdb jdbc驱动集成druid连接池,业务(java)日志中有token IDENTIFIER start异常
客户Java日志中出现异常,影响Druid的merge SQL功能(将SQL字面量替换为绑定变量以统计性能),但不影响正常业务流程。原因是Druid在merge SQL时传入null作为dbType,导致无法解析递归查询中的`start`关键字。
|
10月前
|
存储 缓存 Java
Java中的分布式缓存与Memcached集成实战
通过在Java项目中集成Memcached,可以显著提升系统的性能和响应速度。合理的缓存策略、分布式架构设计和异常处理机制是实现高效缓存的关键。希望本文提供的实战示例和优化建议能够帮助开发者更好地应用Memcached,实现高性能的分布式缓存解决方案。
203 9
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
下一篇
oss云网关配置