微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统

简介: 本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。

一、引子

抖音的推荐系统是其成功的关键之一,而背后是一套复杂的微服务架构支撑着高并发和庞大的用户数据处理。每当用户刷到新的视频时,背后都有一个复杂的推荐算法在实时运行。而在这样的场景下,构建一个高效、可扩展的微服务架构是至关重要的。本文将通过 Spring Cloud 构建一个简化版的抖音推荐系统,探讨微服务架构的设计与实践。

二、业务梳理

在正式的开发前,我们需要先对这个简化版的推荐系统所需要的功能做下梳理:

用户行为数据

推荐系统的核心在于个性化推荐,而个性化推荐的前提是对用户行为的全面了解。用户的每一次操作(如观看点赞转发评论等)都会影响推荐结果。因此,系统需要具备以下功能:

  • 记录用户行为数据:记录用户在平台上与视频的交互行为(比如用户观看了哪些视频、点赞了哪些视频等)。
  • 管理用户画像:基于用户的历史行为,生成用户的兴趣画像,用于推荐计算。

视频资源管理

抖音作为一个短视频平台,需要管理大量的视频资源。每个视频都有不同的标签(如类型话题风格等),这些标签是推荐算法的重要依据。因此,系统需要:

  • 存储视频基本信息:包括视频的ID、标题、标签、上传时间等。
  • 提供视频分类:根据视频的标签信息,将视频分类以便后续推荐。

个性化推荐

推荐系统的核心功能就是根据用户的兴趣和视频内容的标签,生成个性化推荐列表。为了实现这一功能,系统需要:

  • 获取用户画像和视频标签:结合用户的兴趣画像与视频的标签,匹配用户可能感兴趣的视频。
  • 生成推荐列表:根据算法计算,生成个性化推荐的视频列表并返回给用户。

用户行为数据的实时处理

用户在平台上的行为是实时发生的,因此推荐系统需要能够实时处理这些行为数据,并根据最新的行为更新用户画像。为此,系统需要:

  • 实时处理用户行为:当用户进行某个操作时(如点赞或观看某个视频),系统能够实时接收这些事件,并更新用户画像。
  • 异步处理:为了不影响用户的使用体验,行为数据的处理应尽量异步化,通过消息队列等手段解耦实时数据处理与推荐服务。

通过上述的业务需求梳理,我们最终可以总结出一个简化版的推荐系统需要具备的核心功能:

  • 用户行为管理:记录用户的观看、点赞等行为。
  • 视频资源管理:存储视频的基本信息和标签。
  • 个性化推荐:结合用户画像和视频标签,生成推荐列表。
  • 实时数据处理:处理用户行为数据,实时更新用户画像。

三、架构设计

完成了需求梳理后,我们总结了四大核心功能,进而可以抽象出四个服务来分别完成上述功能,因此,我们可以简单地绘制下业务架构图,如下:
01.png

这里我做了一些简化:比如省略了API网关,客户端直接请求到推荐服务上获取响应。因为我们决定使用SpringCloud来搭建这个项目,所以我们的服务都注册到Eureka上,服务之间的调用则采用Feign实现。另外,分别使用RedisKafka来缓存用户画像和传递用户行为数据。

四、具体实现

通过需求梳理、技术选型、架构设计,我们已经完成了项目开发前的准备工作,现在就可以正式进行开发了。首先需要我们根据业务架构图完成项目的基础搭建。

项目搭建

使用IDEA通过maven-archetype-quickstart进行快速创建,如下:
02.png
接下来通过同样的方式分别创建四大业务模块和Eureka服务模块,项目结构如下:
03.png
此时,在父模块的pom.xml文件中就可以看到子模块都已经被管理起来,我们再引入SpringBoot和
SpringCloud的依赖(Ps:版本可根据喜好自行选择,我这里演示也无所谓了,就不用SpringBoot3了),如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itasass</groupId>
    <artifactId>recommendation-system</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>eureka-server</module>
        <module>user-service</module>
        <module>video-service</module>
        <module>recommendation-service</module>
        <module>data-processing-service</module>
    </modules>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.boot.version>2.6.4</spring.boot.version>
        <spring.cloud.version>2021.0.1</spring.cloud.version>
    </properties>

    <dependencyManagement>

        <dependencies>

            <!-- Spring Boot Dependencies-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Cloud Dependencies-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>

    </dependencyManagement>

