springboot整合SSE技术开发经验总结及心得

简介: springboot整合SSE技术开发经验总结及心得

一、开发背景

公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。

二、快速了解SSE

1、概念

SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他 只能由服务端发送信息。

2、特性

  • 主动从服务端推送消息的技术
  • 本质是一个HTTP的长连接
  • 发送的是一个stream流,格式为text/event-stream

三、开发思路

要实现后端的实时推送消息,前台实时更新数据,思路如下:

  • 1、前后端需要建立连接
  • 2、后端如何做到实时推送信息呢?可以采用定时调度

四、代码演示

1、引入依赖

原则上是不需要引入的,因为springboot底层已经整合了SSE

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2、服务端代码

controller层

@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseEmitterController extends BaseController {
    @Autowired
    private SseEmitterService sseEmitterService;
    /**
     * 创建SSE连接
     *
     * @return
     */
    @GetMapping("/connect/{type}")
    public SseEmitter connect(@PathVariable("type") String type) {
        return sseEmitterService.connect(type);
    }
}

service层

public interface SseEmitterService {
    SseEmitter connect(String type);
    void volumeOverview();
    void sysOperation();
    void monitor();
    ........
}

service实现层

@Service
public class SseEmitterServiceImpl implements SseEmitterService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static Map<String, SseEmitterUTF8> sseCache = new ConcurrentHashMap<>();
    /**
     * 创建连接sse
     * @param type
     * @return
     */
    @Override
    public SseEmitter connect(String type) {
        final String clientId = UUID.randomUUID().toString().replace("-", "");
        SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
        try {
            sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!"));
        } catch (IOException e) {
            logger.error("创建连接失败 , {} " , e.getMessage());
        }
        sseEmitter.onCompletion(() -> {
            logger.info("connect onCompletion , {} 结束连接 ..." , clientId);
            removeClient(clientId);
        });
        sseEmitter.onTimeout(() -> {
            logger.info("connect onTimeout , {} 连接超时 ..." , clientId);
            removeClient(clientId);
        });
        sseEmitter.onError((throwable) -> {
            logger.error("connect onError , {} 连接异常 ..." , clientId);
            removeClient(clientId);
        });
        sseCache.put(clientId, sseEmitter);
        //立即推送
        volumeOverview();
        dealResp();
        monitor();
        if (type.equals(SseEmitterConstant.OVER_VIEW)){
            sysOperation();
            mileStone();
        }
        logger.info("当前用户总连接数 : {} " , sseCache.size());
        return sseEmitter;
    }
    /**
     * 交易量概览
     */
    @Override
    public void volumeOverview() {
        Map<String,Object> map = new HashMap<>();
        map.put("latest_tps",440.3);
        map.put("total_cics_trans",341656001);
        map.put("total_zjcx_trans",391656001);
        map.put("zjcx_tps",23657);
        map.put("day10",48388352);
        map.put("history",105013985);
        SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON);
        for (Map.Entry<String, SseEmitterUTF8> entry : sseCache.entrySet()) {
            SseEmitterUTF8 sseEmitter = entry.getValue();
            if (sseEmitter == null) {
                continue;
            }
            try {
                sseEmitter.send(data);
            } catch (IOException e) {
                String body = "SseEmitterServiceImpl[volumeOverview  ]";
                logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage());
                messageRepush(entry.getKey(),data,body);
            }
        }
    }
    private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){
        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(2000);
                SseEmitterUTF8 sseEmitter = sseCache.get(type);
                if (sseEmitter == null) {
                    logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1);
                    continue;
                }
                sseEmitter.send(data);
            } catch (Exception ex) {
                logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex);
                continue;
            }
            logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1);
            return;
        }
    }

常量类

public class SseEmitterConstant {
    /**
     * 创建连接的客户端类型
     */
    public static final String OVER_VIEW = "overview";
    /**
     * even 数据类型
     */
    public static final String VOLUME_OVERVIEW = "vw";
    public SseEmitterConstant(){}
}

3、后端定时任务代码

采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。

启动类增加注解@EnableScheduling

package com.hidata;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
@EnableScheduling
public class HidataApplication {
    public static void main(String[] args)
    {
        SpringApplication.run(HidataApplication.class, args);
        System.out.println("[HiUrlShorter platform startup!]");
    }
}

创建 定时任务调度类,在该类上加上@Scheduled注解,

@Configuration
public class SendMessageTask{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private SseEmitterService sseEmitterService;
    @Scheduled(cron = "0/40 * * * * ?}")
    public void volumeOverviewTask() {
        try {
            sseEmitterService.volumeOverview();
        } catch (Exception e) {
            logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage());
        }
    }
.......
}

4、解决乱码的实体类

如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理

