微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统

简介: 本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。

一、引子

抖音的推荐系统是其成功的关键之一,而背后是一套复杂的微服务架构支撑着高并发和庞大的用户数据处理。每当用户刷到新的视频时,背后都有一个复杂的推荐算法在实时运行。而在这样的场景下,构建一个高效、可扩展的微服务架构是至关重要的。本文将通过 Spring Cloud 构建一个简化版的抖音推荐系统,探讨微服务架构的设计与实践。

二、业务梳理

在正式的开发前,我们需要先对这个简化版的推荐系统所需要的功能做下梳理:

用户行为数据

推荐系统的核心在于个性化推荐,而个性化推荐的前提是对用户行为的全面了解。用户的每一次操作(如观看点赞转发评论等)都会影响推荐结果。因此,系统需要具备以下功能:

  • 记录用户行为数据:记录用户在平台上与视频的交互行为(比如用户观看了哪些视频、点赞了哪些视频等)。
  • 管理用户画像:基于用户的历史行为,生成用户的兴趣画像,用于推荐计算。

视频资源管理

抖音作为一个短视频平台,需要管理大量的视频资源。每个视频都有不同的标签(如类型话题风格等),这些标签是推荐算法的重要依据。因此,系统需要:

  • 存储视频基本信息:包括视频的ID、标题、标签、上传时间等。
  • 提供视频分类:根据视频的标签信息,将视频分类以便后续推荐。

个性化推荐

推荐系统的核心功能就是根据用户的兴趣和视频内容的标签,生成个性化推荐列表。为了实现这一功能,系统需要:

  • 获取用户画像和视频标签:结合用户的兴趣画像与视频的标签,匹配用户可能感兴趣的视频。
  • 生成推荐列表:根据算法计算,生成个性化推荐的视频列表并返回给用户。

用户行为数据的实时处理

用户在平台上的行为是实时发生的,因此推荐系统需要能够实时处理这些行为数据,并根据最新的行为更新用户画像。为此,系统需要:

  • 实时处理用户行为:当用户进行某个操作时(如点赞或观看某个视频),系统能够实时接收这些事件,并更新用户画像。
  • 异步处理:为了不影响用户的使用体验,行为数据的处理应尽量异步化,通过消息队列等手段解耦实时数据处理与推荐服务。

通过上述的业务需求梳理,我们最终可以总结出一个简化版的推荐系统需要具备的核心功能:

  • 用户行为管理:记录用户的观看、点赞等行为。
  • 视频资源管理:存储视频的基本信息和标签。
  • 个性化推荐:结合用户画像和视频标签,生成推荐列表。
  • 实时数据处理:处理用户行为数据,实时更新用户画像。

三、架构设计

完成了需求梳理后,我们总结了四大核心功能,进而可以抽象出四个服务来分别完成上述功能,因此,我们可以简单地绘制下业务架构图,如下:
01.png

这里我做了一些简化:比如省略了API网关,客户端直接请求到推荐服务上获取响应。因为我们决定使用SpringCloud来搭建这个项目,所以我们的服务都注册到Eureka上,服务之间的调用则采用Feign实现。另外,分别使用RedisKafka来缓存用户画像和传递用户行为数据。

四、具体实现

通过需求梳理、技术选型、架构设计,我们已经完成了项目开发前的准备工作,现在就可以正式进行开发了。首先需要我们根据业务架构图完成项目的基础搭建。

项目搭建

使用IDEA通过maven-archetype-quickstart进行快速创建,如下:
02.png
接下来通过同样的方式分别创建四大业务模块和Eureka服务模块,项目结构如下:
03.png
此时,在父模块的pom.xml文件中就可以看到子模块都已经被管理起来,我们再引入SpringBoot和
SpringCloud的依赖(Ps:版本可根据喜好自行选择,我这里演示也无所谓了,就不用SpringBoot3了),如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itasass</groupId>
    <artifactId>recommendation-system</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>eureka-server</module>
        <module>user-service</module>
        <module>video-service</module>
        <module>recommendation-service</module>
        <module>data-processing-service</module>
    </modules>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.boot.version>2.6.4</spring.boot.version>
        <spring.cloud.version>2021.0.1</spring.cloud.version>
    </properties>

    <dependencyManagement>

        <dependencies>

            <!-- Spring Boot Dependencies-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Cloud Dependencies-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>

    </dependencyManagement>

</project>
AI 代码解读

此时,我们就完成了项目的基础搭建,接下来开始编写各个服务的代码。

创建 Eureka 服务

各个服务都有自己的依赖需要导入,Eureka服务所需依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>eureka-server</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

    </dependencies>