</project>
AI 代码解读

此时,我们就完成了项目的基础搭建,接下来开始编写各个服务的代码。

创建 Eureka 服务

各个服务都有自己的依赖需要导入,Eureka服务所需依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>eureka-server</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

    </dependencies>

</project>
AI 代码解读

导入相关依赖后,可以创建 eureka-server 模块中的主类 EurekaServerApplication,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication
{
   
    public static void main( String[] args ) {
   
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}
AI 代码解读

然后我们需要对eureka服务进行配置,在 src/main/resources 目录下创建 application.yml如下:

server:
  port: 8761 # 服务运行的端口

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    enable-self-preservation: false
AI 代码解读

完成了上述配置后,我们需要启动Eureka来看看当前的这个服务是否可以正常运行,也很简单,启动 EurekaServerApplication,访问 http://localhost:8761,此时可以看到 Eureka Server 的控制台界面,如下:
04.png

创建用户服务

依然是先导入依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
AI 代码解读

同样地需要配置application.yml,四大业务模块都需要将自己注册到eureka中,如下:

server:
  port: 8081  # 服务运行的端口


spring:
  application:
    name: user-service  # 用户服务的名称
  kafka:
    bootstrap-servers: localhost:9092  # Kafka 服务器地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

同理,完善用户服务的主类,这里主要是模拟用户看过的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

@SpringBootApplication
@EnableEurekaClient
@RestController
public class UserServiceApplication {

    public static void main( String[] args ) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

    @GetMapping("/users/{userId}/history")
    public List<String> getUserHistory(@PathVariable String userId) {
        // 模拟用户的历史行为数据,这里返回一些示例视频ID
        return Arrays.asList("1", "3", "5","7");
    }
}
AI 代码解读

这里说明下,我们本次的实例都采用模拟数据的思路,主要是让大家了解下设计思路,所以就不建立相关的库表了。启动用户服务后就可以在eureka控制台看到user服务了,如下:
05.png
另外,通过工具(如 Apifox)来访问用户服务的 REST API,如下:
06.png
完成了基础的配置,但别忘了在我们的架构设计中用户服务还有一项重任-记录用户行为数据。因此,我们需要在用户服务里编写Kafka的生产者服务,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
   

  private static final String TOPIC = "user-behavior-topic";  // Kafka 主题名称

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  /**
   * 发送用户行为到 Kafka
   *
   * @param userId       用户ID
   * @param videoId      视频ID
   * @param videoTag     视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  public void sendUserBehavior(String userId, String videoId, String videoTag, int isInterested) {
   
    // 构建消息
    String message = String.format("User:%s watched Video:%s [Tag:%s] with interest:%d", userId, videoId, videoTag, isInterested);
    kafkaTemplate.send(TOPIC, message);
  }

}
AI 代码解读

同时,我们编写一个记录用户观看视频的行为的接口来模拟用户刷视频的场景,代码如下:

import com.recommendation.serive.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
   

  @Autowired
  private KafkaProducerService kafkaProducerService;

  /**
   * 记录用户观看视频的行为,并发送到 Kafka
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  @PostMapping("/users/{userId}/watch/{videoId}/{videoTag}/{isInterested}")
  public String watchVideo(@PathVariable String userId,
                           @PathVariable String videoId,
                           @PathVariable String videoTag,
                           @PathVariable int isInterested) {
   
    // 调用 Kafka 生产者服务将行为数据(包括视频标签)发送到 Kafka
    kafkaProducerService.sendUserBehavior(userId, videoId, videoTag, isInterested);

    return String.format("User %s watched video %s with tag %s and interest %d", userId, videoId, videoTag, isInterested);
  }
}
AI 代码解读

通过Apifox模拟数据进行调用,查看API的响应结果和Kafka来观察消息是否发送成功,如下:
07.png
08.png
从结果来看,我们的消息发送成功。

创建视频服务

同样地,依然是先完善依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>video-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
AI 代码解读

接着配置application.yml文件,如下:

server:
  port: 8082  # 服务运行的端口

spring:
  application:
    name: video-service  # 视频服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

接着又是主类,视频服务通过模拟提供了所有的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;


@SpringBootApplication
@EnableEurekaClient
@RestController
public class VideoServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(VideoServiceApplication.class, args);
    }

    @GetMapping("/videos")
    public List<Video> getAllVideos() {
   
        // 模拟视频数据,这里返回一些示例视频
        return Arrays.asList(
                new Video("1", "娱乐"),
                new Video("2", "娱乐"),
                new Video("3", "科技"),
                new Video("4", "美食"),
                new Video("5", "科技"),
                new Video("6", "美食"),
                new Video("7", "旅游"),
                new Video("8", "科技")
        );
    }

    static class Video {
   
        private String id;
        private String tag;

        public Video(String id, String tag) {
   
            this.id = id;
            this.tag = tag;
        }

        public String getId() {
   
            return id;
        }

        public String getTag() {
   
            return tag;
        }
    }
}
AI 代码解读

和用户服务一样,启动后查看eureka控制台和测试API调用,如下:
09.png
10.png

创建推荐服务

依然是先进行依赖和配置文件的编写,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>recommendation-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Feign Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- Spring Boot Redis Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson Databind (用于 Redis 的序列化/反序列化) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

</project>
AI 代码解读
server:
  port: 8083  # 服务运行的端口

spring:
  application:
    name: recommendation-service  # 推荐服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

由于推荐服务 需要调用 用户服务视频服务 来获取用户的历史行为和视频列表。我们可以使用 Feign 客户端来简化服务之间的调用。因此,我们需要建立一个client目录来存放客户端接口,如下:
11.png

两个客户端接口都是模拟获取数据,如下:

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.List;

@FeignClient(name = "user-service")
public interface UserServiceClient {
   

  @GetMapping("/users/{userId}/history")
  List<String> getUserHistory(@PathVariable("userId") String userId);
}
AI 代码解读
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.List;

@FeignClient(name = "video-service")  // 指定服务名称为 video-service
public interface VideoServiceClient {

  @GetMapping("/videos")
  List<Video> getAllVideos();

  class Video {
    private String id;
    private String tag;

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public String getTag() {
      return tag;
    }

    public void setTag(String tag) {
      this.tag = tag;
    }
  }
}
AI 代码解读

当然,别忘了推荐服务是根据用户画像来推荐视频,所以这里我们还需要从redis中获取用户画像,那么我们声明一个redis的工具类来获取用户画像,如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;

import java.util.Map;


@Repository
public class RedisUserProfileRepository {
   

  @Autowired
  private StringRedisTemplate redisTemplate;  // 用于操作 Redis

  @Autowired
  private ObjectMapper objectMapper;  // 用于 JSON 序列化/反序列化

  private static final String USER_PROFILE_KEY_PREFIX = "user_profile:";

  /**
   * 从 Redis 中获取用户画像
   * @param userId 用户 ID
   * @return 用户画像(Map结构)
   */
  public Map<String, Object> getUserProfile(String userId) throws JsonProcessingException {
   
    String key = USER_PROFILE_KEY_PREFIX + userId;
    String jsonProfile = redisTemplate.opsForValue().get(key);  // 从 Redis 获取用户画像(JSON 字符串)
    if (jsonProfile != null) {
   
      return objectMapper.readValue(jsonProfile, Map.class);  // 将 JSON 转为 Map
    }
    return null;  // 如果 Redis 中没有用户画像,返回 null
  }
}
AI 代码解读

