通俗易懂 !Kafka 开发快速入门看这篇就够了3

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 通俗易懂 !Kafka 开发快速入门看这篇就够了3

五、Kafka 安装


5.1 安装环境与前提条件


安装环境:Linux


前提条件:


Linux系统下安装好jdk 1.8以上版本,正确配置环境变量

Linux系统下安装好scala 2.11版本


安装ZooKeeper(注:kafka自带一个Zookeeper服务,如果不单独安装,也可以使用自带的ZK)


5.2 安装步骤


Apache基金会开源的这些软件基本上安装都比较方便,只需要下载、解压、配置环境变量三步即可完成,kafka也一样,官网选择对应版本下载后直接解压到一个安装目录下就可以使用了,如果为了方便可以在~/.bashrc里配置一下环境变量,这样使用的时候就不需要每次都切换到安装目录了。


具体可参考:Kafka 集群安装与环境测试


5.3 测试


接下来可以通过简单的console窗口来测试kafka是否安装正确。


(1)首先启动ZooKeeper服务


如果启动自己安装的ZooKeeper,使用命令zkServer.sh start即可。


如果使用kafka自带的ZK服务,启动命令如下(启动之后shell不会返回,后续其他命令需要另开一个Terminal):

$ cd /opt/tools/kafka #进入安装目录
$ bin/zookeeper-server-start.sh config/zookeeper.properties


(2)第二步启动kafka服务


启动Kafka服务的命令如下所示:

$ cd /opt/tools/kafka #进入安装目录
$ bin/kafka-server-start.sh config/server.properties


(3)第三步创建一个topic,假设为“test”


创建topic的命令如下所示,其参数也都比较好理解,依次指定了依赖的ZooKeeper,副本数量,分区数量,topic的名字:

$ cd /opt/tools/kafka #进入安装目录
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1


创建完成后,可以通过如下所示的命令查看topic列表:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181


(4)开启Producer和Consumer服务


kafka提供了生产者和消费者对应的console窗口程序,可以先通过这两个console程序来进行验证。


首先启动Producer:

$ cd /opt/tools/kafka #进入安装目录
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


然后启动Consumer:

$ cd /opt/tools/kafka #进入安装目录
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning


在打开生产者服务的终端输入一些数据,回车后,在打开消费者服务的终端能看到生产者终端输入的数据,即说明kafka安装成功。


六、Apache Kafka 简单示例


6.1 创建消息队列



kafka-topics.sh --create --zookeeper 192.168.56.137:2181 --topic test --replication-factor 1 --partitions 1


6.2 pom.xml


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>


6.3 生产者


package com.njbdqn.services;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * @author:Tokgo J
 * @date:2020/9/11
 * @aim:生产者:往test消息队列写入消息
 */
public class MyProducer {
    public static void main(String[] args) {
        // 定义配置信息
        Properties prop = new Properties();
        // kafka地址,多个地址用逗号分割  "192.168.23.76:9092,192.168.23.77:9092"
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.137:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        KafkaProducer<String,String> prod = new KafkaProducer<String, String>(prop);
        // 发送消息
        try {
            for(int i=0;i<10;i++) {
                // 生产者记录消息
                ProducerRecord<String, String> pr = new ProducerRecord<String, String>("test", "hello world"+i);
                prod.send(pr);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            prod.close();
        }
    }
}


注意:


1.kafka如果是集群,多个地址用逗号分割(,) ;


2.Properties的put方法,第一个参数可以是字符串,

如:p.put("bootstrap.servers","192.168.23.76:9092") ;


3.kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功;


4.p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 设置序列化类,可以写类的全路径。


6.4 消费者


package com.njbdqn.services;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
 * @author:Tokgo J
 * @date:2020/9/11
 * @aim:消费者:读取kafka数据
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.137:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put("session.timeout.ms", "30000");
        //消费者是否自动提交偏移量,默认是true 避免出现重复数据 设为false
        prop.put("enable.auto.commit", "false");
        prop.put("auto.commit.interval.ms", "1000");
        //auto.offset.reset 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的处理
        //earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
        //latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
        prop.put("auto.offset.reset", "earliest");
        // 设置组名
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        KafkaConsumer<String, String> con = new KafkaConsumer<String, String>(prop);
        con.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = con.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> rec : records) {
                System.out.println(String.format("offset:%d,key:%s,value:%s", rec.offset(), rec.key(), rec.value()));
            }
        }
    }
}


注意:


1.订阅消息可以订阅多个主题;


2.ConsumerConfig.GROUP_ID_CONFIG 表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询;


3.主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区;


4.注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。


七、参考


朱小厮:《深入理解Kafka:核心设计与实践原理》


宇宙湾:《Apache Kafka 分布式消息队列框架》

目录
相关文章
|
6月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
76 0
|
6月前
|
消息中间件 存储 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day06】——Kafka4
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day06】——Kafka4
65 0
|
6月前
|
消息中间件 存储 数据采集
大数据开发岗大厂面试30天冲刺 - 日积月累,每日五题【Day03】——Kafka1
大数据开发岗大厂面试30天冲刺 - 日积月累,每日五题【Day03】——Kafka1
59 0
|
消息中间件 存储 安全
kafka快速入门1
kafka快速入门1
122 0
|
消息中间件 NoSQL 关系型数据库
6年高级开发就因这道题少了5K,Kafka如何避免消息重复消费?
一个6年工作经验的小伙伴,被问到这样一个问题,说Kafka是如何避免消息重复消费的?面试完之后,这位小伙伴来找到我,希望我能给一个思路。今天,我给大家分享一下我的思路。
161 1
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
220 0
|
5月前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
4月前
|
消息中间件 存储 Java
快速入门 Kafka 和 Java 搭配使用
快速入门 Kafka 和 Java 搭配使用
185 0
|
5月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
41 0
|
6月前
|
消息中间件 安全 大数据
大数据开发岗常见面试复习30天冲刺 - 日积月累,每日五题【Day05】——Kafka3
大数据开发岗常见面试复习30天冲刺 - 日积月累,每日五题【Day05】——Kafka3
46 0
下一篇
无影云桌面