Kafka单机版安装
下载
你可以在kafka官网http://kafka.apache.org/downloads
下载到最新的kafka安装包,选择下载二进制版本的tgz文件。
安装
正如上篇文章Kafka快速入门Kafka
是使用Scala和Java编写。
Scala同Java一样都运行在Java虚拟机之上,经过编译之后都生成的是class字节码文件,所以scala同样具有跨平台的特性,可以做到一次编写,到处运行。所以可以说Kafka也可以说是跨平台。所以说win
、linux
、Mac OS
都可以使用。这里我们就讲诉win
下的使用方式。
首先确保你的机器上安装了jdk
,kafka
需要java
运行环境,以前的kafka
还需要zookeeper
,新版的kafka
已经内置了一个zookeeper
环境,所以我们可以直接使用。
说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka
压缩包解压到D盘
目录。
这里我们只要主意两个目录就可以了一个这行文件包bin
,和配置文件config
。
首先我们先看config
我们只需要关注这个红框框的配置文件就可以了,如果你只当这个是Kafka服务,生产者和消费者用java来实现,那我们只需关注zookeeper.properties
zookeeper配置文件,和server.properties
kafka配置文件就好了。
zookeeper.properties
这地方,我们就填默认就可以了。
server.properties
这个配置文件里的东西挺多的,我们也是不需要做任何更改填默认就可以了。
启动程序
我们进入bin
目录下,我们可以看到,
它贴心的为我们准备了windows
的命令,和sh
命令两种形式。如果你的win
可以运行sh
命令,我们也可以用sh
命令启动。
现在我以两种形式来启动程序。
window中的bat
我们进入windows
会看到如下,我们只要关心红色框里的bat
就可以了。
启动zookeeper
我们cmd命令进入到kafka目录下然后输入下面命令会车。
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
这样就算启动成功了。
启动kafka
我们cmd命令进入到kafka目录下然后输入下面命令会车。
.\bin\windows\kafka-server-start.bat .\config\server.properties
这样就算启动成功了。
用sh启动
如果你的电脑装了Xshell
,或者git
就可以运行sh
,如果你用的是linux
或Mac OS
就可以直接运行sh
。这里我用的是Git Bash Here
。
启动zookeeper
我们用命令进入到kafka目录下然后输入下面命令会车。
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
这样就算启动成功了。
启动kafka
我们用命令进入到kafka目录下然后输入下面命令会车。
./bin/kafka-server-start.sh ./config/server.properties
这样就算启动成功了。
这地方单机的kafka单机版安装就算已经结束了。
测试链接
概念介绍
Producer: 消息和数据的生产者,向kafka的一个topic发布消息的进程/代码/服务
Consumer: 消息和数据的消费者,订阅数据(Topic)并且处理其发布的消息的进程/代码
Consumer Group: 逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息
Topic: 逻辑概念,kafka消息的类别,对数据进行区分、隔离
Broker: 物理概念,kafka集群中的每个kafka节点
我们就以Spring boot来进行讲解吧。
第一步首当其冲的是我们要引入spring-kafka jar包,我的项目用的是2.3.0的,如果想用最新的可以去Maven官网去查询。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
其次,我们要配置yml,为什么要用Spring boot,因为它非常好的帮我们整合了很多框架,我们只要随便在配置文件里配置就可以了。
spring:
kafka:
# 用于链接kakfa集群的节点,这个链接的是Broker
bootstrap-servers: 192.168.199.198:9092
# 消费着
consumer:
# 用于判别消费者,这个就是上面讲的Consumer Group
group-id: test
# key的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value的反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 生产者
producer:
# key的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value的序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
然后上代码。
生产者
我们在测试类里面发送数据
@SpringBootTest
class KafkaApplicationTests {
private static Logger logger = LoggerFactory.getLogger(KafkaApplicationTests.class);
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
void contextLoads() {
HashMap<String,String> hashMap = new HashMap<>();
hashMap.put("name","zd");
hashMap.put("age","18");
//这地方如果有topic:kafka_test会往这个topic添加消息,如果没有则创建这个kafka_test然后往里面添加消息
ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));
//可有可无看自己需求
testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.info("发送失败");
//失败do something
}
@Override
public void onSuccess(SendResult<String, String> integerStringSendResult) {
logger.info("发送成功");
//成功do something
}
});
}
}
运行后正如下图,得到发送成功
既然发送成功了,那我们写个消费者,来接受吧。
消费者
@Component
public class MyKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
@KafkaListener(topics = {"kafka_test"})
public void preCommandTicket1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("----------------- record =" + record);
logger.info("------------------ message =" + message);
}
}
}
因为发送者是往topics为kafka_test里发送的,那我们消费者接受的topics理应为kafka_test。
我们运行程序,来查看一下。
如图我们没有收到任何数据,为什么呢,我们明明发送了数据,怎么消费不了呢?
我们往下看。这个原因是因为auto.offset.reset
,它有三个值分别是
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
当我们没有选的时候默认latest,所以它只消费新生产的。这时候我们再去调用生产者,去生产一个消息。
看到了吧,消费者收到了消息。
那kafka简单入门就算结束了。当然kafka有很多东西,正如上图所介绍的,这篇文章只是入门级的。后续会慢慢讲解图上的各个功能。喜欢的同学麻烦点个在看。代码已经同步到码云上了https://gitee.com/ZYSSS/kafka
。