Kafka快速入门

简介: Kafka快速入门

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

Scala同Java一样都运行在Java虚拟机之上,经过编译之后都生成的是class字节码文件,所以scala同样具有跨平台的特性,可以做到一次编写,到处运行。所以可以说Kafka也可以说是跨平台

kafka是面相于数据流的生产转换存储消费整体的一个流处理平台,所以kafka不仅仅是消息队列。

kafka知识图

图片名称

小编,我,我。。。想下车。
别别别,这篇文章我们只讲快速入门,毕竟我们先学会用,再往下更深层去学。下面所讲的是只针对如何快速对接kafka消息队列。

即使是快速入门,我们也要了解ProducerConsumerConsumer GroupTopic,因为这些是我们必须要的。

概念介绍

Producer: 消息和数据的生产者,向kafka的一个topic发布消息的进程/代码/服务

Consumer: 消息和数据的消费者,订阅数据(Topic)并且处理其发布的消息的进程/代码

Consumer Group: 逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息

Topic: 逻辑概念,kafka消息的类别,对数据进行区分、隔离

Broker: 物理概念,kafka集群中的每个kafka节点

知道这些东西后我们就可以码代码了,额。。。等下,我们好像还没安装Kafka呢?这篇文章不讲安装Kafka,要了解如何安装Kafka请看第下面的文章。

现在流行Spring cloud,那我们就以Spring boot来进行讲解吧。

第一步首当其冲的是我们要引入spring-kafka jar包,我的项目用的是2.3.0的,如果想用最新的可以去Maven官网去查询。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

其次,我们要配置yml,为什么要用Spring boot,因为它非常好的帮我们整合了很多框架,我们只要随便在配置文件里配置就可以了。

spring:
  kafka:
    # 用于链接kakfa集群的节点,这个链接的是Broker
    bootstrap-servers: 192.168.199.198:9092
    # 消费着
    consumer:
      # 用于判别消费者,这个就是上面讲的Consumer Group
      group-id: test
      # key的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value的反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 生产者
    producer:
      # key的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value的序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

然后上代码。

生产者

我们在测试类里面发送数据

@SpringBootTest
class KafkaApplicationTests {

    private static Logger logger = LoggerFactory.getLogger(KafkaApplicationTests.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {

        HashMap<String,String> hashMap = new HashMap<>();
        hashMap.put("name","zd");
        hashMap.put("age","18");
        //这地方如果有topic:kafka_test会往这个topic添加消息,如果没有则创建这个kafka_test然后往里面添加消息
        ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));

        //可有可无看自己需求
        testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("发送失败");
                //失败do something
            }

            @Override
            public void onSuccess(SendResult<String, String> integerStringSendResult) {
                logger.info("发送成功");
                //成功do something
            }
        });
    }
}

运行后正如下图,得到发送成功

既然发送成功了,那我们写个消费者,来接受吧。

消费者

@Component
public class MyKafkaConsumer {

    public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

    @KafkaListener(topics = {"kafka_test"})
    public void preCommandTicket1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }
    }
}

因为发送者是往topics为kafka_test里发送的,那我们消费者接受的topics理应为kafka_test。
我们运行程序,来查看一下。


如图我们没有收到任何数据,为什么呢,我们明明发送了数据,怎么消费不了呢?

别别别,我们往下看。这个原因是因为auto.offset.reset,它有三个值分别是
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

当我们没有选的时候默认latest,所以它只消费新生产的。这时候我们再去调用生产者,去生产一个消息。

看到了吧,消费者收到了消息。

那kafka简单入门就算结束了。当然kafka有很多东西,正如上图所介绍的,这篇文章只是入门级的。后续会慢慢讲解图上的各个功能。喜欢的同学麻烦点个在看。代码已经同步到码云上了https://gitee.com/ZYSSS/kafka

相关文章
|
7月前
|
消息中间件 存储 安全
kafka快速入门1
kafka快速入门1
92 0
|
7月前
|
消息中间件 JSON Java
kafka快速入门2
kafka快速入门2
65 0
|
8月前
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
9月前
|
消息中间件 存储 传感器
macOS 系统 安装 Kafka 快速入门
macOS 系统 安装 Kafka 快速入门
150 0
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 Kafka
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
|
消息中间件 Kafka
Kafka快速入门(安装集群)
Kafka快速入门(安装集群)
|
消息中间件 缓存 Kafka
Kafka快速入门(介绍)
Kafka快速入门(介绍)
Kafka快速入门(介绍)

热门文章

最新文章