Kafka单机版安装

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Kafka单机版安装

Kafka单机版安装

下载

你可以在kafka官网http://kafka.apache.org/downloads下载到最新的kafka安装包,选择下载二进制版本的tgz文件。

安装

正如上篇文章Kafka快速入门Kafka是使用Scala和Java编写。

Scala同Java一样都运行在Java虚拟机之上,经过编译之后都生成的是class字节码文件,所以scala同样具有跨平台的特性,可以做到一次编写,到处运行。所以可以说Kafka也可以说是跨平台。所以说winlinuxMac OS都可以使用。这里我们就讲诉win下的使用方式。

首先确保你的机器上安装了jdkkafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用。

说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到D盘目录。

图片

这里我们只要主意两个目录就可以了一个这行文件包bin,和配置文件config
首先我们先看config

图片

我们只需要关注这个红框框的配置文件就可以了,如果你只当这个是Kafka服务,生产者和消费者用java来实现,那我们只需关注zookeeper.propertieszookeeper配置文件,和server.propertieskafka配置文件就好了。

zookeeper.properties

图片

这地方,我们就填默认就可以了。

server.properties

这个配置文件里的东西挺多的,我们也是不需要做任何更改填默认就可以了。

图片

启动程序

我们进入bin目录下,我们可以看到,

图片
它贴心的为我们准备了windows的命令,和sh命令两种形式。如果你的win可以运行sh命令,我们也可以用sh命令启动。

现在我以两种形式来启动程序。

window中的bat

我们进入windows会看到如下,我们只要关心红色框里的bat就可以了。
图片

启动zookeeper

我们cmd命令进入到kafka目录下然后输入下面命令会车。

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

图片

这样就算启动成功了。

启动kafka

我们cmd命令进入到kafka目录下然后输入下面命令会车。

.\bin\windows\kafka-server-start.bat .\config\server.properties

图片

这样就算启动成功了。

用sh启动

如果你的电脑装了Xshell,或者git就可以运行sh,如果你用的是linuxMac OS就可以直接运行sh。这里我用的是Git Bash Here

启动zookeeper

我们用命令进入到kafka目录下然后输入下面命令会车。

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

图片

这样就算启动成功了。

启动kafka

我们用命令进入到kafka目录下然后输入下面命令会车。

./bin/kafka-server-start.sh ./config/server.properties

图片

这样就算启动成功了。

这地方单机的kafka单机版安装就算已经结束了。

测试链接

概念介绍

Producer: 消息和数据的生产者,向kafka的一个topic发布消息的进程/代码/服务

Consumer: 消息和数据的消费者,订阅数据(Topic)并且处理其发布的消息的进程/代码

Consumer Group: 逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息

Topic: 逻辑概念,kafka消息的类别,对数据进行区分、隔离

Broker: 物理概念,kafka集群中的每个kafka节点

我们就以Spring boot来进行讲解吧。

第一步首当其冲的是我们要引入spring-kafka jar包,我的项目用的是2.3.0的,如果想用最新的可以去Maven官网去查询。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.0.RELEASE</version>
</dependency>

其次,我们要配置yml,为什么要用Spring boot,因为它非常好的帮我们整合了很多框架,我们只要随便在配置文件里配置就可以了。

spring:
  kafka:
    # 用于链接kakfa集群的节点,这个链接的是Broker
    bootstrap-servers: 192.168.199.198:9092
    # 消费着
    consumer:
      # 用于判别消费者,这个就是上面讲的Consumer Group
      group-id: test
      # key的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value的反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 生产者
    producer:
      # key的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value的序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

然后上代码。

生产者

我们在测试类里面发送数据

@SpringBootTest
class KafkaApplicationTests {

    private static Logger logger = LoggerFactory.getLogger(KafkaApplicationTests.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {

        HashMap<String,String> hashMap = new HashMap<>();
        hashMap.put("name","zd");
        hashMap.put("age","18");
        //这地方如果有topic:kafka_test会往这个topic添加消息,如果没有则创建这个kafka_test然后往里面添加消息
        ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));

        //可有可无看自己需求
        testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("发送失败");
                //失败do something
            }

            @Override
            public void onSuccess(SendResult<String, String> integerStringSendResult) {
                logger.info("发送成功");
                //成功do something
            }
        });
    }
}

运行后正如下图,得到发送成功
图片
既然发送成功了,那我们写个消费者,来接受吧。

消费者

@Component
public class MyKafkaConsumer {

    public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

    @KafkaListener(topics = {"kafka_test"})
    public void preCommandTicket1(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }
    }
}

因为发送者是往topics为kafka_test里发送的,那我们消费者接受的topics理应为kafka_test。
我们运行程序,来查看一下。

图片
如图我们没有收到任何数据,为什么呢,我们明明发送了数据,怎么消费不了呢?

我们往下看。这个原因是因为auto.offset.reset,它有三个值分别是
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

当我们没有选的时候默认latest,所以它只消费新生产的。这时候我们再去调用生产者,去生产一个消息。

图片

看到了吧,消费者收到了消息。

那kafka简单入门就算结束了。当然kafka有很多东西,正如上图所介绍的,这篇文章只是入门级的。后续会慢慢讲解图上的各个功能。喜欢的同学麻烦点个在看。代码已经同步到码云上了https://gitee.com/ZYSSS/kafka

相关文章
|
6月前
|
消息中间件 监控 数据可视化
Linux安装Kafka图形化界面
Linux安装Kafka图形化界面
270 4
|
4月前
|
消息中间件 Ubuntu Java
在Ubuntu 18.04上安装Apache Kafka的方法
在Ubuntu 18.04上安装Apache Kafka的方法
210 0
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
3月前
|
消息中间件 Java Linux
linux 之centos7安装kafka;;;;;待补充,未完成
linux 之centos7安装kafka;;;;;待补充,未完成
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
115 4
|
4月前
|
消息中间件 存储 Ubuntu
在Ubuntu 14.04上安装Apache Kafka的方法
在Ubuntu 14.04上安装Apache Kafka的方法
26 0
|
4月前
|
消息中间件 Java Kafka
Docker 安装 kafka
Docker 安装 kafka
100 0
|
4月前
|
消息中间件 Kafka Apache
部署安装kafka集群
部署安装kafka集群
|
6月前
|
消息中间件 存储 Ubuntu
Linux安装kafka3.5.1
Linux安装kafka3.5.1
144 2
|
6月前
|
消息中间件 存储 Kafka
Kafka 2.13-3.7.0 在 Windows 上的安装与配置指南
Kafka 2.13-3.7.0 在 Windows 上的安装与配置指南
451 0

相关实验场景

更多
下一篇
DataWorks