接着我们在主类里写推荐服务的主要逻辑,大家可以直接看代码的注释,根据用户画像进行推荐,没有则全部返回,如下:

import com.recommendation.client.VideoServiceClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@RestController
public class RecommendationServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(RecommendationServiceApplication.class, args);
    }

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    @GetMapping("/recommendations/{userId}")
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(video.getId()))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(video.getTag()))
                .collect(Collectors.toList());
    }

}
AI 代码解读

创建数据处理服务

数据处理服务是整个系统运转的关键,一方面它需要接收用户行为数据,另一方面还需要对用户行为数据进行处理去更新用户画像,所以这个服务的主要逻辑也是围绕它们展开。老规矩,依然是先补充依赖和配置文件,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-processing-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

    </dependencies>
</project>
AI 代码解读
server:
  port: 8084  # 服务运行的端口

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker 地址
    consumer:
      group-id: data-processing-group   # Kafka 消费者组 ID
      auto-offset-reset: earliest       # 消费者从最早的消息开始读取
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  redis:
    host: 127.0.0.1
    port: 6379
AI 代码解读

接下来,通过一个kafka的监听接口来实现上述功能,我这里演示就随意了,大家用MQ传递消息还是用JSON格式,否则你解析很容易出问题,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class KafkaConsumerService {
   

  @Autowired
  private DataProcessingService dataProcessingService;

  // 监听 Kafka 主题,消费用户行为数据
  @KafkaListener(topics = "user-behavior-topic", groupId = "data-processing-group")
  public void consumeUserBehavior(String message) {
   
    try {
   
      System.out.println("Received message: " + message);

      // 使用正则表达式提取信息
      String userId = extractValue(message, "User:(\d+)");
      String videoId = extractValue(message, "Video:(\d+)");
      String videoTag = extractValue(message, "Tag:([^\]]+)");

      if (userId != null && videoId != null && videoTag != null) {
   
        dataProcessingService.processUserBehavior(userId, videoId, videoTag);
      } else {
   
        System.err.println("Failed to parse message: " + message);
      }
    } catch (Exception e) {
   
      System.err.println("Error processing message: " + message);
      e.printStackTrace();
    }
  }

  private String extractValue(String message, String pattern) {
   
    Pattern r = Pattern.compile(pattern);
    Matcher m = r.matcher(message);
    return m.find() ? m.group(1) : null;
  }

}
AI 代码解读

