使用kafka-clients操作数据(java)

简介: 使用kafka-clients操作数据(java)

一、添加依赖

     <!--    kafka-clients-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
 
import java.util.Map;
 
public class MyPatitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 
        String msgStr = value.toString();
        if(msgStr.contains("a")){
            return 1;
        }
        return 0;
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //异步发送数据
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
            //回调异步发送
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
            kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");
                    }
                }
            });
            Thread.sleep(500);
        }
 
        //同步
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
        }
 
        //关闭资源
        kafkaProducer.close();
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
        //指定事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
        properties.put("enable.idempotence", "true");
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //事务消息 初始化
        kafkaProducer.initTransactions();
        //开始事务
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
            //提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //终止事务
            kafkaProducer.abortTransaction();
        } finally {
            //关闭资源
            kafkaProducer.close();
        }
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions
相关文章
|
2月前
|
前端开发 JavaScript Java
java常用数据判空、比较和类型转换
本文介绍了Java开发中常见的数据处理技巧,包括数据判空、数据比较和类型转换。详细讲解了字符串、Integer、对象、List、Map、Set及数组的判空方法,推荐使用工具类如StringUtils、Objects等。同时,讨论了基本数据类型与引用数据类型的比较方法,以及自动类型转换和强制类型转换的规则。最后,提供了数值类型与字符串互相转换的具体示例。
124 3
|
16天前
|
存储 NoSQL Java
使用Java和Spring Data构建数据访问层
本文介绍了如何使用 Java 和 Spring Data 构建数据访问层的完整过程。通过创建实体类、存储库接口、服务类和控制器类,实现了对数据库的基本操作。这种方法不仅简化了数据访问层的开发,还提高了代码的可维护性和可读性。通过合理使用 Spring Data 提供的功能,可以大幅提升开发效率。
60 21
|
1天前
|
数据采集 JSON Java
Java爬虫获取微店快递费用item_fee API接口数据实现
本文介绍如何使用Java开发爬虫程序,通过微店API接口获取商品快递费用(item_fee)数据。主要内容包括:微店API接口的使用方法、Java爬虫技术背景、需求分析和技术选型。具体实现步骤为:发送HTTP请求获取数据、解析JSON格式的响应并提取快递费用信息,最后将结果存储到本地文件中。文中还提供了完整的代码示例,并提醒开发者注意授权令牌、接口频率限制及数据合法性等问题。
|
20天前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
1月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
50 7
|
28天前
|
SQL Java 数据库连接
【潜意识Java】深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
60 1
|
1月前
|
存储 Java BI
java怎么统计每个项目下的每个类别的数据
通过本文,我们详细介绍了如何在Java中统计每个项目下的每个类别的数据,包括数据模型设计、数据存储和统计方法。通过定义 `Category`和 `Project`类,并使用 `ProjectManager`类进行管理,可以轻松实现项目和类别的数据统计。希望本文能够帮助您理解和实现类似的统计需求。
101 17
|
3月前
|
JSON Java 程序员
Java|如何用一个统一结构接收成员名称不固定的数据
本文介绍了一种 Java 中如何用一个统一结构接收成员名称不固定的数据的方法。
50 3
|
4月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
183 1
|
4月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
83 1

热门文章

最新文章