Kafka Java API示例

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介:         1、Producer端 import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.

        1、Producer端

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer{
	
	private final Producer<String, String> producer;
    public final static String TOPIC = "testtopic";

    private KafkaProducer(){
    	
        Properties props = new Properties();
        
        // 此处配置的是kafka的broker地址:端口列表
        props.put("metadata.broker.list", "192.168.1.225:9092,192.168.1.226:9092");

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //request.required.acks
        //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
        props.put("request.required.acks","-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 101;

        int messageCount = 0;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "Hello kafka message :" + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
            messageCount++;
        }
        
        System.out.println("Producer端一共产生了" + messageCount + "条消息!");
    }

    public static void main( String[] args )
    {
        new KafkaProducer().produce();
    }
}
        运行结果:

Hello kafka message :1
Hello kafka message :2
Hello kafka message :3
Hello kafka message :4
Hello kafka message :5
Hello kafka message :6
Hello kafka message :7
Hello kafka message :8
Hello kafka message :9
Hello kafka message :10
Hello kafka message :11
Hello kafka message :12
Hello kafka message :13
Hello kafka message :14
Hello kafka message :15
Hello kafka message :16
Hello kafka message :17
Hello kafka message :18
Hello kafka message :19
Hello kafka message :20
Hello kafka message :21
Hello kafka message :22
Hello kafka message :23
Hello kafka message :24
Hello kafka message :25
Hello kafka message :26
Hello kafka message :27
Hello kafka message :28
Hello kafka message :29
Hello kafka message :30
Hello kafka message :31
Hello kafka message :32
Hello kafka message :33
Hello kafka message :34
Hello kafka message :35
Hello kafka message :36
Hello kafka message :37
Hello kafka message :38
Hello kafka message :39
Hello kafka message :40
Hello kafka message :41
Hello kafka message :42
Hello kafka message :43
Hello kafka message :44
Hello kafka message :45
Hello kafka message :46
Hello kafka message :47
Hello kafka message :48
Hello kafka message :49
Hello kafka message :50
Hello kafka message :51
Hello kafka message :52
Hello kafka message :53
Hello kafka message :54
Hello kafka message :55
Hello kafka message :56
Hello kafka message :57
Hello kafka message :58
Hello kafka message :59
Hello kafka message :60
Hello kafka message :61
Hello kafka message :62
Hello kafka message :63
Hello kafka message :64
Hello kafka message :65
Hello kafka message :66
Hello kafka message :67
Hello kafka message :68
Hello kafka message :69
Hello kafka message :70
Hello kafka message :71
Hello kafka message :72
Hello kafka message :73
Hello kafka message :74
Hello kafka message :75
Hello kafka message :76
Hello kafka message :77
Hello kafka message :78
Hello kafka message :79
Hello kafka message :80
Hello kafka message :81
Hello kafka message :82
Hello kafka message :83
Hello kafka message :84
Hello kafka message :85
Hello kafka message :86
Hello kafka message :87
Hello kafka message :88
Hello kafka message :89
Hello kafka message :90
Hello kafka message :91
Hello kafka message :92
Hello kafka message :93
Hello kafka message :94
Hello kafka message :95
Hello kafka message :96
Hello kafka message :97
Hello kafka message :98
Hello kafka message :99
Hello kafka message :100
Producer端一共产生了100条消息!
        2、Consumer端

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        
        // zookeeper 配置
        props.put("zookeeper.connect", "server3:2181");

        // 消费者所在组
        props.put("group.id", "testgroup");

        // zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        
        // 序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = 
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        
        int messageCount = 0;
        while (it.hasNext()){
        	System.out.println(it.next().message());
        	messageCount++;
        	if(messageCount == 100){
        		System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
        	}
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
        运行结果:

Hello kafka message :3
Hello kafka message :8
Hello kafka message :14
Hello kafka message :19
Hello kafka message :23
Hello kafka message :28
Hello kafka message :32
Hello kafka message :37
Hello kafka message :41
Hello kafka message :46
Hello kafka message :50
Hello kafka message :55
Hello kafka message :64
Hello kafka message :69
Hello kafka message :73
Hello kafka message :78
Hello kafka message :82
Hello kafka message :87
Hello kafka message :91
Hello kafka message :96
Hello kafka message :2
Hello kafka message :7
Hello kafka message :13
Hello kafka message :18
Hello kafka message :22
Hello kafka message :27
Hello kafka message :31
Hello kafka message :36
Hello kafka message :40
Hello kafka message :45
Hello kafka message :54
Hello kafka message :59
Hello kafka message :63
Hello kafka message :68
Hello kafka message :72
Hello kafka message :77
Hello kafka message :81
Hello kafka message :86
Hello kafka message :90
Hello kafka message :95
Hello kafka message :100
Hello kafka message :5
Hello kafka message :11
Hello kafka message :16
Hello kafka message :20
Hello kafka message :25
Hello kafka message :34
Hello kafka message :39
Hello kafka message :43
Hello kafka message :48
Hello kafka message :52
Hello kafka message :57
Hello kafka message :61
Hello kafka message :66
Hello kafka message :70
Hello kafka message :75
Hello kafka message :84
Hello kafka message :89
Hello kafka message :93
Hello kafka message :98
Hello kafka message :4
Hello kafka message :9
Hello kafka message :10
Hello kafka message :15
Hello kafka message :24
Hello kafka message :29
Hello kafka message :33
Hello kafka message :38
Hello kafka message :42
Hello kafka message :47
Hello kafka message :51
Hello kafka message :56
Hello kafka message :60
Hello kafka message :65
Hello kafka message :74
Hello kafka message :79
Hello kafka message :83
Hello kafka message :88
Hello kafka message :92
Hello kafka message :97
Hello kafka message :1
Hello kafka message :6
Hello kafka message :12
Hello kafka message :17
Hello kafka message :21
Hello kafka message :26
Hello kafka message :30
Hello kafka message :35
Hello kafka message :44
Hello kafka message :49
Hello kafka message :53
Hello kafka message :58
Hello kafka message :62
Hello kafka message :67
Hello kafka message :71
Hello kafka message :76
Hello kafka message :80
Hello kafka message :85
Hello kafka message :94
Hello kafka message :99
Consumer端一共消费了100条消息!




相关文章
|
8天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
28 2
|
13天前
|
JSON API 数据格式
Amazon商品详情API,json数据格式示例参考
亚马逊商品详情API接口返回的JSON数据格式通常包含丰富的商品信息,以下是一个简化的JSON数据格式示例参考
|
15天前
|
缓存 监控 Java
如何运用JAVA开发API接口?
本文详细介绍了如何使用Java开发API接口,涵盖创建、实现、测试和部署接口的关键步骤。同时,讨论了接口的安全性设计和设计原则,帮助开发者构建高效、安全、易于维护的API接口。
42 4
|
16天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
22天前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应
|
23天前
|
Java API 数据处理
探索Java中的Lambda表达式与Stream API
【10月更文挑战第22天】 在Java编程中,Lambda表达式和Stream API是两个强大的功能,它们极大地简化了代码的编写和提高了开发效率。本文将深入探讨这两个概念的基本用法、优势以及在实际项目中的应用案例,帮助读者更好地理解和运用这些现代Java特性。
|
29天前
|
Java 大数据 API
别死脑筋,赶紧学起来!Java之Steam() API 常用方法使用,让开发简单起来!
分享Java Stream API的常用方法,让开发更简单。涵盖filter、map、sorted等操作,提高代码效率与可读性。关注公众号,了解更多技术内容。
|
8天前
|
JSON API 数据格式
携程API接口系列,酒店景点详情请求示例参考
携程API接口系列涵盖了酒店预订、机票预订、旅游度假产品预订、景点门票预订等多个领域,其中酒店和景点详情请求是较为常用的功能。以下提供酒店和景点详情请求的示例参考
|
12天前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
下一篇
无影云桌面