❤ 作者主页: 欢迎来到我的技术博客😎
❀ 个人介绍:大家好,本人热衷于 Java后端开发,欢迎来交流学习哦!( ̄▽ ̄)~*
🍊 如果文章对您有帮助,记得 关注、 点赞、 收藏、 评论⭐️⭐️⭐️
📣 您的支持将是我创作的动力,让我们一起加油进步吧!!!🎉🎉
第三章:Kafka,构建TB级异步消息系统
一、阻塞队列
BlockingQueue
- 解决线程通信的问题。
- 阻塞方法:
put
、take
。
生产者消费者模式
- 生产者:产生数据的线程。
- 消费者:使用数据的线程。
实现类
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue、SynchronousQueue、DelayQueue
等。
1. 阻塞队列测试方法
在 test
中添加 BlockingQueueTests
类,来表示阻塞队列的测试方法,代码如下:
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run(){
try{
for(int i = 0; i < 20; i ++ ) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产:"+ queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true){
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName()+"消费:"+ queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. 测试结果
Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-1消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-1消费:4
Thread-3消费:3
Thread-3消费:2
Thread-2消费:1
Thread-3消费:0
二、Kafka入门
Kafka简介
- Kafka是一个分布式的流媒体平台。
- 应用:消息系统、日志收集、用户行为追踪、流式处理。
Kafka特点
- 高吞吐量、消息持久化、高可靠性、高扩展性。
Kafka术语
Broker
、Zookeeper
Topic
、Partition
、Offset
Leader Replica
、Follower Replica
1. Kafka下载
Kafka官网: https://kafka.apache.org/
2. Kafka安装与配置
下载Kafka的安装包后进行解压,就相当于安装成功了。
需要进行以下配置:修改 config包下的 zookeeper.properties:
修改 config包下的 server.properties:
3. Kafka的启动
首先在命令行中启动 Zookeeper
:
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动成功后不关闭此窗口,重新打开一个新的命令窗口,用于启动 kafka
:
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\kafka-server-start.bat config\server.properties
注意: 当遇到“‘wmic’不是内部或外部命令,也不是可运行程序”。
在C盘下找到wbem文件夹,且里面包含WMIC.exe,将其添加到系统变量path中去。
比如我的路径是:C:\Windows\System32\wbem
,在系统变量path中新建该路径。就可以正常启动Kafka了。
4. Kafka使用
- 创建主题
cd到 …\kafka_2.13-2.8.0\bin\windows
这里,然后输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建服务器端口号为9092(Kafka
默认端口号)的topic
,指生产者发布消息存储的位置在该服务器上localhost:9092
。--replication-factor
1 指1个副本。--partitions
1 指1个分区。--topic test
指该主题名为 test
。
- 以生产者身份发送消息
输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test
生产者身份打开服务器列表中为localhost:9092
的服务器上的test
主题。--broker-list
指服务器列表。
并且输入要发送的消息:
- 以消费者身份读取消息
新打开一个命令行窗口,且cd到…\kafka_2.13-2.8.0\bin\windows
,并输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
可以看到生产者发送的消息。并且这个消息队列中可以实时传送消息。
比如在生产者的命令行中继续输入信息,很快在消费者这边也能得到消息。
三、Spring整合Kafka
引入依赖
- `spring-kafka`
配置Kafka
- 配置
server
、consumer
- 配置
访问Kafka
- 生产者:
kafkaTemplate.send(topic, data);
- 消费者:
@KafkaListener(topics = {"test"}
public void handleMessage(ConsumerRecord record) {}
1. 引入依赖
在 pom.xml
添加相关的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
2. 配置Kafka
#KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
3. 测试
在 test
包下添加 KafkaTests
类,代码如下:
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
4. 测试结果
创作不易,如果有帮助到你,请给题解==点个赞和收藏==,让更多的人看到!!!
==关注博主==不迷路,内容持续更新中。