仿牛客社区项目(第五章)(上)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: 仿牛客社区项目(第五章)(上)
❤ 作者主页: 欢迎来到我的技术博客😎
❀ 个人介绍:大家好,本人热衷于 Java后端开发,欢迎来交流学习哦!( ̄▽ ̄)~*
🍊 如果文章对您有帮助,记得 关注点赞收藏评论⭐️⭐️⭐️
📣 您的支持将是我创作的动力,让我们一起加油进步吧!!!🎉🎉

第三章:Kafka,构建TB级异步消息系统

一、阻塞队列

  • BlockingQueue

    • 解决线程通信的问题。
    • 阻塞方法:puttake
  • 生产者消费者模式

    • 生产者:产生数据的线程。
    • 消费者:使用数据的线程。
  • 实现类

    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

1. 阻塞队列测试方法

test 中添加 BlockingQueueTests类,来表示阻塞队列的测试方法,代码如下:

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run(){
        try{
            for(int i = 0; i < 20; i ++ ) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+"生产:"+ queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true){
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName()+"消费:"+ queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. 测试结果

Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-1消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-1消费:4
Thread-3消费:3
Thread-3消费:2
Thread-2消费:1
Thread-3消费:0

二、Kafka入门

  • Kafka简介

    • Kafka是一个分布式的流媒体平台。
    • 应用:消息系统、日志收集、用户行为追踪、流式处理。
  • Kafka特点

    • 高吞吐量、消息持久化、高可靠性、高扩展性。
  • Kafka术语

    • BrokerZookeeper
    • TopicPartitionOffset
    • Leader ReplicaFollower Replica

1. Kafka下载

Kafka官网: https://kafka.apache.org/
在这里插入图片描述

2. Kafka安装与配置

下载Kafka的安装包后进行解压,就相当于安装成功了。
需要进行以下配置:
修改 config包下的 zookeeper.properties:
在这里插入图片描述
修改 config包下的 server.properties:
在这里插入图片描述
 

3. Kafka的启动

首先在命令行中启动 Zookeeper:

C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

启动成功后不关闭此窗口,重新打开一个新的命令窗口,用于启动 kafka

C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\kafka-server-start.bat config\server.properties

注意: 当遇到“‘wmic’不是内部或外部命令,也不是可运行程序”。
在C盘下找到wbem文件夹,且里面包含WMIC.exe,将其添加到系统变量path中去。
比如我的路径是:C:\Windows\System32\wbem,在系统变量path中新建该路径。就可以正常启动Kafka了。

4. Kafka使用

  • 创建主题

cd到 …\kafka_2.13-2.8.0\bin\windows这里,然后输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建服务器端口号为9092(Kafka默认端口号)的topic,指生产者发布消息存储的位置在该服务器上localhost:9092--replication-factor 1 指1个副本。--partitions 1 指1个分区。--topic test 指该主题名为 test
在这里插入图片描述

  • 以生产者身份发送消息

输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test
生产者身份打开服务器列表中为localhost:9092的服务器上的test主题。--broker-list 指服务器列表。
并且输入要发送的消息:
在这里插入图片描述

  • 以消费者身份读取消息

新打开一个命令行窗口,且cd到…\kafka_2.13-2.8.0\bin\windows,并输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
可以看到生产者发送的消息。并且这个消息队列中可以实时传送消息。
比如在生产者的命令行中继续输入信息,很快在消费者这边也能得到消息。
在这里插入图片描述
 

三、Spring整合Kafka

  • 引入依赖

    - `spring-kafka`
    
    • 配置Kafka

      • 配置 serverconsumer
    • 访问Kafka

      • 生产者:

      kafkaTemplate.send(topic, data);

      • 消费者:

      @KafkaListener(topics = {"test"}
      public void handleMessage(ConsumerRecord record) {}

1. 引入依赖

pom.xml 添加相关的依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>    

2. 配置Kafka

#KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

3. 测试

test 包下添加 KafkaTests 类,代码如下:

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

 

4. 测试结果

在这里插入图片描述
 
 
创作不易,如果有帮助到你,请给题解==点个赞和收藏==,让更多的人看到!!!
==关注博主==不迷路,内容持续更新中。

目录
相关文章
|
监控 安全 测试技术
从开发到测试再到发布,全方位解析项目上线的完美路程!
从开发到测试再到发布,全方位解析项目上线的完美路程!
505 0
|
存储 前端开发 安全
webhook是什么 与API的区别在哪里
webhooks是一个api概念,是微服务api的使用范式之一,也被成为反向api,即:前端不主动发送请求,完全由后端推送。 举个常用例子,比如你的好友发了一条朋友圈,后端将这条消息推送给所有其他好友的客户端,就是 Webhooks 的典型场景。
webhook是什么 与API的区别在哪里
|
11月前
「Mac畅玩鸿蒙与硬件41」UI互动应用篇18 - 多滑块联动控制器
本篇将带你实现一个多滑块联动的控制器应用。用户可以通过拖动多个滑块,动态控制不同参数(如红绿蓝三色值),并实时显示最终结果。我们将以动态颜色调节为例,展示如何结合状态管理和交互逻辑,打造一个高级的滑块控制器应用。
342 78
「Mac畅玩鸿蒙与硬件41」UI互动应用篇18 - 多滑块联动控制器
|
12月前
|
数据采集 测试技术 Python
自动化淘宝秒杀:使用Selenium WebDriver的实战指南
本文详细介绍了如何利用Selenium WebDriver自动化淘宝秒杀操作,包括环境配置、代码实现及注意事项,旨在帮助读者提升秒杀成功率,同时提醒合理使用以遵守平台规则。
549 8
|
10月前
|
前端开发 Java 程序员
菜鸟之路day02-04拼图小游戏开发一一JAVA基础综合项目
本项目基于黑马程序员教程,涵盖面向对象进阶、继承、多态等知识,历时约24小时完成。项目去除了登录和注册模块,专注于单机游戏体验。使用Git进行版本管理,代码托管于Gitee。项目包含窗体搭建、事件监听、图片加载与打乱、交互逻辑实现、菜单功能及美化界面等内容。通过此项目,巩固了Java基础并提升了实际开发能力。 仓库地址:[https://gitee.com/zhang-tenglan/puzzlegame.git](https://gitee.com/zhang-tenglan/puzzlegame.git)
246 6
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
796 1
|
监控 JavaScript 前端开发
前端的混合之路Meteor篇(六):发布订阅示例代码及如何将Meteor的响应数据映射到vue3的reactive系统
本文介绍了 Meteor 3.0 中的发布-订阅模型,详细讲解了如何在服务器端通过 `Meteor.publish` 发布数据,包括简单发布和自定义发布。客户端则通过 `Meteor.subscribe` 订阅数据,并使用 MiniMongo 实现实时数据同步。此外,还展示了如何在 Vue 3 中将 MiniMongo 的 `cursor` 转化为响应式数组,实现数据的自动更新。
196 2
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决 Session 共享问题。其主要特性包括:提供 API 和实现来管理用户会话,以中立方式替换应用程序容器中的 HttpSession,简化集群会话支持,并在单个浏览器实例中管理多个用户会话。此外,Spring Session 允许通过 headers 提供会话 ID 以使用 RESTful API。结合 Spring Boot 使用时,可通过配置 Redis 依赖和支持缓存的依赖实现 Session 共享。
292 0
分布式session-SpringSession的应用
|
API 调度 C语言
互斥锁,自旋锁,原子操作的原理,区别和实现
v互斥锁,自旋锁,原子操作的原理,区别和实现
345 0
|
弹性计算 编解码 运维
飞天技术沙龙回顾:业务创新新选择,倚天Arm架构深入探讨
阿里云、平头哥与Arm联合举办的飞天技术沙龙在上海举行,聚焦Arm Neoverse核心优势和倚天710计算实例在大数据、视频领域的应用。活动中,专家解读了倚天710的性能提升和成本效益,强调了CIPU云原生基础设施处理器的角色,以及如何通过软件优化实现资源池化和稳定性平衡。实例展示在视频编码和大数据处理上的性能提升分别达到80%和70%的性价比优化。沙龙吸引众多企业代表参与,促进技术交流与实践解决方案的探讨。
飞天技术沙龙回顾:业务创新新选择,倚天Arm架构深入探讨