使用kafka-clients操作数据(java)

简介: 使用kafka-clients操作数据(java)

一、添加依赖

     <!--    kafka-clients-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
 
import java.util.Map;
 
public class MyPatitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 
        String msgStr = value.toString();
        if(msgStr.contains("a")){
            return 1;
        }
        return 0;
    }
 
    @Override
    public void close() {
 
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //异步发送数据
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
            //回调异步发送
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
            kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");
                    }
                }
            });
            Thread.sleep(500);
        }
 
        //同步
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
        }
 
        //关闭资源
        kafkaProducer.close();
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
 
        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
 
        //指定事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
        properties.put("enable.idempotence", "true");
 
        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 
        //事务消息 初始化
        kafkaProducer.initTransactions();
        //开始事务
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
            //提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //终止事务
            kafkaProducer.abortTransaction();
        } finally {
            //关闭资源
            kafkaProducer.close();
        }
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions
目录
相关文章
|
3天前
|
算法 搜索推荐 Java
Java插入排序:优雅整理数据的艺术
Java插入排序:优雅整理数据的艺术
|
2天前
|
数据采集 监控 前端开发
JAVA公立医院绩效考核管理系统源码-对接HIS数据
在医院的工作和管理上,院领导需要对院内工作人员的工作情况进行了解、评价和监控。 下面将对医院绩效管理系统的HIS数据流程加以阐述。
10 1
JAVA公立医院绩效考核管理系统源码-对接HIS数据
|
1天前
|
存储 缓存 安全
Java List操作详解及常用方法
Java List操作详解及常用方法
|
2天前
|
存储 Java
java用modbus4j的RTU去操作那些寄存器(线圈,保持,输入,离散输入寄存器)
java用modbus4j的RTU去操作那些寄存器(线圈,保持,输入,离散输入寄存器)
9 0
|
3天前
|
XML Java 定位技术
详尽分享经纬坐标(BLH)数据创建.kml文件小工具设计Java版
详尽分享经纬坐标(BLH)数据创建.kml文件小工具设计Java版
|
4天前
|
Java 机器人 程序员
Java中的文件I/O操作:流、读写和NIO详解
Java中的文件I/O操作:流、读写和NIO详解
|
4天前
|
JSON JavaScript Java
如何在Java中处理JSON数据?
如何在Java中处理JSON数据?
|
14天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
629 0