在数据处理服务中,我们需要处理消息并更新用户画像,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DataProcessingService {

  @Autowired
  private StringRedisTemplate redisTemplate;

  private static final String USER_PROFILE_KEY_PREFIX = "user:";
  private static final String USER_HISTORY_KEY = "history";
  private static final String USER_INTERESTS_KEY = "interests";

  /**
   * 处理用户行为数据:更新 Redis 中的用户画像
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频的标签
   */
  public void processUserBehavior(String userId, String videoId, String videoTag) {
    //1.更新用户的观看历史记录
    updateUserHistory(userId, videoId);
    //2.更新用户的兴趣标签
    updateUserInterests(userId, videoTag);
  }

  /**
   * 更新用户的观看历史记录
   * @param userId 用户 ID
   * @param videoId 视频 ID
   */
  private void updateUserHistory(String userId, String videoId) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForList()
            .leftPush(userProfileKey + ":" + USER_HISTORY_KEY, videoId);
  }

  /**
   * 更新用户的兴趣标签
   * @param userId 用户 ID
   * @param tag 视频的标签
   */
  private void updateUserInterests(String userId, String tag) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForSet()
            .add(userProfileKey + ":" + USER_INTERESTS_KEY, tag);
  }

}
AI 代码解读

至此,我们的这个简版推荐系统就搭建完成了,可以看到eureka的控制台上四大业务服务都已经注册到了,如下:
12.png

五、业务测试

完成了系统的开发后,我们现在开始测试,首先我们先访问推荐服务,按照我们的逻辑:由于是第一次访问还没有形成画像,此时应该返回所有视频,如下:
13.png
可以看到此时返回了我们模拟的所有8个视频,那么假定我这个用户对科技类视频感兴趣,假如我看了id为3的视频并点赞了,那么下次推荐服务应该只返回id=5和8的视频。我们先去调用一次记录用户观看视频的行为的接口,如下:
14.png
然后我们去看下reids里是否已经有用户画像了,如下:
15.png
16.png
那么这时候我们再去调用推荐服务,就会得到我们上面期望的结果了,如下:
17.png
我们的推荐系统通过多个服务的协作成功实现了预期功能,但是按照现在的实现,如果其中一个服务挂了,整个推荐系统就不可用了,显然这是我们不期望发生的,所以我们就需要考虑做服务降级,确保即使某个服务出问题,不影响整个系统的使用。

六、服务降级

