一、WebSocket简介
WebSocket网上很多教程,这里不详细描述。简单来说:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
二、SpringBoot实现WebSocket
maven依赖如下
<!--Web项目必须加上--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- springboot websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--Kafka依赖包--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.3.0</version> </dependency>
编写以下代码启用WebSocket
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author 李奇峰 * 2019年5月10日11:08:22 * websocket的配置 */ @Configuration public class WebSocketStompConfig{ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
编写WebSockerServer类
此类中的session连接会话全都保存在了一个静态的Map对象websocketClients 中,在开启连接时将连接会话根据连接名保存在此Map中,方便后续Kafka发送消息时进行全局调用。
package com.fsl.springbootwebsocket.config; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author 2019年5月10日11:08:16 */ @Component @ServerEndpoint("/websocket/{socketname}") public class WebSocketServer { private Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 以通道名称为key,连接会话为对象保存起来 */ public static Map<String, Session> websocketClients = new ConcurrentHashMap<String, Session>(); /** * 会话 */ private Session session; /** * 通道名称 */ private String socketname; /** * 发送消息到指定连接 * @param socketname 连接名 * @param jsonString 消息 */ public static void sendMessage(String socketname,String jsonString){ Session nowsession = websocketClients.get(socketname); if(nowsession!=null){ try { nowsession.getBasicRemote().sendText(jsonString); } catch (IOException e) { e.printStackTrace(); } } } @OnOpen public void onOpen(@PathParam("socketname") String socketname, Session session) { this.socketname = socketname; this.session = session; if(websocketClients.get(socketname)==null){ websocketClients.put(socketname, session); System.out.println("当前socket通道"+socketname+"已加入连接"); } } @OnError public void onError(Session session, Throwable error) { logger.info("服务端发生了错误"+error.getMessage()); } /** * 连接关闭 */ @OnClose public void onClose() { websocketClients.remove(socketname); System.out.println("当前socket通道"+socketname+"已退出连接"); } /** * 收到客户端的消息 * * @param message 消息 * @param session 会话 */ @OnMessage public void onMessage(String message, Session session){ System.out.println("当前收到了消息:"+message); session.getAsyncRemote().sendText("来自服务器:"+this.socketname+"你的消息我收到啦"); }; /** * 向所有连接主动推送消息 * @param jsonObject 消息体 * @throws IOException */ public void sendMessageAll(JSONObject jsonObject) throws IOException { for (Session item : websocketClients.values()) { item.getAsyncRemote().sendText(jsonObject.toJSONString()); } } }
三、Kafka实现
此消费者在消费消息时,会调用WebSockerServer类中的sendMessage函数,将消息发送到websocket中
此类继承了Thread类,因为Kafka运行时会一直监听通道中的消息,为了避免进程阻塞,我们将其作为单独的线程来运行
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.*; import static com.fsl.springbootwebsocket.config.WebSocketServer.sendMessage; public class SocketConsumer extends Thread { @Override public void run(){ Properties prop = new Properties(); System.out.println("启动kafka消费者...."); prop.put("bootstrap.servers","cdh3:9092"); prop.put("group.id","socket"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //如果是之前存在的group.id Consumer consumer = new KafkaConsumer(prop); consumer.subscribe(Arrays.asList("zeek_test")); while (true) { ConsumerRecords<String, String> c = consumer.poll(100); for(ConsumerRecord<String, String> c1: c) { System.out.println(c1.value()); sendMessage("socket",c1.value()); } } } }
在此类在SpringBoot注册并启动
import org.springframework.stereotype.Component; @Component public class ConsumerLinster { public ConsumerLinster(){ System.out.println("启用Kafka监听器"); SocketConsumer socketConsumer = new SocketConsumer(); socketConsumer.start(); System.out.println("Kafka监听器启用成功"); } }
此项目整体的目录结构如下图所示
四、测试
将此项目运行后,打开http://www.websocket-test.com/此网址即可进行在线测试