Kafka+WebSocket=实时数据大屏

简介: Kafka+WebSocket=实时数据大屏

一、WebSocket简介

WebSocket网上很多教程,这里不详细描述。简单来说:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

二、SpringBoot实现WebSocket

maven依赖如下

<!--Web项目必须加上-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- springboot websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--Kafka依赖包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.3.0</version>
        </dependency>

编写以下代码启用WebSocket

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * @author 李奇峰
 * 2019年5月10日11:08:22
 * websocket的配置
 */
@Configuration
public class WebSocketStompConfig{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

编写WebSockerServer类

此类中的session连接会话全都保存在了一个静态的Map对象websocketClients 中,在开启连接时将连接会话根据连接名保存在此Map中,方便后续Kafka发送消息时进行全局调用。

package com.fsl.springbootwebsocket.config;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author 2019年5月10日11:08:16
 */
@Component
@ServerEndpoint("/websocket/{socketname}")
public class WebSocketServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 以通道名称为key,连接会话为对象保存起来
     */
    public static Map<String, Session> websocketClients = new ConcurrentHashMap<String, Session>();
    /**
     * 会话
     */
    private Session session;
    /**
     * 通道名称
     */
    private String socketname;
    /**
     * 发送消息到指定连接
     * @param socketname 连接名
     * @param jsonString 消息
     */
    public static void sendMessage(String socketname,String jsonString){
        Session nowsession = websocketClients.get(socketname);
        if(nowsession!=null){
            try {
                nowsession.getBasicRemote().sendText(jsonString);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    @OnOpen
    public void onOpen(@PathParam("socketname") String socketname, Session session)
    {
        this.socketname = socketname;
        this.session = session;
        if(websocketClients.get(socketname)==null){
            websocketClients.put(socketname, session);
            System.out.println("当前socket通道"+socketname+"已加入连接");
        }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("服务端发生了错误"+error.getMessage());
    }
    /**
     * 连接关闭
     */
    @OnClose
    public void onClose()
    {
        websocketClients.remove(socketname);
        System.out.println("当前socket通道"+socketname+"已退出连接");
    }
    /**
     * 收到客户端的消息
     *
     * @param message 消息
     * @param session 会话
     */
    @OnMessage
    public void onMessage(String message, Session session){
        System.out.println("当前收到了消息:"+message);
        session.getAsyncRemote().sendText("来自服务器:"+this.socketname+"你的消息我收到啦");
    };
    /**
     * 向所有连接主动推送消息
     * @param jsonObject 消息体
     * @throws IOException
     */
    public void sendMessageAll(JSONObject jsonObject) throws IOException {
        for (Session item : websocketClients.values()) {
            item.getAsyncRemote().sendText(jsonObject.toJSONString());
        }
    }
}

三、Kafka实现

此消费者在消费消息时,会调用WebSockerServer类中的sendMessage函数,将消息发送到websocket中

此类继承了Thread类,因为Kafka运行时会一直监听通道中的消息,为了避免进程阻塞,我们将其作为单独的线程来运行

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;
import static com.fsl.springbootwebsocket.config.WebSocketServer.sendMessage;
public class SocketConsumer extends Thread {
    @Override
    public void run(){
        Properties prop = new Properties();
        System.out.println("启动kafka消费者....");
        prop.put("bootstrap.servers","cdh3:9092");
        prop.put("group.id","socket");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//如果是之前存在的group.id
        Consumer consumer = new KafkaConsumer(prop);
        consumer.subscribe(Arrays.asList("zeek_test"));
        while (true) {
            ConsumerRecords<String, String> c = consumer.poll(100);
            for(ConsumerRecord<String, String> c1: c) {
                System.out.println(c1.value());
                sendMessage("socket",c1.value());
            }
        }
    }
}

在此类在SpringBoot注册并启动

import org.springframework.stereotype.Component;
@Component
public class ConsumerLinster  {
    public ConsumerLinster(){
        System.out.println("启用Kafka监听器");
        SocketConsumer socketConsumer = new SocketConsumer();
        socketConsumer.start();
        System.out.println("Kafka监听器启用成功");
    }
}

此项目整体的目录结构如下图所示

640.png

四、测试

将此项目运行后,打开http://www.websocket-test.com/此网址即可进行在线测试

640.png

相关文章
|
消息中间件 数据采集 运维
Kafka监控数据采集
上篇文章讲解了运维平台的整体设计,对各个部分并未深入介绍,今天将比较重要的一环——监控数据来源进行讲解。
499 0
|
消息中间件 数据可视化 关系型数据库
(3)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示
1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到kafka; 2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理; 3)将结果数据写入到mysql; 4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台; 5)在平台上通过拖拽式构建各种数据应用,数据展示;
(3)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示
|
1月前
|
消息中间件 数据采集 分布式计算
【数据采集与预处理】数据接入工具Kafka
【数据采集与预处理】数据接入工具Kafka
43 1
【数据采集与预处理】数据接入工具Kafka
|
1月前
|
消息中间件 存储 物联网
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Kafka Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的Kafka Channel,讲解其数据采集流程。
154 0
|
8月前
|
消息中间件 运维 物联网
一文告诉你为什么时序场景下 TDengine 数据订阅比 Kafka 好
在本文中,TDengine 研发人员详细揭秘了 TDengine 数据订阅的流程和具体实现。
164 0
|
9月前
|
消息中间件 存储 负载均衡
流平台 Kafka
流平台 Kafka
64 0
|
消息中间件 存储 运维
基于 RocketMQ Connect 构建数据流转处理平台
基于 RocketMQ Connect 构建数据流转处理平台
基于 RocketMQ Connect 构建数据流转处理平台
|
消息中间件 存储 负载均衡
RocketMQ Connect 构建流式数据处理平台
RocketMQ Connect 作为 RocketMQ 与其他系统间流式数据传输的重要工具,轻松将 RocketMQ 与其他存储技术进行集成,并实现低延迟流/批处理。
356 1
RocketMQ Connect 构建流式数据处理平台
|
消息中间件 设计模式 缓存
Apache ShenYu 集成 RocketMQ 实时采集海量日志的实践
最佳实践 | 一起来了解 Apache ShenYu 如何集成 RocketMQ 实现日志的可观测性吧~
558 0
Apache ShenYu 集成 RocketMQ 实时采集海量日志的实践