使用Java编写Kafka生产者和消费者示例

简介: 使用Java编写Kafka生产者和消费者示例

标题:使用Java编写Kafka生产者和消费者示例

Kafka是一个分布式流处理平台,具有高性能、高吞吐量的特点,被广泛应用于消息队列、日志收集、事件处理等场景。在本文中,我们将介绍如何使用Java编写Kafka的生产者和消费者,以实现消息的生产和消费。

1. 准备工作

在开始之前,确保已经安装并配置了Kafka,并启动了Zookeeper和Kafka服务。可以从官方网站https://kafka.apache.org/downloads下载Kafka。

2. 编写Kafka生产者

首先,我们来编写一个简单的Kafka生产者,用于向指定的Topic发送消息。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "message-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully! Topic: " + metadata.topic() +
                                ", Partition: " + metadata.partition() +
                                ", Offset: " + metadata.offset());
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

3. 编写Kafka消费者

接下来,我们编写一个简单的Kafka消费者,用于从指定的Topic消费消息。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "test-topic";
        String groupId = "test-group";
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " +
                        "Key = " + record.key() +
                        ", Value = " + record.value() +
                        ", Topic = " + record.topic() +
                        ", Partition = " + record.partition() +
                        ", Offset = " + record.offset());
            }
        }
    }
}

4. 运行示例

将以上两个Java文件编译并运行,即可在Kafka中发送和接收消息。确保Kafka服务已经启动,并且Topic已经创建。

5. 总结

本文介绍了如何使用Java编写Kafka的生产者和消费者示例。通过这些示例代码,我们可以轻松地在Java应用中集成Kafka,实现消息的生产和消费。Kafka具有高性能、高可靠性的特点,适用于各种场景的消息处理需求。

相关文章
|
6月前
|
Java
在 Java 中捕获和处理自定义异常的代码示例
本文提供了一个 Java 代码示例,展示了如何捕获和处理自定义异常。通过创建自定义异常类并使用 try-catch 语句,可以更灵活地处理程序中的错误情况。
195 1
|
2月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
100 10
|
7月前
|
存储 Java
Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。
【10月更文挑战第19天】本文详细介绍了Java中的HashMap和TreeMap,通过具体示例展示了它们在处理复杂数据结构问题时的应用。HashMap以其高效的插入、查找和删除操作著称,而TreeMap则擅长于保持元素的自然排序或自定义排序,两者各具优势,适用于不同的开发场景。
92 1
|
3月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
|
5月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
128 1
Java—多线程实现生产消费者
|
6月前
|
Java
在Java中实现接口的具体代码示例
可以根据具体的需求,创建更多的类来实现这个接口,以满足不同形状的计算需求。希望这个示例对你理解在 Java 中如何实现接口有所帮助。
188 38
|
7月前
|
存储 缓存 Java
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
这篇文章详细介绍了Java中的IO流,包括字符与字节的概念、编码格式、File类的使用、IO流的分类和原理,以及通过代码示例展示了各种流的应用,如节点流、处理流、缓存流、转换流、对象流和随机访问文件流。同时,还探讨了IDEA中设置项目编码格式的方法,以及如何处理序列化和反序列化问题。
204 1
java基础:IO流 理论与代码示例(详解、idea设置统一utf-8编码问题)
|
6月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
6月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
172 2
|
7月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
84 1