前面说了kafka的topic有分区的概念,每个分区又有leader 和 follower,kafka听过ack机制保证消息的可靠性。
1、下载安装zookeeper
下载地址:http://zookeeper.apache.org/releases.html#download
1、进入解压地址,进入D:\apache-zookeeper-3.8.0\conf
2、 将“zoo_sample.cfg”重命名为“zoo.cfg”
3、 打开“zoo.cfg”找到并编辑dataDir=D:\apache-zookeeper-3.8.0
4、添加系统变量:ZOOKEEPER_HOME=D:\Kafka\zookeeper-3.8.0
5、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
6、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
7、 打开新的cmd,输入“zkServer“,运行Zookeeper
当出现下图,代表启动成功:
2、下载安装kafka
下载地址:https://kafka.apache.org/downloads
1、解压进入:D:\kafka_2.13-3.1.0\config
2、找到server.properties并打开,log.dirs=D:\kafka_2.13-3.1.0\kafka-logs
3、并编辑zookeeper.connect=localhost:2181
4、Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
5、输入下面命令启动kafka,出现下图代表启动成功。
.\bin\windows\kafka-server-start.bat .\config\server.properties
3、集成springBoot
1、先pom文件加入kafka二方包
<!--引入kafak和spring整合的jar--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
#------------------------------------Kafka-------------------# # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=localhost:9092 ##########################producer about config############################## spring.kafka.producer.acks=1 spring.kafka.producer.batch-size=16384 spring.kafka.producer.retries=0 spring.kafka.producer.buffer-memory=33554432 #spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer.class #spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer.class ##########################consumer about config############################## spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.group-id=kafka_group_2 spring.kafka.consumer.auto-commit-interval=100 #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer.class #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer.class
/** * @author keying */ @RestController public class TestKafkaController { @Resource private KafkaTemplate<String, String> kafkaTemplate; /** * 同步发送 * * @return */ @RequestMapping("/sendKafka") public String syncSendMessage() { for (int i = 0; i < 10; i++) { try { kafkaTemplate.send("kafka-boot", "0", "kafkaMessage" + i).get(); }catch (Exception e){ e.printStackTrace(); } } return "success"; } } /** * @author keying */ @Component public class ConsumerKafka { @KafkaListener(id = "foo", topics = "kafka-boot") public void listen1(String foo) { System.out.println("消费" + foo); } }