欢迎来到我的博客,代码的世界里,每一行都是一个故事
@[TOC](springboot+netty化身Udp服务端,go化身客户端模拟设备实现指令联动) ## 🔗涉及链接
🔗:CompletableFuture探秘:解锁Java并发编程的新境界
前言
在通信的大舞台上,UDP是一位默默贡献的明星。而当它与Spring Boot和Netty联手,再搭配Go语言的模拟设备,将掀起异步通信的新篇章。今天,我们将一同踏入这个奇妙的领域,揭开Spring Boot和Netty在UDP通信中的神秘面纱。
异步通信的优势
异步通信具有许多优势,特别是在处理大量连接、高并发和I/O密集型操作时。
异步通信的优势:
- 高并发处理: 异步通信使得系统可以在一个线程中处理多个请求,提高了系统的并发处理能力,特别适用于高并发的网络应用场景。
- 资源节约: 相比于同步阻塞模型,异步通信可以减少线程的创建和管理,节省系统资源,提高系统的性能和可伸缩性。
- 响应性: 异步通信允许系统在处理请求的同时继续接受新的请求,提高了系统的响应性,用户在得到响应之前不需要一直等待。
- 非阻塞I/O: 异步通信中,I/O操作是非阻塞的,一个线程可以处理多个I/O操作,避免了线程在等待I/O完成时的阻塞。
异步通信的应用场景:
- 网络服务: 适用于网络服务,特别是需要高并发和低延迟的场景,如实时通信、在线游戏等。
- 大规模连接: 适用于需要处理大量连接的场景,如聊天服务器、消息推送服务器等。
- I/O密集型任务: 适用于处理大量I/O密集型任务,如文件操作、数据库操作等。
- 事件驱动: 适用于事件驱动的应用,如消息队列、日志系统等。
项目实现逻辑图
springboot与Netty结合
将Spring Boot与Netty结合是为了利用Netty的高性能网络通信能力,而Spring Boot则提供了便捷的开发和集成环境。下面是详细介绍如何搭建一个高效的UDP服务端,使用Spring Boot和Netty实现。
1. 添加依赖
首先,在Spring Boot项目的pom.xml
中添加Netty的依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.69.Final</version> <!-- 替换为最新版本 --> </dependency>
2. 创建UDP服务端
创建一个UDP服务端,使用Netty实现。下面是一个简单的示例:
package com.todoitbo.baseSpringbootDasmart.netty.server; import com.todoitbo.baseSpringbootDasmart.netty.handler.UdpHandler; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * @author todoitbo * @date 2023/11/29 */ @Slf4j public class NettyUdpServer { private final int nettyPort; public static Channel channel; public NettyUdpServer(int port) { this.nettyPort = port; } /** * 启动服务 * * @throws InterruptedException */ public void start() throws InterruptedException { // 连接管理线程池 EventLoopGroup mainGroup = new NioEventLoopGroup(2); EventLoopGroup workGroup = new NioEventLoopGroup(8); try { // 工作线程池 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(mainGroup) // 指定 nio 通道,支持 UDP .channel(NioDatagramChannel.class) // 广播模式 .option(ChannelOption.SO_BROADCAST, true) // 设置读取缓冲区大小为 10M .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10) // 设置发送缓冲区大小为 10M .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10) // 线程池复用缓冲区 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 指定 socket 地址和端口 .localAddress(new InetSocketAddress(nettyPort)) // 添加通道 handler .handler(new ChannelInitializer<NioDatagramChannel>() { @Override protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception { nioDatagramChannel.pipeline() // 指定工作线程,提高并发性能 .addLast(workGroup,new UdpHandler()); } }); // 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成 ChannelFuture sync = bootstrap.bind().sync(); channel = sync.channel(); log.info("---------- [init] UDP netty server start ----------"); // 阻塞等待服务器关闭 channel.closeFuture().sync(); } finally { // 释放资源 mainGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
3. 创建UDP消息处理器
创建一个简单的UDP消息处理器,用于处理接收到的消息,且使用CompletableFuture来实现异步收发
package com.todoitbo.baseSpringbootDasmart.netty.handler; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.DatagramPacket; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static com.todoitbo.baseSpringbootDasmart.controller.SysUploadController.socketAddressMap; /** * @author todoitbo * @date 2023/11/29 */ public class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> { // 使用 CompletableFuture 用于异步获取客户端的响应 public static CompletableFuture<String> responseFuture = new CompletableFuture<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { // 从DatagramPacket中获取数据和发送者信息 byte[] data; int len = packet.content().readableBytes(); if (packet.content().hasArray()) { data = packet.content().array(); } else { data = new byte[len]; packet.content().getBytes(packet.content().readerIndex(), data); } String senderAddress = packet.sender().getAddress().getHostAddress(); int senderPort = packet.sender().getPort(); // 处理接收到的数据 String message = new String(data); System.out.println("Received message from " + senderAddress + ":" + senderPort + " - " + message); if (message.contains("test")) { responseFuture.complete(message); } // 构建响应消息 String response = "Hello, client!"; byte[] responseData = response.getBytes(); // 创建响应的DatagramPacket并发送给发送者 InetSocketAddress senderSocketAddress = new InetSocketAddress(senderAddress, senderPort); socketAddressMap.put("test", senderSocketAddress); DatagramPacket responsePacket = new DatagramPacket(Unpooled.copiedBuffer(responseData), senderSocketAddress); ctx.writeAndFlush(responsePacket); } /*@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 处理异常情况 cause.printStackTrace(); ctx.close(); }*/ // 在接口调用后等待客户端响应的方法 public static String waitForClientResponse() { try { // 使用 CompletableFuture 的 get 方法来阻塞等待客户端的响应 String s = responseFuture.get(500, TimeUnit.MILLISECONDS); responseFuture = new CompletableFuture<>(); return s; // 等待时间为 1 秒 } catch (Exception e) { // 发生超时或其他异常,可以根据实际情况处理 return "456"; // 超时返回默认值 "456" } } }
⚠️:注意
在某些上下文中,将 CompletableFuture 声明为 public static 可行,但请注意这并不总是一个最佳实践。做出这个决定时需要考虑以下几点:
线程安全性 - CompletableFuture 是线程安全的,但是如果你在多个线程中设置其结果,你可能会遇到异常,因为 CompletableFuture 的结果只能被设置一次。
共享状态 - 任何可以访问这个 public static 变量的代码都可以改变其状态。这可能会导致你的代码难于理解和维护。
生命周期 - 这个 CompletableFuture 的生命周期与应用程序的生命周期一致,除非显式地设置为 null。 这可能在某些情况下会导致内存泄漏。
如果是为了协调或表示一个跨类或跨方法的异步操作的结果,使用 public static CompletableFuture 是可以接受的。但你需要意识到在静态上下文中共享的状态可能会导致的问题,并以适当的同步机制处理它们。
4. 在Spring Boot中集成UDP服务端
创建一个Spring Boot应用,并在应用启动时启动UDP服务端:
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class BaseSpringbootDasmartApplication { /* static { AspectLogEnhance.enhance();}//进行日志增强,自动判断日志框架*/ public static void main(String[] args) { // System.setProperty("log4j2.isThreadContextMapInheritable", Boolean.TRUE.toString()); SpringApplication.run(BaseSpringbootDasmartApplication.class, args); try { // new NettyWebsocketServer(13025).run(); new NettyUdpServer(13026).start(); } catch (Exception e) { throw new BusinessException("-----启动失败-----", e.getMessage()).setCause(e).setLog(); } } }
5.controller实现
@GetMapping("/login/{message}") public String login(@PathVariable String message) throws NacosException, InterruptedException { byte[] responseData = message.getBytes(); // 创建响应的DatagramPacket并发送给发送者 DatagramPacket responsePacket = new DatagramPacket(Unpooled.copiedBuffer(responseData), socketAddressMap.get("test")); NettyUdpServer.channel.writeAndFlush(responsePacket); // 客户端是否响应,响应返回传入值,否则返回456,响应时间不超过0.5s,如果10.5s还未响应,则返回456 return UdpHandler.waitForClientResponse(); }
Go语言模拟设备
下面是一个简单的Go语言程序,用于模拟UDP客户端,发送和接收指令。在这个例子中,我们使用Go的
net
包来处理UDP通信。下面的代码可以直接放到main中
// @Author todoitbo 2023/11/29 14:26:00 package utils import ( "context" "fmt" "net" "strings" "sync" ) // UDPClient 是一个简单的 UDP 客户端 type UDPClient struct { conn *net.UDPConn mu sync.Mutex } // NewUDPClient 创建一个新的 UDP 客户端 func NewUDPClient(serverAddr string) (*UDPClient, error) { client := &UDPClient{} addr, err := net.ResolveUDPAddr("udp", serverAddr) if err != nil { return nil, err } conn, err := net.DialUDP("udp", nil, addr) if err != nil { return nil, err } client.conn = conn message := []byte("你好") _, err = conn.Write(message) return client, nil } // Close 关闭 UDP 客户端连接 func (c *UDPClient) Close() { c.mu.Lock() defer c.mu.Unlock() if c.conn != nil { c.conn.Close() } } // ListenForMessages 启动 Goroutine 监听服务端的实时消息 func (c *UDPClient) ListenForMessages(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() // 在 Goroutine 结束时通知 WaitGroup buffer := make([]byte, 1024) for { select { case <-ctx.Done(): // 收到关闭信号,结束 Goroutine return default: c.mu.Lock() conn := c.conn c.mu.Unlock() if conn == nil { // 客户端连接已关闭 return } n, _, err := conn.ReadFromUDP(buffer) if err != nil { fmt.Println("Error reading from server:", err) return } // 处理收到的消息,可以根据实际需求进行逻辑处理 message := string(buffer[:n]) if strings.Contains(message, "test") { c.SendMessage(message) } // hexString := hex.EncodeToString(message) // 将 3600 转换为字符串 /*expectedValue := "3600" if hexString == expectedValue { c.SendMessage("test") }else { c.SendMessage(message) }*/ fmt.Println("Received message from server: ", message) } } } // SendMessage 向服务端发送消息 func (c *UDPClient) SendMessage(message string) error { c.mu.Lock() defer c.mu.Unlock() if c.conn == nil { return fmt.Errorf("client connection is closed") } _, err := c.conn.Write([]byte(message)) return err } func InitUDPClient(ctx context.Context, wg *sync.WaitGroup, serverAddr string) (*UDPClient, error) { client, err := NewUDPClient(serverAddr) if err != nil { return nil, err } // 启动 Goroutine 监听服务端的实时消息 wg.Add(1) go client.ListenForMessages(ctx, wg) return client, nil } func init() { InitUDPClient(context.Background(), &sync.WaitGroup{}, "127.0.0.1:13026") }
⚠️:上面代码需要注意的地方
1️⃣:通用udp客户端建立
2️⃣:ListenForMessages 启动 Goroutine 监听服务端的实时消息
3️⃣:消息的处理,这里我使用的是字符串来接收,真正的设备应该是接收16进制的指令。
运行和测试
1️⃣:运行Spring Boot应用,UDP服务端将会在13026端口启动。你可以使用UDP客户端发送消息到该端口,然后在控制台看到服务端输出的消息。
2️⃣:运行go程序,可以在springboot控制台看到打印如下
3️⃣:调用接口,可以同时看到go程序,与springboot打印数据如下
这只是一个简单的示例,实际应用中可能需要根据具体需求进行更复杂的处理和逻辑。 Netty提供了强大的异步事件模型,适用于构建高性能、可伸缩的网络应用程序,而Spring Boot则为我们提供了更便捷的开发体验和集成环境。通过整合Spring Boot和Netty,你可以在网络通信方面获得更好的性能和灵活性。
性能优化与调优
性能优化和调优在高负载环境中是至关重要的,特别是在UDP通信这种无连接、不可靠的场景中。以下是一些性能优化的技巧和在高负载环境中调整UDP通信以获得最佳性能的建议:
性能优化技巧:
- 使用连接池: 对于UDP通信中的连接,考虑使用连接池来减少连接的创建和销毁开销,提高资源利用率。
- 调整缓冲区大小: 根据实际情况调整UDP通信中的缓冲区大小,以优化数据传输效率。
- 合并和拆分消息: 对于小消息,可以考虑合并多个小消息为一个大消息发送,减少网络开销。相反,对于大消息,可以考虑拆分为多个小消息发送,避免一次传输过大数据。
- 异步处理: 使用异步编程模型,将耗时的操作放在异步任务中处理,避免阻塞主线程。
- 压缩数据: 在需要传输大量数据时,可以考虑对数据进行压缩,减少数据传输的大小。
- 避免频繁GC: 减少对象的创建,特别是在高频率的UDP通信中,频繁的垃圾回收会对性能产生不利影响。
在高负载环境中调整UDP通信:
- 调整线程池大小: 在高负载环境中,适当调整线程池的大小,确保有足够的线程处理并发请求,避免线程池饱和。
- 优化消息处理逻辑: 对消息的处理逻辑进行优化,确保处理时间短,避免阻塞,提高处理能力。
- 调整超时设置: 对于需要等待响应的场景,调整超时设置,以适应高负载的情况,避免长时间的等待。
- 流量控制: 在高负载环境中,考虑实施流量控制,限制每个连接的最大流量,防止过多的数据堆积。
- 网络拓扑优化: 对于涉及多台服务器的场景,优化网络拓扑,减少数据传输的跳数,提高数据传输效率。
- 监控和调优: 使用性能监控工具对UDP通信进行监控,识别潜在的性能瓶颈,并进行相应的调优。
- 负载均衡: 对于UDP通信的负载均衡,确保负载均衡器能够合理地分发请求,避免某个节点过载。
在进行性能优化和调优时,需要根据具体的应用场景和性能测试结果进行调整。优化的效果可能因应用的特性而异,因此在实施之前最好进行充分的性能测试。
安全性考量与加密通信
UDP通信的主要特性是无连接和不可靠,相对于TCP,它缺乏内建的安全性机制,因此在UDP通信中需要额外关注安全性问题。以下是一些UDP通信中的安全性问题和如何实现UDP通信的加密传输的建议:
UDP通信的安全性问题:
- 数据完整性: UDP不提供数据完整性验证,因此数据在传输过程中可能会被篡改。攻击者可以修改、删除或注入数据。
- 数据机密性: UDP通信默认是明文传输的,攻击者可以轻松截取和查看通信中的数据,这对于敏感信息是一种风险。
- 重放攻击: 由于UDP通信不具备连接的概念,攻击者可以通过重放已经捕获的UDP数据包来模拟合法的通信。
如何实现UDP通信的加密传输:
- 使用加密算法: 选择合适的加密算法,如AES、DES等,对通信中的数据进行加密。确保使用足够强度的加密算法,并定期更新密钥。
- 消息认证码(MAC): 使用消息认证码对消息进行签名,以验证消息的完整性和真实性。HMAC(基于散列的消息认证码)是一个常见的选择。
- 密钥交换: 定期更换加密密钥,可以通过安全的密钥交换协议,如Diffie-Hellman密钥交换,来确保密钥的安全性。
- 防重放攻击: 使用时间戳或一次性令牌(One-Time Token)等机制防止重放攻击。在通信中引入时序元素,可以有效地防止攻击者重放过期的数据包。
- 数字签名: 对通信中的重要信息进行数字签名,确保数据的真实性和完整性。公钥基础设施(PKI)可以用于验证数字签名。
- 实现安全通信协议: 考虑使用已有的安全通信协议,如DTLS(Datagram Transport Layer Security),它是基于UDP的TLS版本,提供了加密和认证。
- 使用VPN或隧道: 在通信的底层使用安全的VPN(Virtual Private Network)或隧道技术,将UDP数据包进行封装,提供额外的安全性保障。
- 防止拒绝服务攻击: 在UDP通信中,由于缺少连接状态,可能容易受到拒绝服务攻击。采用流量限制、频率控制等手段来减缓拒绝服务攻击的影响。
实现UDP通信的加密传输需要综合考虑数据的机密性、完整性和身份验证等因素。选择合适的安全机制和协议取决于具体的应用场景和安全需求。
结语
深深感谢你阅读完整篇文章,希望你从中获得了些许收获。如果觉得有价值,欢迎点赞、收藏,并关注我的更新,期待与你共同分享更多技术与思考。