Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程

简介: Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程

1、在控制台创建发送者


kafka-console-producer.sh --broker-list hadoop2:9092 --topic zz
>hello world

2、消费者API


import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", "hadoop2:9092");
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自动确认offset
        props.put("enable.auto.commit", "true");
        // 自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 定义consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList("zz"));
        while (true) {
            // 读取数据,读取超时时间为100ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}



相关文章
|
19天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
174 37
|
19天前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑】设计模式——原型模式
对比原型模式和传统方式的实现思路、代码方案、优缺点,阐述原型模式的使用场景,以及深拷贝、浅拷贝等相关概念,并扩展原型模式在Spring源码中的应用。
【Java笔记+踩坑】设计模式——原型模式
|
6天前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
17 5
|
8天前
|
Java 程序员 API
Java 8新特性之Lambda表达式与Stream API的探索
【9月更文挑战第24天】本文将深入浅出地介绍Java 8中的重要新特性——Lambda表达式和Stream API,通过实例解析其语法、用法及背后的设计哲学。我们将一探究竟,看看这些新特性如何让Java代码变得更加简洁、易读且富有表现力,同时提升程序的性能和开发效率。
|
6天前
|
JSON Java Maven
关于使用Java-JWT的笔记
这篇文章介绍了使用Java-JWT库来生成和验证JSON Web Tokens (JWT) 的方法。文中解释了JWT的组成,包括头部、载荷和签名,并提供了如何使用java-jwt库生成和验证token的示例代码。此外,还提供了Maven依赖和一些关于token的标准声明和自定义声明的解释。
关于使用Java-JWT的笔记
|
7天前
|
SQL Java Linux
Java 8 API添加了一个新的抽象称为流Stream
Java 8 API添加了一个新的抽象称为流Stream
|
9天前
|
Java
flyway报错Caused by: java.lang.NoSuchMethodError: org.flywaydb.core.api.configuration.FluentConfigurat
flyway报错Caused by: java.lang.NoSuchMethodError: org.flywaydb.core.api.configuration.FluentConfigurat
14 2
|
20天前
|
Java 开发者 数据格式
【Java笔记+踩坑】SpringBoot基础4——原理篇
bean的8种加载方式,自动配置原理、自定义starter开发、SpringBoot程序启动流程解析
【Java笔记+踩坑】SpringBoot基础4——原理篇
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
85 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
下一篇
无影云桌面