Kafka Java API示例

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:         1、Producer端 import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.

        1、Producer端

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer{
	
	private final Producer<String, String> producer;
    public final static String TOPIC = "testtopic";

    private KafkaProducer(){
    	
        Properties props = new Properties();
        
        // 此处配置的是kafka的broker地址:端口列表
        props.put("metadata.broker.list", "192.168.1.225:9092,192.168.1.226:9092");

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //request.required.acks
        //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
        props.put("request.required.acks","-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 101;

        int messageCount = 0;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "Hello kafka message :" + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
            messageCount++;
        }
        
        System.out.println("Producer端一共产生了" + messageCount + "条消息!");
    }

    public static void main( String[] args )
    {
        new KafkaProducer().produce();
    }
}
        运行结果:

Hello kafka message :1
Hello kafka message :2
Hello kafka message :3
Hello kafka message :4
Hello kafka message :5
Hello kafka message :6
Hello kafka message :7
Hello kafka message :8
Hello kafka message :9
Hello kafka message :10
Hello kafka message :11
Hello kafka message :12
Hello kafka message :13
Hello kafka message :14
Hello kafka message :15
Hello kafka message :16
Hello kafka message :17
Hello kafka message :18
Hello kafka message :19
Hello kafka message :20
Hello kafka message :21
Hello kafka message :22
Hello kafka message :23
Hello kafka message :24
Hello kafka message :25
Hello kafka message :26
Hello kafka message :27
Hello kafka message :28
Hello kafka message :29
Hello kafka message :30
Hello kafka message :31
Hello kafka message :32
Hello kafka message :33
Hello kafka message :34
Hello kafka message :35
Hello kafka message :36
Hello kafka message :37
Hello kafka message :38
Hello kafka message :39
Hello kafka message :40
Hello kafka message :41
Hello kafka message :42
Hello kafka message :43
Hello kafka message :44
Hello kafka message :45
Hello kafka message :46
Hello kafka message :47
Hello kafka message :48
Hello kafka message :49
Hello kafka message :50
Hello kafka message :51
Hello kafka message :52
Hello kafka message :53
Hello kafka message :54
Hello kafka message :55
Hello kafka message :56
Hello kafka message :57
Hello kafka message :58
Hello kafka message :59
Hello kafka message :60
Hello kafka message :61
Hello kafka message :62
Hello kafka message :63
Hello kafka message :64
Hello kafka message :65
Hello kafka message :66
Hello kafka message :67
Hello kafka message :68
Hello kafka message :69
Hello kafka message :70
Hello kafka message :71
Hello kafka message :72
Hello kafka message :73
Hello kafka message :74
Hello kafka message :75
Hello kafka message :76
Hello kafka message :77
Hello kafka message :78
Hello kafka message :79
Hello kafka message :80
Hello kafka message :81
Hello kafka message :82
Hello kafka message :83
Hello kafka message :84
Hello kafka message :85
Hello kafka message :86
Hello kafka message :87
Hello kafka message :88
Hello kafka message :89
Hello kafka message :90
Hello kafka message :91
Hello kafka message :92
Hello kafka message :93
Hello kafka message :94
Hello kafka message :95
Hello kafka message :96
Hello kafka message :97
Hello kafka message :98
Hello kafka message :99
Hello kafka message :100
Producer端一共产生了100条消息!
        2、Consumer端

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        
        // zookeeper 配置
        props.put("zookeeper.connect", "server3:2181");

        // 消费者所在组
        props.put("group.id", "testgroup");

        // zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        
        // 序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = 
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        
        int messageCount = 0;
        while (it.hasNext()){
        	System.out.println(it.next().message());
        	messageCount++;
        	if(messageCount == 100){
        		System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
        	}
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
        运行结果:

Hello kafka message :3
Hello kafka message :8
Hello kafka message :14
Hello kafka message :19
Hello kafka message :23
Hello kafka message :28
Hello kafka message :32
Hello kafka message :37
Hello kafka message :41
Hello kafka message :46
Hello kafka message :50
Hello kafka message :55
Hello kafka message :64
Hello kafka message :69
Hello kafka message :73
Hello kafka message :78
Hello kafka message :82
Hello kafka message :87
Hello kafka message :91
Hello kafka message :96
Hello kafka message :2
Hello kafka message :7
Hello kafka message :13
Hello kafka message :18
Hello kafka message :22
Hello kafka message :27
Hello kafka message :31
Hello kafka message :36
Hello kafka message :40
Hello kafka message :45
Hello kafka message :54
Hello kafka message :59
Hello kafka message :63
Hello kafka message :68
Hello kafka message :72
Hello kafka message :77
Hello kafka message :81
Hello kafka message :86
Hello kafka message :90
Hello kafka message :95
Hello kafka message :100
Hello kafka message :5
Hello kafka message :11
Hello kafka message :16
Hello kafka message :20
Hello kafka message :25
Hello kafka message :34
Hello kafka message :39
Hello kafka message :43
Hello kafka message :48
Hello kafka message :52
Hello kafka message :57
Hello kafka message :61
Hello kafka message :66
Hello kafka message :70
Hello kafka message :75
Hello kafka message :84
Hello kafka message :89
Hello kafka message :93
Hello kafka message :98
Hello kafka message :4
Hello kafka message :9
Hello kafka message :10
Hello kafka message :15
Hello kafka message :24
Hello kafka message :29
Hello kafka message :33
Hello kafka message :38
Hello kafka message :42
Hello kafka message :47
Hello kafka message :51
Hello kafka message :56
Hello kafka message :60
Hello kafka message :65
Hello kafka message :74
Hello kafka message :79
Hello kafka message :83
Hello kafka message :88
Hello kafka message :92
Hello kafka message :97
Hello kafka message :1
Hello kafka message :6
Hello kafka message :12
Hello kafka message :17
Hello kafka message :21
Hello kafka message :26
Hello kafka message :30
Hello kafka message :35
Hello kafka message :44
Hello kafka message :49
Hello kafka message :53
Hello kafka message :58
Hello kafka message :62
Hello kafka message :67
Hello kafka message :71
Hello kafka message :76
Hello kafka message :80
Hello kafka message :85
Hello kafka message :94
Hello kafka message :99
Consumer端一共消费了100条消息!




相关文章
|
1天前
|
Java API
Java一分钟之-Java日期与时间API:LocalDate, LocalDateTime
【5月更文挑战第13天】Java 8引入`java.time`包,改进日期时间API。`LocalDate`代表日期,`LocalDateTime`包含日期和时间。本文概述两者的基本用法、常见问题及解决策略。创建日期时间使用`of()`和`parse()`,操作日期时间有`plusDays()`、`minusMonths()`等。注意点包括:设置正确的`DateTimeFormatter`,考虑闰年影响,以及在需要时区信息时使用`ZonedDateTime`。正确使用这些类能提升代码质量。
10 3
|
1天前
|
Java API 数据处理
Java一分钟之-Stream API:数据处理新方式
【5月更文挑战第13天】Java 8的Stream API为集合操作提供了声明式编程,简化数据处理。本文介绍了Stream的基本概念、常见问题和易错点。问题包括并行流与顺序流的区别,状态改变操作的影响,以及忘记调用终止操作和误用`peek()`。理解并合理使用Stream API能提升代码效率和可维护性。实践中不断探索,将发掘更多Stream API的潜力。
11 3
|
1天前
|
安全 Java API
Java Stream API详解与使用
Java Stream API是Java 8引入的特性,提供函数式操作处理集合,支持链式操作和并行处理,提升代码可读性和性能。关键点包括:延迟执行的中间操作(如filter, map)和触发计算的终端操作(如collect, forEach)。示例展示了如何从Person列表过滤出年龄大于20的姓名并排序。使用Stream时注意避免中间操作的副作用,终端操作后Stream不能复用,以及并行操作的线程安全性。
|
3天前
|
Java 程序员 API
Java 8新特性之Lambda表达式与Stream API的深度解析
【5月更文挑战第12天】本文将深入探讨Java 8中的两个重要新特性:Lambda表达式和Stream API。我们将从基本概念入手,逐步深入到实际应用场景,帮助读者更好地理解和掌握这两个新特性,提高Java编程效率。
40 2
|
4天前
|
Java API
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
|
Java API
Java 8 Stream API详解
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/47038607 Java ...
982 0
|
3天前
|
安全 API 开发者
智能体-Agent能力升级!新增Assistant API & Tools API服务接口
ModelScope-Agent是一个交互式创作空间,它支持LLM(Language Model)的扩展能力,例如工具调用(function calling)和知识检索(knowledge retrieval)。它已经对相关接口进行了开源,以提供更原子化的应用LLM能力。用户可以通过Modelscope-Agent上的不同代理(agent),结合自定义的LLM配置和消息,调用这些能力。
|
8天前
|
JSON 搜索推荐 数据挖掘
电商数据分析的利器:电商关键词搜索API接口(标题丨图片丨价格丨链接)
淘宝关键词搜索接口为电商领域的数据分析提供了丰富的数据源。通过有效利用这一接口,企业和研究人员可以更深入地洞察市场动态,优化营销策略,并提升用户体验。随着电商平台技术的不断进步,未来的API将更加智能和个性化,为电商行业带来更多的可能性。
|
15天前
|
存储 缓存 运维
DataWorks操作报错合集之DataWorks根据api,调用查询文件列表接口报错如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
23 1
|
16天前
|
SQL 数据管理 API
数据管理DMS产品使用合集之阿里云DMS提供API接口来进行数据导出功能吗
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。