</project>
AI 代码解读

导入相关依赖后,可以创建 eureka-server 模块中的主类 EurekaServerApplication,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication
{
   
    public static void main( String[] args ) {
   
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}
AI 代码解读

然后我们需要对eureka服务进行配置,在 src/main/resources 目录下创建 application.yml如下:

server:
  port: 8761 # 服务运行的端口

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    enable-self-preservation: false
AI 代码解读

完成了上述配置后,我们需要启动Eureka来看看当前的这个服务是否可以正常运行,也很简单,启动 EurekaServerApplication,访问 http://localhost:8761,此时可以看到 Eureka Server 的控制台界面,如下:
04.png

创建用户服务

依然是先导入依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
AI 代码解读

同样地需要配置application.yml,四大业务模块都需要将自己注册到eureka中,如下:

server:
  port: 8081  # 服务运行的端口


spring:
  application:
    name: user-service  # 用户服务的名称
  kafka:
    bootstrap-servers: localhost:9092  # Kafka 服务器地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

同理,完善用户服务的主类,这里主要是模拟用户看过的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

@SpringBootApplication
@EnableEurekaClient
@RestController
public class UserServiceApplication {

    public static void main( String[] args ) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

    @GetMapping("/users/{userId}/history")
    public List<String> getUserHistory(@PathVariable String userId) {
        // 模拟用户的历史行为数据,这里返回一些示例视频ID
        return Arrays.asList("1", "3", "5","7");
    }
}
AI 代码解读

这里说明下,我们本次的实例都采用模拟数据的思路,主要是让大家了解下设计思路,所以就不建立相关的库表了。启动用户服务后就可以在eureka控制台看到user服务了,如下:
05.png
另外,通过工具(如 Apifox)来访问用户服务的 REST API,如下:
06.png
完成了基础的配置,但别忘了在我们的架构设计中用户服务还有一项重任-记录用户行为数据。因此,我们需要在用户服务里编写Kafka的生产者服务,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
   

  private static final String TOPIC = "user-behavior-topic";  // Kafka 主题名称

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  /**
   * 发送用户行为到 Kafka
   *
   * @param userId       用户ID
   * @param videoId      视频ID
   * @param videoTag     视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  public void sendUserBehavior(String userId, String videoId, String videoTag, int isInterested) {
   
    // 构建消息
    String message = String.format("User:%s watched Video:%s [Tag:%s] with interest:%d", userId, videoId, videoTag, isInterested);
    kafkaTemplate.send(TOPIC, message);
  }

}
AI 代码解读

同时,我们编写一个记录用户观看视频的行为的接口来模拟用户刷视频的场景,代码如下:

import com.recommendation.serive.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
   

  @Autowired
  private KafkaProducerService kafkaProducerService;

  /**
   * 记录用户观看视频的行为,并发送到 Kafka
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  @PostMapping("/users/{userId}/watch/{videoId}/{videoTag}/{isInterested}")
  public String watchVideo(@PathVariable String userId,
                           @PathVariable String videoId,
                           @PathVariable String videoTag,
                           @PathVariable int isInterested) {
   
    // 调用 Kafka 生产者服务将行为数据(包括视频标签)发送到 Kafka
    kafkaProducerService.sendUserBehavior(userId, videoId, videoTag, isInterested);

    return String.format("User %s watched video %s with tag %s and interest %d", userId, videoId, videoTag, isInterested);
  }
}
AI 代码解读

通过Apifox模拟数据进行调用,查看API的响应结果和Kafka来观察消息是否发送成功,如下:
07.png
08.png
从结果来看,我们的消息发送成功。

创建视频服务

同样地,依然是先完善依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>video-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
AI 代码解读

接着配置application.yml文件,如下:

server:
  port: 8082  # 服务运行的端口

spring:
  application:
    name: video-service  # 视频服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

接着又是主类,视频服务通过模拟提供了所有的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;


@SpringBootApplication
@EnableEurekaClient
@RestController
public class VideoServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(VideoServiceApplication.class, args);
    }

    @GetMapping("/videos")
    public List<Video> getAllVideos() {
   
        // 模拟视频数据,这里返回一些示例视频
        return Arrays.asList(
                new Video("1", "娱乐"),
                new Video("2", "娱乐"),
                new Video("3", "科技"),
                new Video("4", "美食"),
                new Video("5", "科技"),
                new Video("6", "美食"),
                new Video("7", "旅游"),
                new Video("8", "科技")
        );
    }

    static class Video {
   
        private String id;
        private String tag;

        public Video(String id, String tag) {
   
            this.id = id;
            this.tag = tag;
        }

        public String getId() {
   
            return id;
        }

        public String getTag() {
   
            return tag;
        }
    }
}
AI 代码解读

和用户服务一样,启动后查看eureka控制台和测试API调用,如下:
09.png
10.png

创建推荐服务

依然是先进行依赖和配置文件的编写,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>recommendation-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Feign Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- Spring Boot Redis Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson Databind (用于 Redis 的序列化/反序列化) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

</project>
AI 代码解读
server:
  port: 8083  # 服务运行的端口

spring:
  application:
    name: recommendation-service  # 推荐服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka
AI 代码解读

由于推荐服务 需要调用 用户服务视频服务 来获取用户的历史行为和视频列表。我们可以使用 Feign 客户端来简化服务之间的调用。因此,我们需要建立一个client目录来存放客户端接口,如下:
11.png

两个客户端接口都是模拟获取数据,如下:

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.List;

@FeignClient(name = "user-service")
public interface UserServiceClient {
   

  @GetMapping("/users/{userId}/history")
  List<String> getUserHistory(@PathVariable("userId") String userId);
}
AI 代码解读
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.List;

@FeignClient(name = "video-service")  // 指定服务名称为 video-service
public interface VideoServiceClient {

  @GetMapping("/videos")
  List<Video> getAllVideos();

  class Video {
    private String id;
    private String tag;

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public String getTag() {
      return tag;
    }

    public void setTag(String tag) {
      this.tag = tag;
    }
  }
}
AI 代码解读

当然,别忘了推荐服务是根据用户画像来推荐视频,所以这里我们还需要从redis中获取用户画像,那么我们声明一个redis的工具类来获取用户画像,如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;

import java.util.Map;


@Repository
public class RedisUserProfileRepository {
   

  @Autowired
  private StringRedisTemplate redisTemplate;  // 用于操作 Redis

  @Autowired
  private ObjectMapper objectMapper;  // 用于 JSON 序列化/反序列化

  private static final String USER_PROFILE_KEY_PREFIX = "user_profile:";

  /**
   * 从 Redis 中获取用户画像
   * @param userId 用户 ID
   * @return 用户画像(Map结构)
   */
  public Map<String, Object> getUserProfile(String userId) throws JsonProcessingException {
   
    String key = USER_PROFILE_KEY_PREFIX + userId;
    String jsonProfile = redisTemplate.opsForValue().get(key);  // 从 Redis 获取用户画像(JSON 字符串)
    if (jsonProfile != null) {
   
      return objectMapper.readValue(jsonProfile, Map.class);  // 将 JSON 转为 Map
    }
    return null;  // 如果 Redis 中没有用户画像,返回 null
  }
}
AI 代码解读