package com.hidata.devops.lagrescreen.domain;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.nio.charset.StandardCharsets;
public class SseEmitterUTF8 extends SseEmitter {
    public SseEmitterUTF8(Long timeout) {
        super(timeout);
    }
    @Override
    protected void extendResponse(ServerHttpResponse outputMessage) {
        super.extendResponse(outputMessage);
        HttpHeaders headers = outputMessage.getHeaders();
        headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
    }
}

4、前端代码

// 连接服务器
    var sseSource = new EventSource("http://localhost:8080/sse/connect");
    // 连接打开
    sseSource.onopen = function () {
        console.log("连接打开");
    }
    // 连接错误
    sseSource.onerror = function (err) {
        console.log("连接错误:", err);
    }
  //接收信息
    eventSource.addEventListener("vw", function (event) {
    console.log(event.data);
    .....
  });

五、核心代码分析

先看代码片段

SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);

分析:

后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码

SseEmitter.event().name("vw")

这样,前端就知道怎么渲染页面了,类似于这样

关于even()的属性,可以查看源码,

public interface SseEventBuilder {
        SseEmitter.SseEventBuilder id(String var1);
        SseEmitter.SseEventBuilder name(String var1);
        SseEmitter.SseEventBuilder reconnectTime(long var1);
        SseEmitter.SseEventBuilder comment(String var1);
        SseEmitter.SseEventBuilder data(Object var1);
        SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2);
        Set<DataWithMediaType> build();
    }

相关文章
|
1月前
|
存储 安全 前端开发
基于springboot技术的美食烹饪互动平台的设计与实现
基于springboot技术的美食烹饪互动平台的设计与实现
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
springboot基于人工智能和自然语言理解技术的医院智能导医系统源码
智能导诊系统可为患者提供线上挂号智能辅助服务,患者根据提示手动输入自己的基本症状,通过智能对话方式,该系统会依据大数据一步步帮助患者“诊断”,并最终推荐就医的科室和相关专家。患者可自主选择,实现“一键挂号”。这一模式将精确的导诊服务前置,从源头上让医疗服务更高效。
372 2
|
9天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
10天前
|
存储 数据可视化 安全
Java全套智慧校园系统源码springboot+elmentui +Quartz可视化校园管理平台系统源码 建设智慧校园的5大关键技术
智慧校园指的是以物联网为基础的智慧化的校园工作、学习和生活一体化环境,这个一体化环境以各种应用服务系统为载体,将教学、科研、管理和校园生活进行充分融合。无处不在的网络学习、融合创新的网络科研、透明高效的校务治理、丰富多彩的校园文化、方便周到的校园生活。简而言之,“要做一个安全、稳定、环保、节能的校园。
37 6
|
1月前
|
Kubernetes Cloud Native Devops
云原生技术落地实现之二KubeSphere DevOps 系统在 Kubernetes 集群上实现springboot项目的自动部署和管理 CI/CD (2/2)
云原生技术落地实现之二KubeSphere DevOps 系统在 Kubernetes 集群上实现springboot项目的自动部署和管理 CI/CD (2/2)
51 1
|
1月前
|
JavaScript 前端开发 Java
基于SpringBoot和VUE技术的智慧生活商城系统设计与实现
基于SpringBoot和VUE技术的智慧生活商城系统设计与实现
|
3月前
|
Java 数据库连接 mybatis
JAVA框架技术之十八节springboot课件上手教程(二)
JAVA框架技术之十八节springboot课件上手教程
83 0
JAVA框架技术之十八节springboot课件上手教程(二)
|
3月前
|
XML Java 数据库连接
JAVA框架技术之十八节springboot课件上手教程(一)
JAVA框架技术之十八节springboot课件上手教程
136 1
JAVA框架技术之十八节springboot课件上手教程(一)
|
3月前
|
前端开发 Java 应用服务中间件
物资管理|基于Spring Boot技术的防疫物资管理系统设计与实现
物资管理|基于Spring Boot技术的防疫物资管理系统设计与实现
|
3月前
|
Java 数据库连接 数据库
【Spring技术专题】「实战开发系列」保姆级教你SpringBoot整合Mybatis框架实现多数据源的静态数据源和动态数据源配置落地
Mybatis是一个基于JDBC实现的,支持普通 SQL 查询、存储过程和高级映射的优秀持久层框架,去掉了几乎所有的 JDBC 代码和参数的手工设置以及对结果集的检索封装。 Mybatis主要思想是将程序中大量的 SQL 语句剥离出来,配置在配置文件中,以实现 SQL 的灵活配置。在所有 ORM 框架中都有一个非常重要的媒介——PO(持久化对象),PO 的作用就是完成持久化操作,通过该对象对数据库执行增删改的操作,以面向对象的方式操作数据库。
45 1
【Spring技术专题】「实战开发系列」保姆级教你SpringBoot整合Mybatis框架实现多数据源的静态数据源和动态数据源配置落地