Kafka——使用java api进行pub & sub

简介:        之前用过老的api,但是最近在写消费的时候,发现之前老的api很多方法都out了,又去官网看了下最新的0.10.x的api.1,producer org.apache.


       之前用过老的api,但是最近在写消费的时候,发现之前老的api很多方法都out了,又去官网看了下最新的0.10.x的api.

1,producer


<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>


  public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("defaultTopic", Integer.toString(i), String.valueOf(i)));

        producer.close();
    }


2,Comsumer


<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>
 
  
 
  
 public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("defaultTopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

    }



写完java api,可以try一下spring-intergeted,方便,一些池操作可以屏蔽掉,专注业务。






目录
相关文章
|
8天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
28 2
|
16天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
23天前
|
Java API 数据处理
探索Java中的Lambda表达式与Stream API
【10月更文挑战第22天】 在Java编程中,Lambda表达式和Stream API是两个强大的功能,它们极大地简化了代码的编写和提高了开发效率。本文将深入探讨这两个概念的基本用法、优势以及在实际项目中的应用案例,帮助读者更好地理解和运用这些现代Java特性。
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
62 5
|
1月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
31 2
|
1月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
44 3
|
1月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
42 4
|
1月前
|
分布式计算 Java 大数据
大数据-147 Apache Kudu 常用 Java API 增删改查
大数据-147 Apache Kudu 常用 Java API 增删改查
32 1
|
1月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
66 2
|
Java API
Java如何制作帮助文档(API)
Java如何制作帮助文档(API) 步骤如下:   (1)写一个工具类   (2)对这个类加入文档注释   (3)用工具解析文档注释     javadoc工具   (4)格式     javadoc -d 目录 -author -version ArrayTool.java   制作帮助文档(API)出错问题解决:     找不到可以文档化的公共或受保护的类       这句话告诉我们对想要操作的类的权限不够。
1117 0

热门文章

最新文章

下一篇
无影云桌面