接着我们在主类里写推荐服务的主要逻辑,大家可以直接看代码的注释,根据用户画像进行推荐,没有则全部返回,如下:

import com.recommendation.client.VideoServiceClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@RestController
public class RecommendationServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(RecommendationServiceApplication.class, args);
    }

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    @GetMapping("/recommendations/{userId}")
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(video.getId()))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(video.getTag()))
                .collect(Collectors.toList());
    }

}
AI 代码解读

创建数据处理服务

数据处理服务是整个系统运转的关键,一方面它需要接收用户行为数据,另一方面还需要对用户行为数据进行处理去更新用户画像,所以这个服务的主要逻辑也是围绕它们展开。老规矩,依然是先补充依赖和配置文件,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-processing-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

    </dependencies>
</project>
AI 代码解读
server:
  port: 8084  # 服务运行的端口

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker 地址
    consumer:
      group-id: data-processing-group   # Kafka 消费者组 ID
      auto-offset-reset: earliest       # 消费者从最早的消息开始读取
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  redis:
    host: 127.0.0.1
    port: 6379
AI 代码解读

接下来,通过一个kafka的监听接口来实现上述功能,我这里演示就随意了,大家用MQ传递消息还是用JSON格式,否则你解析很容易出问题,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class KafkaConsumerService {
   

  @Autowired
  private DataProcessingService dataProcessingService;

  // 监听 Kafka 主题,消费用户行为数据
  @KafkaListener(topics = "user-behavior-topic", groupId = "data-processing-group")
  public void consumeUserBehavior(String message) {
   
    try {
   
      System.out.println("Received message: " + message);

      // 使用正则表达式提取信息
      String userId = extractValue(message, "User:(\d+)");
      String videoId = extractValue(message, "Video:(\d+)");
      String videoTag = extractValue(message, "Tag:([^\]]+)");

      if (userId != null && videoId != null && videoTag != null) {
   
        dataProcessingService.processUserBehavior(userId, videoId, videoTag);
      } else {
   
        System.err.println("Failed to parse message: " + message);
      }
    } catch (Exception e) {
   
      System.err.println("Error processing message: " + message);
      e.printStackTrace();
    }
  }

  private String extractValue(String message, String pattern) {
   
    Pattern r = Pattern.compile(pattern);
    Matcher m = r.matcher(message);
    return m.find() ? m.group(1) : null;
  }

}
AI 代码解读

