【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【Springboot】微服务学习笔记十:微服务项目整合Kafka实现文章上下架功能

 一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

创建一个Controller包并编写一个测试类用于发送消息

package com.my.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @GetMapping("hello")
    public String helloProducer(){
        kafkaTemplate.send("my-topic","Hello~");
        return "ok";
    }
}

image.gif

(2)监听消息

编写测试类用于接收消息:

package com.my.kafka.listener;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
    @KafkaListener(topics = "my-topic")
    public void helloListener(String message) {
        if(StringUtils.isNotBlank(message)) {
            System.out.println(message);
        }
    }
}

image.gif

(3)测试结果

打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

image.gif编辑

2.传递对象消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。

(1)修改生产者代码

@GetMapping("hello")
public String helloProducer(){
    User user = new User();
    user.setName("赵四");
    user.setAge(20);
    kafkaTemplate.send("my-topic", JSON.toJSONString(user));
    return "ok";
}

image.gif

(2)结果测试

image.gif编辑

可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。

二:功能引入

1.需求分析

       发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。

image.gif编辑

2.逻辑分析

image.gif编辑

       后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。

       文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。

三:前期准备

1.引入依赖

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

image.gif

2.定义常量

package com.my.common.constans;
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

image.gif

3.Kafka配置信息

由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。

(1)自媒体端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

image.gif

(2)移动端配置

spring:
  kafka:
    bootstrap-servers: 4.234.52.122:9092
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

image.gif

四:代码实现

1.自媒体端

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 文章下架或上架
 * @param id
 * @param enable
 * @return
 */
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
    log.info("执行文章上下架操作...");
    if(id == null || enable == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //根据id获取文章
    WmNews news = getById(id);
    if(news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
    }
    //获取当前文章状态
    Short status = news.getStatus();
    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");
    }
    //更改文章状态
    news.setEnable(enable.shortValue());
    updateById(news);
    log.info("更改文章上架状态{}-->{}",status,news.getEnable());
    //发送消息到Kafka
    Map<String, Object> map = new HashMap<>();
    map.put("articleId",news.getArticleId());
    map.put("enable",enable.shortValue());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
    log.info("发送消息到Kafka...");
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

image.gif

2.移动端

(1)设置监听器

package com.my.article.listener;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
@Slf4j
@Component
public class EnableListener {
    @Autowired
    private ApArticleService apArticleService;
    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void downOrUp(String message) {
        if(StringUtils.isNotBlank(message)) {
            log.info("监听到消息{}",message);
            apArticleService.downOrUp(message);
        }
    }
}

image.gif

(2)获取消息并修改文章状态

/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
    Map map = JSON.parseObject(message, Map.class);
    //获取文章id
    Long articleId = (Long) map.get("articleId");
    //获取文章待修改状态
    Integer enable = (Integer) map.get("enable");
    //查询文章配置
    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
    if(apArticleConfig != null) {
        //上架
        if(enable == 1) {
            log.info("文章重新上架");
            apArticleConfig.setIsDown(false);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
        //下架
        if(enable == 0) {
            log.info("文章下架");
            apArticleConfig.setIsDown(true);
            apArticleConfigMapper.updateById(apArticleConfig);
        }
    }
    else {
        throw new RuntimeException("文章信息不存在");
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

image.gif


相关文章
|
3月前
|
Java 数据库连接 Maven
springBoot:项目建立&配置修改&yaml的使用&resource 文件夹(二)
本文档介绍了如何创建一个基于Maven的项目,并配置阿里云仓库、数据库连接、端口号、自定义启动横幅及多环境配置等。同时,详细说明了如何使用YAML格式进行配置,以及如何处理静态资源和模板文件。文档还涵盖了Spring Boot项目的`application.properties`和`application.yaml`文件的配置方法,包括设置数据库驱动、URL、用户名、密码等关键信息,以及如何通过配置文件管理不同环境下的应用设置。
336 1
|
3月前
|
NoSQL Java MongoDB
Springboot WebFlux项目结合mongodb进行crud
这篇文章介绍了如何使用Spring Boot WebFlux框架结合MongoDB进行基本的CRUD(创建、读取、更新、删除)操作,包括项目设置、实体类和Repository的创建、控制器的实现以及配置文件的编写。
64 0
Springboot WebFlux项目结合mongodb进行crud
|
2月前
|
Java 应用服务中间件
SpringBoot获取项目文件的绝对路径和相对路径
SpringBoot获取项目文件的绝对路径和相对路径
118 1
SpringBoot获取项目文件的绝对路径和相对路径
|
2月前
|
分布式计算 关系型数据库 MySQL
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型
SpringBoot项目中mysql字段映射使用JSONObject和JSONArray类型 图像处理 光通信 分布式计算 算法语言 信息技术 计算机应用
62 8
|
2月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
52 2
|
3月前
|
JavaScript 前端开发 Java
解决跨域问题大集合:vue-cli项目 和 java/springboot(6种方式) 两端解决(完美解决)
这篇文章详细介绍了如何在前端Vue项目和后端Spring Boot项目中通过多种方式解决跨域问题。
414 1
解决跨域问题大集合:vue-cli项目 和 java/springboot(6种方式) 两端解决(完美解决)
|
2月前
|
JavaScript 前端开发 Java
SpringBoot项目的html页面使用axios进行get post请求
SpringBoot项目的html页面使用axios进行get post请求
60 2
|
2月前
|
前端开发 Java Spring
SpringBoot项目thymeleaf页面支持词条国际化切换
SpringBoot项目thymeleaf页面支持词条国际化切换
83 2
|
2月前
|
JSON Java 数据库
SpringBoot项目使用AOP及自定义注解保存操作日志
SpringBoot项目使用AOP及自定义注解保存操作日志
56 1
|
3月前
|
Java Maven Android开发
eclipse如何导入springboot项目
本文介绍了如何在Eclipse中导入Spring Boot项目。
50 1
eclipse如何导入springboot项目