【Kafka】(五)Java 操作 Kafka

简介: 【Kafka】(五)Java 操作 Kafka

文章目录


一、创建消息队列

二、pom.xml

三、生产者

四、消费者


java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。


一、创建消息队列


kafka-topics.sh --create --zookeeper 192.168.56.137:2181 --topic demo8 --replication-factor 1 --partitions 1


二、pom.xml


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>


三、生产者


package com.njbdqn.services;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @author:Tokgo J
 * @date:2020/2/11
 * @aim:生产者:往Demo8消息队列写入消息
 */
public class MyProducer {
    public static void main(String[] args) {
        // 定义配置信息
        Properties prop = new Properties();
        // kafka地址,多个地址用逗号分割  "192.168.23.76:9092,192.168.23.77:9092"
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        KafkaProducer<String,String> prod = new KafkaProducer<String, String>(prop);
        // 发送消息
        try {
            for(int i=0;i<10;i++) {
                // 生产者记录消息
                ProducerRecord<String, String> pr = new ProducerRecord<String, String>("demo8", "hello world"+i);
                prod.send(pr);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            prod.close();
        }
    }
}


注意:


1.kafka如果是集群,多个地址用逗号分割(,)

2.Properties的put方法,第一个参数可以是字符串,如:p.put(“bootstrap.servers”,“192.168.23.76:9092”)

3.kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。

4.p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);设置序列化类,可以写类的全路径


四、消费者


package com.njbdqn.services;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
 * @author:Tokgo J
 * @date:2020/2/11
 * @aim:消费者:读取kafka数据
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.137:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put("session.timeout.ms", "30000");
        //消费者是否自动提交偏移量,默认是true 避免出现重复数据 设为false
        prop.put("enable.auto.commit", "false");
        prop.put("auto.commit.interval.ms", "1000");
        //auto.offset.reset 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的处理
        //earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
        //latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
        prop.put("auto.offset.reset", "earliest");
        // 设置组名
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group_4");
        KafkaConsumer<String, String> con = new KafkaConsumer<String, String>(prop);
        con.subscribe(Collections.singletonList("demo8"));
        while (true) {
            ConsumerRecords<String, String> records = con.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> rec : records) {
                System.out.println(String.format("offset:%d,key:%s,value:%s", rec.offset(), rec.key(), rec.value()));
            }
        }
    }
}


注意:


1.订阅消息可以订阅多个主题

2.ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。

3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。

4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。

目录
相关文章
|
6天前
|
Java 数据库 数据安全/隐私保护
Java操作Excel文件导入导出【内含有 jxl.jar 】
Java操作Excel文件导入导出【内含有 jxl.jar 】
20 0
|
9天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
583 0
|
13天前
|
存储 缓存 Java
滚雪球学Java(59):从基础到高阶:Java中LinkedList的操作指南
【6月更文挑战第13天】🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
12 1
滚雪球学Java(59):从基础到高阶:Java中LinkedList的操作指南
|
2天前
|
安全 Java 程序员
在Java中,finalization是一种机制,允许对象在被垃圾收集器回收之前执行一些清理操作。
【6月更文挑战第24天】Java中的finalization机制允许对象在被垃圾收集前执行清理,以释放系统资源或处理敏感信息。`finalize()`方法用于定义此类操作,但它不是可靠的资源管理策略,因为调用时机不确定且可能影响性能。尽管可用于清理外部资源或作为保护措施,但应避免依赖finalization,而应优先采用手动资源管理,遵循“创建者负责”原则。
8 1
|
7天前
|
Java
在Java中,你可以创建一个简单的四则运算程序来执行小学级别的加减乘除操作
【6月更文挑战第19天】Java程序实现简单四则运算,接收用户输入的两个数字和运算符,根据运算符调用相应函数进行计算。包含加、减、乘、除功能,其中除法操作检查了除数是否为零,避免运行时错误。
22 5
|
5天前
|
Java 索引
从干将莫邪的故事说起--java比较操作注意要点
从干将莫邪的故事说起--java比较操作注意要点
|
6天前
|
NoSQL Java Redis
如何在 Java 中操作这些 Redis 数据结构的基本方法
如何在 Java 中操作这些 Redis 数据结构的基本方法
11 2
|
7天前
|
Java API
使用 Java 来实现两个 List 的差集操作
使用 Java 来实现两个 List 的差集操作
12 3
|
7天前
|
算法 搜索推荐 Java
二叉树的基本概念、常见操作以及如何使用Java代码
二叉树的基本概念、常见操作以及如何使用Java代码
10 1
|
7天前
|
存储 安全 Java
Java集合类是Java编程语言中用于存储和操作一组对象的工具
【6月更文挑战第19天】Java集合类,如`List`、`Set`、`Map`在`java.util`包中,提供高级数据结构。常用实现包括`ArrayList`(快速随机访问)、`LinkedList`(高效插入删除)、`HashSet`(无序不重复)、`TreeSet`(排序)、`HashMap`(键值对)和`TreeMap`(排序映射)。集合动态调整大小,支持对象引用,部分保证顺序。选择合适集合优化性能和数据组织。
10 1