在数据处理服务中,我们需要处理消息并更新用户画像,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DataProcessingService {

  @Autowired
  private StringRedisTemplate redisTemplate;

  private static final String USER_PROFILE_KEY_PREFIX = "user:";
  private static final String USER_HISTORY_KEY = "history";
  private static final String USER_INTERESTS_KEY = "interests";

  /**
   * 处理用户行为数据:更新 Redis 中的用户画像
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频的标签
   */
  public void processUserBehavior(String userId, String videoId, String videoTag) {
    //1.更新用户的观看历史记录
    updateUserHistory(userId, videoId);
    //2.更新用户的兴趣标签
    updateUserInterests(userId, videoTag);
  }

  /**
   * 更新用户的观看历史记录
   * @param userId 用户 ID
   * @param videoId 视频 ID
   */
  private void updateUserHistory(String userId, String videoId) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForList()
            .leftPush(userProfileKey + ":" + USER_HISTORY_KEY, videoId);
  }

  /**
   * 更新用户的兴趣标签
   * @param userId 用户 ID
   * @param tag 视频的标签
   */
  private void updateUserInterests(String userId, String tag) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForSet()
            .add(userProfileKey + ":" + USER_INTERESTS_KEY, tag);
  }

}
AI 代码解读

至此,我们的这个简版推荐系统就搭建完成了,可以看到eureka的控制台上四大业务服务都已经注册到了,如下:
12.png

五、业务测试

完成了系统的开发后,我们现在开始测试,首先我们先访问推荐服务,按照我们的逻辑:由于是第一次访问还没有形成画像,此时应该返回所有视频,如下:
13.png
可以看到此时返回了我们模拟的所有8个视频,那么假定我这个用户对科技类视频感兴趣,假如我看了id为3的视频并点赞了,那么下次推荐服务应该只返回id=5和8的视频。我们先去调用一次记录用户观看视频的行为的接口,如下:
14.png
然后我们去看下reids里是否已经有用户画像了,如下:
15.png
16.png
那么这时候我们再去调用推荐服务,就会得到我们上面期望的结果了,如下:
17.png
我们的推荐系统通过多个服务的协作成功实现了预期功能,但是按照现在的实现,如果其中一个服务挂了,整个推荐系统就不可用了,显然这是我们不期望发生的,所以我们就需要考虑做服务降级,确保即使某个服务出问题,不影响整个系统的使用。

六、服务降级

我们在推荐服务的主类接口里有调用redis来获取用户画像,就以它为例,因为我们用的springcloud,就直接用配套的resilience4j来实现,依然是先引入依赖和补充配置,如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
AI 代码解读
resilience4j:
  circuitbreaker:
    instances:
      redisService:  # 熔断器名称,与 @CircuitBreaker 注解中的 name 属性对应
        slidingWindowSize: 10  # 熔断器的滑动窗口大小,用于计算失败率的调用数量
        failureRateThreshold: 50  # 失败率阈值,50% 代表当失败率达到 50% 时开启熔断
        waitDurationInOpenState: 10000  # 熔断器在打开状态下保持的时间(毫秒),即熔断器打开后等待 10 秒再尝试进入半开状态
        permittedNumberOfCallsInHalfOpenState: 3  # 半开状态下允许的最大调用次数,用于测试服务是否恢复
        minimumNumberOfCalls: 5  # 统计失败率所需的最小调用次数,至少进行 5 次调用后熔断器才会判断是否开启熔断
        automaticTransitionFromOpenToHalfOpenEnabled: true  # 允许自动从打开状态过渡到半开状态,尝试恢复服务
        slowCallDurationThreshold: 2000  # 慢调用的阈值,2 秒内完成的调用视为正常,超过 2 秒的调用视为慢调用
        slowCallRateThreshold: 50  # 慢调用比例阈值,50% 代表当慢调用比例达到 50% 时开启熔断
AI 代码解读

