Kafka中数据通过SpringBoot-WebSocket进行实时数据可视化

简介: Kafka中数据通过SpringBoot-WebSocket进行实时数据可视化
最近需要做一个网络流量的实时可视化,决定采用Kafka+WebSocket的方式实现。

一、WebSocket简介

WebSocket网上很多教程,这里不详细描述。简单来说:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

二、SpringBoot实现WebSocket

maven依赖如下

        <!--Web项目必须加上-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- springboot websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        <!--Kafka依赖包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.3.0</version>
        </dependency>

编写以下代码启用WebSocket

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 李奇峰
 * 2019年5月10日11:08:22
 * websocket的配置
 */
@Configuration
public class WebSocketStompConfig{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

编写WebSockerServer类

此类中的session连接会话全都保存在了一个静态的Map对象websocketClients 中,在开启连接时将连接会话根据连接名保存在此Map中,方便后续Kafka发送消息时进行全局调用。

package com.fsl.springbootwebsocket.config;

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 2019年5月10日11:08:16
 */
@Component
@ServerEndpoint("/websocket/{socketname}")
public class WebSocketServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 以通道名称为key,连接会话为对象保存起来
     */
    public static Map<String, Session> websocketClients = new ConcurrentHashMap<String, Session>();
    /**
     * 会话
     */
    private Session session;
    /**
     * 通道名称
     */
    private String socketname;

    /**
     * 发送消息到指定连接
     * @param socketname 连接名
     * @param jsonString 消息
     */
    public static void sendMessage(String socketname,String jsonString){
        Session nowsession = websocketClients.get(socketname);
        if(nowsession!=null){
            try {
                nowsession.getBasicRemote().sendText(jsonString);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(@PathParam("socketname") String socketname, Session session)
    {

        this.socketname = socketname;
        this.session = session;
        if(websocketClients.get(socketname)==null){
            websocketClients.put(socketname, session);
            System.out.println("当前socket通道"+socketname+"已加入连接");
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("服务端发生了错误"+error.getMessage());
    }
    /**
     * 连接关闭
     */
    @OnClose
    public void onClose()
    {
        websocketClients.remove(socketname);
        System.out.println("当前socket通道"+socketname+"已退出连接");

    }

    /**
     * 收到客户端的消息
     *
     * @param message 消息
     * @param session 会话
     */
    @OnMessage
    public void onMessage(String message, Session session){
        System.out.println("当前收到了消息:"+message);
        session.getAsyncRemote().sendText("来自服务器:"+this.socketname+"你的消息我收到啦");
    };

    /**
     * 向所有连接主动推送消息
     * @param jsonObject 消息体
     * @throws IOException
     */
    public void sendMessageAll(JSONObject jsonObject) throws IOException {
        for (Session item : websocketClients.values()) {
            item.getAsyncRemote().sendText(jsonObject.toJSONString());
        }
    }

}

三、Kafka实现

此消费者在消费消息时,会调用WebSockerServer类中的sendMessage函数,将消息发送到websocket中
此类继承了Thread类,因为Kafka运行时会一直监听通道中的消息,为了避免进程阻塞,我们将其作为单独的线程来运行

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;

import static com.fsl.springbootwebsocket.config.WebSocketServer.sendMessage;

public class SocketConsumer extends Thread {

    @Override
    public void run(){
        Properties prop = new Properties();
        System.out.println("启动kafka消费者....");
        prop.put("bootstrap.servers","cdh3:9092");
        prop.put("group.id","socket");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//如果是之前存在的group.id
        Consumer consumer = new KafkaConsumer(prop);
        consumer.subscribe(Arrays.asList("zeek_test"));
        while (true) {
            ConsumerRecords<String, String> c = consumer.poll(100);
            for(ConsumerRecord<String, String> c1: c) {
                System.out.println(c1.value());
                sendMessage("socket",c1.value());
            }
        }
    }
}

在此类在SpringBoot注册并启动

import org.springframework.stereotype.Component;

@Component
public class ConsumerLinster  {
    public ConsumerLinster(){
        System.out.println("启用Kafka监听器");
        SocketConsumer socketConsumer = new SocketConsumer();
        socketConsumer.start();
        System.out.println("Kafka监听器启用成功");
    }
}

此项目整体的目录结构如下图所示
在这里插入图片描述

四、测试

将此项目运行后,打开http://www.websocket-test.com/此网址即可进行在线测试
在这里插入图片描述

目录
相关文章
|
1月前
|
Java
Springboot 导出word,动态填充表格数据
Springboot 导出word,动态填充表格数据
|
1月前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
2月前
|
JSON JavaScript 前端开发
解决js中Long类型数据在请求与响应过程精度丢失问题(springboot项目中)
解决js中Long类型数据在请求与响应过程精度丢失问题(springboot项目中)
45 0
|
2月前
|
存储 搜索推荐 Java
|
1月前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
41 0
|
3天前
|
SQL Java 数据库
springboot用户创建的业务数据只能是同一组织能看的见
springboot用户创建的业务数据只能是同一组织能看的见
|
11天前
|
JSON JavaScript Java
从前端Vue到后端Spring Boot:接收JSON数据的正确姿势
从前端Vue到后端Spring Boot:接收JSON数据的正确姿势
22 0
|
12天前
|
SQL Java 数据库连接
Springboot框架整合Spring JDBC操作数据
JDBC是Java数据库连接API,用于执行SQL并访问多种关系数据库。它包括一系列Java类和接口,用于建立数据库连接、创建数据库操作对象、定义SQL语句、执行操作并处理结果集。直接使用JDBC涉及七个步骤,包括加载驱动、建立连接、创建对象、定义SQL、执行操作、处理结果和关闭资源。Spring Boot的`spring-boot-starter-jdbc`简化了这些步骤,提供了一个在Spring生态中更便捷使用JDBC的封装。集成Spring JDBC需要添加相关依赖,配置数据库连接信息,并通过JdbcTemplate进行数据库操作,如插入、更新、删除和查询。
|
12天前
|
SQL Java 数据库连接
Springboot框架整合Spring Data JPA操作数据
Spring Data JPA是Spring基于ORM和JPA规范封装的框架,简化了数据库操作,提供增删改查等接口,并可通过方法名自动生成查询。集成到Spring Boot需添加相关依赖并配置数据库连接和JPA设置。基础用法包括定义实体类和Repository接口,通过Repository接口可直接进行数据操作。此外,JPA支持关键字查询,如通过`findByAuthor`自动转换为SQL的`WHERE author=?`查询。
|
19天前
|
Java 数据库连接 数据库
【SpringBoot系列】微服务数据持久化方案
【4月更文挑战第8天】微服务数据持久化方案Spring Boot JPA + Hiberate

热门文章

最新文章