我们在推荐服务的主类接口里有调用redis来获取用户画像,就以它为例,因为我们用的springcloud,就直接用配套的resilience4j来实现,依然是先引入依赖和补充配置,如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
AI 代码解读
resilience4j:
  circuitbreaker:
    instances:
      redisService:  # 熔断器名称,与 @CircuitBreaker 注解中的 name 属性对应
        slidingWindowSize: 10  # 熔断器的滑动窗口大小,用于计算失败率的调用数量
        failureRateThreshold: 50  # 失败率阈值,50% 代表当失败率达到 50% 时开启熔断
        waitDurationInOpenState: 10000  # 熔断器在打开状态下保持的时间(毫秒),即熔断器打开后等待 10 秒再尝试进入半开状态
        permittedNumberOfCallsInHalfOpenState: 3  # 半开状态下允许的最大调用次数,用于测试服务是否恢复
        minimumNumberOfCalls: 5  # 统计失败率所需的最小调用次数,至少进行 5 次调用后熔断器才会判断是否开启熔断
        automaticTransitionFromOpenToHalfOpenEnabled: true  # 允许自动从打开状态过渡到半开状态,尝试恢复服务
        slowCallDurationThreshold: 2000  # 慢调用的阈值,2 秒内完成的调用视为正常,超过 2 秒的调用视为慢调用
        slowCallRateThreshold: 50  # 慢调用比例阈值,50% 代表当慢调用比例达到 50% 时开启熔断
AI 代码解读

然后我们改造原推荐接口,加上注解和降级方法,主要逻辑是当redis连接不上时,直接调视频服务的接口获取所有视频返回,代码如下:

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    private static final String REDIS_BACKEND = "redisService"; // 定义熔断器名称

    @GetMapping("/recommendations/{userId}")
    @CircuitBreaker(name = REDIS_BACKEND, fallbackMethod = "getAllVideosFallback") // 添加熔断器
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(String.valueOf(video.getId())))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(String.valueOf(video.getTag())))
                .collect(Collectors.toList());
    }

    // 降级方法,当 Redis 连接失败时调用此方法
    public List<VideoServiceClient.Video> getAllVideosFallback(Throwable ex) {
   
        // 打印日志,记录异常信息
        System.err.println("Redis is unavailable. Returning fallback recommendation. Exception: " + ex.getMessage());

        // 在降级方法中调用 Feign 客户端返回默认的视频列表
        return videoServiceClient.getAllVideos();
    }
}
AI 代码解读

然后我们把Redis停掉,如下:
18.png
这时候我们再调用推荐接口,分别观察控制台有输出报错信息提示我们redis连接失败以及接口返回了所有视频数据,如下:
19.png
20.png
我们这里模拟了redis挂掉的情况,但其实熔断降级机制只要是需要调用别的服务的地方,我们都可以做相关处理来提升可用性。

七、小结

本文基于Spring Cloud实现了一个简单的推荐系统,当然真实的推荐系统远比这复杂,涉及到很多算法,例如协同过滤、用户行为等等。这篇文章的重点是带大家建立微服务的设计思想:怎么围绕业务去做服务拆解,怎么通过Spring Cloud去构建微服务项目以及服务的熔断降级处理,并最终希望让大家在看完后能够有所收获!