然后我们改造原推荐接口,加上注解和降级方法,主要逻辑是当redis连接不上时,直接调视频服务的接口获取所有视频返回,代码如下:

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    private static final String REDIS_BACKEND = "redisService"; // 定义熔断器名称

    @GetMapping("/recommendations/{userId}")
    @CircuitBreaker(name = REDIS_BACKEND, fallbackMethod = "getAllVideosFallback") // 添加熔断器
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(String.valueOf(video.getId())))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(String.valueOf(video.getTag())))
                .collect(Collectors.toList());
    }

    // 降级方法,当 Redis 连接失败时调用此方法
    public List<VideoServiceClient.Video> getAllVideosFallback(Throwable ex) {
   
        // 打印日志,记录异常信息
        System.err.println("Redis is unavailable. Returning fallback recommendation. Exception: " + ex.getMessage());

        // 在降级方法中调用 Feign 客户端返回默认的视频列表
        return videoServiceClient.getAllVideos();
    }
}
AI 代码解读

然后我们把Redis停掉,如下:
18.png
这时候我们再调用推荐接口,分别观察控制台有输出报错信息提示我们redis连接失败以及接口返回了所有视频数据,如下:
19.png
20.png
我们这里模拟了redis挂掉的情况,但其实熔断降级机制只要是需要调用别的服务的地方,我们都可以做相关处理来提升可用性。

七、小结

本文基于Spring Cloud实现了一个简单的推荐系统,当然真实的推荐系统远比这复杂,涉及到很多算法,例如协同过滤、用户行为等等。这篇文章的重点是带大家建立微服务的设计思想:怎么围绕业务去做服务拆解,怎么通过Spring Cloud去构建微服务项目以及服务的熔断降级处理,并最终希望让大家在看完后能够有所收获!

目录
打赏
0
17
17
0
162
分享
相关文章
飞算 JavaAI:革新电商订单系统 Spring Boot 微服务开发
在电商订单系统开发中,传统方式耗时约30天,需应对复杂代码、调试与测试。飞算JavaAI作为一款AI代码生成工具,专注于简化Spring Boot微服务开发。它能根据业务需求自动生成RESTful API、数据库交互及事务管理代码,将开发时间缩短至1小时,效率提升80%。通过减少样板代码编写,提供规范且准确的代码,飞算JavaAI显著降低了开发成本,为软件开发带来革新动力。
深入理解 Spring Boot 中日期时间格式化:@DateTimeFormat 与 @JsonFormat 完整实践
在 Spring Boot 开发中,日期时间格式化是前后端交互的常见痛点。本文详细解析了 **@DateTimeFormat** 和 **@JsonFormat** 两个注解的用法,分别用于将前端传入的字符串解析为 Java 时间对象,以及将时间对象序列化为指定格式返回给前端。通过完整示例代码,展示了从数据接收、业务处理到结果返回的全流程,并总结了解决时区问题和全局配置的最佳实践,助你高效处理日期时间需求。
300 0
Spring Boot 注册登录系统:问题总结与优化实践
在Spring Boot开发中,注册登录模块常面临数据库设计、密码加密、权限配置及用户体验等问题。本文以便利店销售系统为例,详细解析四大类问题:数据库字段约束(如默认值缺失)、密码加密(明文存储风险)、Spring Security配置(路径权限不当)以及表单交互(数据丢失与提示不足)。通过优化数据库结构、引入BCrypt加密、完善安全配置和改进用户交互,提供了一套全面的解决方案,助力开发者构建更 robust 的系统。
111 0
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
585 57
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
585 7
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
328 69
阿里云SLB深度解析:从流量分发到架构优化的技术实践
本文深入探讨了阿里云负载均衡服务(SLB)的核心技术与应用场景,从流量分配到架构创新全面解析其价值。SLB不仅是简单的流量分发工具,更是支撑高并发、保障系统稳定性的智能中枢。文章涵盖四层与七层负载均衡原理、弹性伸缩引擎、智能DNS解析等核心技术,并结合电商大促、微服务灰度发布等实战场景提供实施指南。同时,针对性能调优与安全防护,分享连接复用优化、DDoS防御及零信任架构集成的实践经验,助力企业构建面向未来的弹性架构。
329 76
|
28天前
|
微信读书十周年,后台架构的技术演进和实践总结
微信读书经过了多年的发展,赢得了良好的用户口碑,后台系统的服务质量直接影响着用户的体验。团队多年来始终保持着“小而美”的基因,快速试错与迭代成为常态。后台团队在日常业务开发的同时,需要主动寻求更多架构上的突破,提升后台服务的可用性、扩展性,以不断适应业务与团队的变化。
56 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问