Kafka中数据通过SpringBoot-WebSocket进行实时数据可视化

简介: Kafka中数据通过SpringBoot-WebSocket进行实时数据可视化
最近需要做一个网络流量的实时可视化,决定采用Kafka+WebSocket的方式实现。

一、WebSocket简介

WebSocket网上很多教程,这里不详细描述。简单来说:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

二、SpringBoot实现WebSocket

maven依赖如下

        <!--Web项目必须加上-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- springboot websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <!--Kafka依赖包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.3.0</version>
        </dependency>

编写以下代码启用WebSocket

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 李奇峰
 * 2019年5月10日11:08:22
 * websocket的配置
 */
@Configuration
public class WebSocketStompConfig{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

编写WebSockerServer类

此类中的session连接会话全都保存在了一个静态的Map对象websocketClients 中,在开启连接时将连接会话根据连接名保存在此Map中,方便后续Kafka发送消息时进行全局调用。

package com.fsl.springbootwebsocket.config;

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 2019年5月10日11:08:16
 */
@Component
@ServerEndpoint("/websocket/{socketname}")
public class WebSocketServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 以通道名称为key,连接会话为对象保存起来
     */
    public static Map<String, Session> websocketClients = new ConcurrentHashMap<String, Session>();
    /**
     * 会话
     */
    private Session session;
    /**
     * 通道名称
     */
    private String socketname;

    /**
     * 发送消息到指定连接
     * @param socketname 连接名
     * @param jsonString 消息
     */
    public static void sendMessage(String socketname,String jsonString){
        Session nowsession = websocketClients.get(socketname);
        if(nowsession!=null){
            try {
                nowsession.getBasicRemote().sendText(jsonString);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(@PathParam("socketname") String socketname, Session session)
    {

        this.socketname = socketname;
        this.session = session;
        if(websocketClients.get(socketname)==null){
            websocketClients.put(socketname, session);
            System.out.println("当前socket通道"+socketname+"已加入连接");
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("服务端发生了错误"+error.getMessage());
    }
    /**
     * 连接关闭
     */
    @OnClose
    public void onClose()
    {
        websocketClients.remove(socketname);
        System.out.println("当前socket通道"+socketname+"已退出连接");

    }

    /**
     * 收到客户端的消息
     *
     * @param message 消息
     * @param session 会话
     */
    @OnMessage
    public void onMessage(String message, Session session){
        System.out.println("当前收到了消息:"+message);
        session.getAsyncRemote().sendText("来自服务器:"+this.socketname+"你的消息我收到啦");
    };

    /**
     * 向所有连接主动推送消息
     * @param jsonObject 消息体
     * @throws IOException
     */
    public void sendMessageAll(JSONObject jsonObject) throws IOException {
        for (Session item : websocketClients.values()) {
            item.getAsyncRemote().sendText(jsonObject.toJSONString());
        }
    }

}

三、Kafka实现

此消费者在消费消息时,会调用WebSockerServer类中的sendMessage函数,将消息发送到websocket中
此类继承了Thread类,因为Kafka运行时会一直监听通道中的消息,为了避免进程阻塞,我们将其作为单独的线程来运行

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;

import static com.fsl.springbootwebsocket.config.WebSocketServer.sendMessage;

public class SocketConsumer extends Thread {

    @Override
    public void run(){
        Properties prop = new Properties();
        System.out.println("启动kafka消费者....");
        prop.put("bootstrap.servers","cdh3:9092");
        prop.put("group.id","socket");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//如果是之前存在的group.id
        Consumer consumer = new KafkaConsumer(prop);
        consumer.subscribe(Arrays.asList("zeek_test"));
        while (true) {
            ConsumerRecords<String, String> c = consumer.poll(100);
            for(ConsumerRecord<String, String> c1: c) {
                System.out.println(c1.value());
                sendMessage("socket",c1.value());
            }
        }
    }
}

在此类在SpringBoot注册并启动

import org.springframework.stereotype.Component;

@Component
public class ConsumerLinster  {
    public ConsumerLinster(){
        System.out.println("启用Kafka监听器");
        SocketConsumer socketConsumer = new SocketConsumer();
        socketConsumer.start();
        System.out.println("Kafka监听器启用成功");
    }
}

此项目整体的目录结构如下图所示
在这里插入图片描述

四、测试

将此项目运行后,打开http://www.websocket-test.com/此网址即可进行在线测试
在这里插入图片描述

目录
相关文章
|
7月前
|
JSON Java 数据格式
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——封装统一返回的数据结构
本文介绍了在Spring Boot中封装统一返回的数据结构的方法。通过定义一个泛型类`JsonResult&lt;T&gt;`,包含数据、状态码和提示信息三个属性,满足不同场景下的JSON返回需求。例如,无数据返回时可设置默认状态码&quot;0&quot;和消息&quot;操作成功!&quot;,有数据返回时也可自定义状态码和消息。同时,文章展示了如何在Controller中使用该结构,通过具体示例(如用户信息、列表和Map)说明其灵活性与便捷性。最后总结了Spring Boot中JSON数据返回的配置与实际项目中的应用技巧。
568 0
|
7月前
|
JSON Java fastjson
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——使用 fastJson 处理 null
本文介绍如何使用 fastJson 处理 null 值。与 Jackson 不同,fastJson 需要通过继承 `WebMvcConfigurationSupport` 类并覆盖 `configureMessageConverters` 方法来配置 null 值的处理方式。例如,可将 String 类型的 null 转为 &quot;&quot;,Number 类型的 null 转为 0,避免循环引用等。代码示例展示了具体实现步骤,包括引入相关依赖、设置序列化特性及解决中文乱码问题。
346 0
|
7月前
|
JSON Java fastjson
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——Spring Boot 默认对Json的处理
本文介绍了在Spring Boot中返回Json数据的方法及数据封装技巧。通过使用`@RestController`注解,可以轻松实现接口返回Json格式的数据,默认使用的Json解析框架是Jackson。文章详细讲解了如何处理不同数据类型(如类对象、List、Map)的Json转换,并提供了自定义配置以应对null值问题。此外,还对比了Jackson与阿里巴巴FastJson的特点,以及如何在项目中引入和配置FastJson,解决null值转换和中文乱码等问题。
1013 0
|
3月前
|
JSON Java 数据格式
Spring Boot返回Json数据及数据封装
在Spring Boot中,接口间及前后端的数据传输通常使用JSON格式。通过@RestController注解,可轻松实现Controller返回JSON数据。该注解是Spring Boot新增的组合注解,结合了@Controller和@ResponseBody的功能,默认将返回值转换为JSON格式。Spring Boot底层默认采用Jackson作为JSON解析框架,并通过spring-boot-starter-json依赖集成了相关库,包括jackson-databind、jackson-datatype-jdk8等常用模块,简化了开发者对依赖的手动管理。
422 3
|
9月前
|
前端开发 Java API
SpringBoot整合Flowable【06】- 查询历史数据
本文介绍了Flowable工作流引擎中历史数据的查询与管理。首先回顾了流程变量的应用场景及其局限性,引出表单在灵活定制流程中的重要性。接着详细讲解了如何通过Flowable的历史服务API查询用户的历史绩效数据,包括启动流程、执行任务和查询历史记录的具体步骤,并展示了如何将查询结果封装为更易理解的对象返回。最后总结了Flowable提供的丰富API及其灵活性,为后续学习驳回功能做了铺垫。
636 0
SpringBoot整合Flowable【06】- 查询历史数据
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
804 2
|
6月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1135 7
|
7月前
|
前端开发 Cloud Native Java
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
|
8月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1923 45
|
7月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
298 10