目录
打赏
0
17
17
0
153
分享
相关文章
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
本文介绍在 Spring Boot 中集成 Redis 的方法。Redis 是一种支持多种数据结构的非关系型数据库(NoSQL),具备高并发、高性能和灵活扩展的特点,适用于缓存、实时数据分析等场景。其数据以键值对形式存储,支持字符串、哈希、列表、集合等类型。通过将 Redis 与 Mysql 集群结合使用,可实现数据同步,提升系统稳定性。例如,在网站架构中优先从 Redis 获取数据,故障时回退至 Mysql,确保服务不中断。
43 0
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 介绍
|
6天前
|
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 身份和权限认证
本文介绍了 Apache Shiro 的身份认证与权限认证机制。在身份认证部分,分析了 Shiro 的认证流程,包括应用程序调用 `Subject.login(token)` 方法、SecurityManager 接管认证以及通过 Realm 进行具体的安全验证。权限认证部分阐述了权限(permission)、角色(role)和用户(user)三者的关系,其中用户可拥有多个角色,角色则对应不同的权限组合,例如普通用户仅能查看或添加信息,而管理员可执行所有操作。
39 0
微服务——SpringBoot使用归纳——Spring Boot中集成 Shiro——Shiro 三大核心组件
本课程介绍如何在Spring Boot中集成Shiro框架,主要讲解Shiro的认证与授权功能。Shiro是一个简单易用的Java安全框架,用于认证、授权、加密和会话管理等。其核心组件包括Subject(认证主体)、SecurityManager(安全管理员)和Realm(域)。Subject负责身份认证,包含Principals(身份)和Credentials(凭证);SecurityManager是架构核心,协调内部组件运作;Realm则是连接Shiro与应用数据的桥梁,用于访问用户账户及权限信息。通过学习,您将掌握Shiro的基本原理及其在项目中的应用。
40 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
本教程介绍ActiveMQ的安装与基本使用。首先从官网下载apache-activemq-5.15.3版本,解压后即可完成安装,非常便捷。启动时进入解压目录下的bin文件夹,根据系统选择win32或win64,运行activemq.bat启动服务。通过浏览器访问`http://127.0.0.1:8161/admin/`可进入管理界面,默认用户名密码为admin/admin。ActiveMQ支持两种消息模式:点对点(Queue)和发布/订阅(Topic)。前者确保每条消息仅被一个消费者消费,后者允许多个消费者同时接收相同消息。
37 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ安装
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——发布/订阅消息的生产和消费
本文详细讲解了Spring Boot中ActiveMQ的发布/订阅消息机制,包括消息生产和消费的具体实现方式。生产端通过`sendMessage`方法发送订阅消息,消费端则需配置`application.yml`或自定义工厂以支持topic消息监听。为解决点对点与发布/订阅消息兼容问题,可通过设置`containerFactory`实现两者共存。最后,文章还提供了测试方法及总结,帮助读者掌握ActiveMQ在异步消息处理中的应用。
50 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ集成
本文介绍了在 Spring Boot 中集成 ActiveMQ 的详细步骤。首先通过引入 `spring-boot-starter-activemq` 依赖并配置 `application.yml` 文件实现基本设置。接着,创建 Queue 和 Topic 消息类型,分别使用 `ActiveMQQueue` 和 `ActiveMQTopic` 类完成配置。随后,利用 `JmsMessagingTemplate` 实现消息发送功能,并通过 Controller 和监听器实现点对点消息的生产和消费。最后,通过浏览器访问测试接口验证消息传递的成功性。
18 0
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ—— JMS 和 ActiveMQ 介绍
本文介绍如何在Spring Boot中集成ActiveMQ,首先阐述了JMS(Java消息服务)的概念及其作为与具体平台无关的API在异步通信中的作用。接着说明了JMS的主要对象模型,如连接工厂、会话、生产者和消费者等,并指出JMS支持点对点和发布/订阅两种消息类型。随后重点讲解了ActiveMQ,作为Apache开源的消息总线,它完全支持JMS规范,适用于异步消息处理。最后,文章探讨了在Spring Boot中使用队列(Queue)和主题(Topic)这两种消息通信形式的方法。
22 0
|
6天前
|
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Spring Boot 集成 Redis
本文介绍了在Spring Boot中集成Redis的方法,包括依赖导入、Redis配置及常用API的使用。通过导入`spring-boot-starter-data-redis`依赖和配置`application.yml`文件,可轻松实现Redis集成。文中详细讲解了StringRedisTemplate的使用,适用于字符串操作,并结合FastJSON将实体类转换为JSON存储。还展示了Redis的string、hash和list类型的操作示例。最后总结了Redis在缓存和高并发场景中的应用价值,并提供课程源代码下载链接。
29 0
|
6天前
|
微服务——SpringBoot使用归纳——Spring Boot 中集成Redis——Redis 安装
本教程介绍在 VMware 虚拟机(CentOS 7)或阿里云服务器中安装 Redis 的过程,包括安装 gcc 编译环境、下载 Redis(官网或 wget)、解压安装、修改配置文件(如 bind、daemonize、requirepass 等设置)、启动 Redis 服务及测试客户端连接。通过 set 和 get 命令验证安装是否成功。适用于初学者快速上手 Redis 部署。
17 0
微服务——SpringBoot使用归纳——Spring Boot中使用拦截器——拦截器使用实例
本文主要讲解了Spring Boot中拦截器的使用实例,包括判断用户是否登录和取消特定拦截操作两大场景。通过token验证实现登录状态检查,未登录则拦截请求;定义自定义注解@UnInterception实现灵活取消拦截功能。最后总结了拦截器的创建、配置及对静态资源的影响,并提供两种配置方式供选择,帮助读者掌握拦截器的实际应用。
19 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等