使用kafka-clients操作数据(java)

简介: 使用kafka-clients操作数据(java)

一、添加依赖

     <!--    kafka-clients-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
 
import java.util.Map;
 
public class MyPatitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 
        String msgStr = value.toString();
        if(msgStr.contains("a")){
            return 1;
        }
        return 0;
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //异步发送数据
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
            //回调异步发送
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
            kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");
                    }
                }
            });
            Thread.sleep(500);
        }
 
        //同步
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
        }
 
        //关闭资源
        kafkaProducer.close();
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
        //指定事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
        properties.put("enable.idempotence", "true");
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //事务消息 初始化
        kafkaProducer.initTransactions();
        //开始事务
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
            //提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //终止事务
            kafkaProducer.abortTransaction();
        } finally {
            //关闭资源
            kafkaProducer.close();
        }
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions
相关文章
|
2月前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
185 3
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
226 7
|
3月前
|
数据采集 JSON Java
Java爬虫获取1688店铺所有商品接口数据实战指南
本文介绍如何使用Java爬虫技术高效获取1688店铺商品信息,涵盖环境搭建、API调用、签名生成及数据抓取全流程,并附完整代码示例,助力市场分析与选品决策。
|
3月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
2月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
203 16
|
7月前
|
自然语言处理 Java 关系型数据库
Java|小数据量场景的模糊搜索体验优化
在小数据量场景下,如何优化模糊搜索体验?本文分享一个简单实用的方案,虽然有点“土”,但效果还不错。
156 0
|
传感器 分布式计算 安全
Java 大视界 -- Java 大数据在智能安防入侵检测系统中的多源数据融合与分析技术(171)
本文围绕 Java 大数据在智能安防入侵检测系统中的应用展开,剖析系统现状与挑战,阐释多源数据融合及分析技术,结合案例与代码给出实操方案,提升入侵检测效能。
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
453 1