java nio 网络框架实现(转)

简介: maven项目https://github.com/solq360/common 链式编/解码 链路层链式处理 管道管理socket 多协议处理非常方便 仿netty NioEventLoop 单线程串行处理 ========侍加功能 : 自动化编/解码 rpc 接口增强...

 

maven项目
https://github.com/solq360/common

  • 链式编/解码
  • 链路层链式处理
  • 管道管理socket
  • 多协议处理非常方便
  • 仿netty NioEventLoop 单线程串行处理

========
侍加功能 :

  • 自动化编/解码
  • rpc 接口增强使用

简单聊天例子

server

TestNioServer

//创建session管理工厂
ISessionFactory sessionFactory = new SessionFactory();
//创建编/解码管理
ICoderParserManager coderParserManager = new CoderParserManager();
//注册包编/解码,处理业务
coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
//创建ServerSocket 实例
ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);

//启动服务
serverSocket.start();
//阻塞当前线程
serverSocket.sync();
//关闭处理
serverSocket.stop();
	

client

TestNioClient
传统方式连接

	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		clientSocket.send("连接服务器成功");
		System.out.println("send ");
		this.cancel();
	    }
	}, 1000);
	
	//启动服务
	clientSocket.start();
	//阻塞当前线程
	clientSocket.sync();
	//关闭处理
	clientSocket.stop();

服务器方式连接

	//创建session管理工厂
	ISessionFactory sessionFactory = new SessionFactory();
	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		System.out.println("registerClientSocket");
		//主动连接服务器
		ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
		clientSocket.send("连接服务器成功");
		this.cancel();
	    }
	}, 1000);
	
	//启动服务
	serverSocket.start();
	//阻塞当前线程
	serverSocket.sync();
	//关闭处理
	serverSocket.stop();

源码实现过程

链式编/解码

  • 由 多个 ICoder 输入/输出转换处理
  • CoderParser 类组装多个 ICoder
  • 编/码处理器 注意优先级

  • nio read -> packageCoder -> link coders -> handle

  • handle write -> link coders -> packageCoder -> nio write

  • 由 ICoderParserManager 管理调用处理
public interface ICoderParserManager {

    /**
     * 解码处理
     * 
     * @return CoderResult
     * */
    CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);

    /**
     * 编码处理
     * */
    ByteBuffer encode(Object message, ICoderCtx ctx);

    void error(ByteBuffer buffer, ICoderCtx ctx);

    /** 注册 编/码处理器 */
    void register(CoderParser coderParser);
}

其中核心
decode
encode

  @Override
    public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
	final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
	final ClientSocket clientSocket = socketChannelCtx.getClientSocket();

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    Object value = null;
	    synchronized (buffer) {
		// 已解析完
		if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
		// 包协议处理
		if (!packageCoder.verify(buffer, ctx)) {
		    continue;
		}
		// 包解析
		value = packageCoder.decode(buffer, ctx);
		if (value == null) {
		    // 包未读完整
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
	    }
	    // 链式处理
	    if (linkCoders != null) {
		for (ICoder coder : linkCoders) {
		    value = coder.decode(value, ctx);
		    if (value == null) {
			throw new CoderException("解码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 业务解码处理
	    value = handle.decode(value, ctx);
	    clientSocket.readBefore(socketChannelCtx, value);
	    handle.handle(value, ctx);
	    clientSocket.readAfter(socketChannelCtx, value);

	    return CoderResult.valueOf(ResultValue.SUCCEED);
	}
	return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
    }

    @Override
    public ByteBuffer encode(Object message, ICoderCtx ctx) {

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    // 业务检查
	    if (!handle.verify(message, ctx)) {
		continue;
	    }
	    // 业务编码处理
	    Object value = handle.encode(message, ctx);
	    // 链式处理
	    if (linkCoders != null) {
		for (int i = linkCoders.length - 1; i >= 0; i--) {
		    ICoder coder = linkCoders[i];
		    value = coder.encode(value, ctx);
		    if (value == null) {
			throw new CoderException("编码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 打包消息处理
	    value = packageCoder.encode(value, ctx);
	    if (value != null) {
		return (ByteBuffer) value;
	    }
	    throw new CoderException("编码出错  :" + packageCoder.getClass());
	}

	throw new CoderException("未找到编/解码处理器 ");
   }   
  • 半包/帖包处理 : AbstractISocketChannel doRead方法摘要,根据解码返回的状态做处理。
  • 半包:当不是完成状态时,继续解码,从最后一次包索引开始处理
  • 帖包:当完成包解码移动包索引,等侍下轮解码处理

       boolean run = true;
    	    // 粘包处理
    	    while (run) {
    		ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
    		cpbuffer.mark();
    		CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
    		switch (coderResult.getValue()) {
    		case SUCCEED:
    		    break;
    		case NOT_FIND_CODER:
    		    final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
    		    final int headLimit = 255;
    		    if (readySize >= headLimit) {
    			throw new CoderException("未找到编/解码处理器 ");
    		    }
    		    run = false;
    		    break;
    		case UNFINISHED:
    		case UNKNOWN:
    		case ERROR:
    		default:
    		    run = false;
    		    // TODO throw
    		    break;
    		}
    	  }

http://www.cnblogs.com/solq/p/4585496.html

相关文章
|
12天前
|
Java 程序员
JAVA程序员的进阶之路:掌握URL与URLConnection,轻松玩转网络资源!
在Java编程中,网络资源的获取与处理至关重要。本文介绍了如何使用URL与URLConnection高效、准确地获取网络资源。首先,通过`java.net.URL`类定位网络资源;其次,利用`URLConnection`类实现资源的读取与写入。文章还提供了最佳实践,包括异常处理、连接池、超时设置和请求头与响应头的合理配置,帮助Java程序员提升技能,应对复杂网络编程场景。
37 9
|
9天前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
基于开源框架Spring AI Alibaba快速构建Java应用
|
9天前
|
消息中间件 Java 数据库连接
Java 反射最全详解 ,框架设计必掌握!
本文详细解析Java反射机制,包括反射的概念、用途、实现原理及应用场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 反射最全详解 ,框架设计必掌握!
|
12天前
|
安全 Java API
深入探索Java网络编程中的HttpURLConnection:从基础到进阶
本文介绍了Java网络编程中HttpURLConnection的高级特性,包括灵活使用不同HTTP方法、处理重定向、管理Cookie、优化安全性以及处理大文件上传和下载。通过解答五个常见问题,帮助开发者提升网络编程的效率和安全性。
|
12天前
|
JSON 安全 算法
JAVA网络编程中的URL与URLConnection:那些你不知道的秘密!
在Java网络编程中,URL与URLConnection是连接网络资源的两大基石。本文通过问题解答形式,揭示了它们的深层秘密,包括特殊字符处理、请求头设置、响应体读取、支持的HTTP方法及性能优化技巧,帮助你掌握高效、安全的网络编程技能。
42 9
|
5天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
11天前
|
缓存 Java 数据库连接
Hibernate:Java持久层框架的高效应用
通过上述步骤,可以在Java项目中高效应用Hibernate框架,实现对关系数据库的透明持久化管理。Hibernate提供的强大功能和灵活配置,使得开发者能够专注于业务逻辑的实现,而不必过多关注底层数据库操作。
10 1
|
4天前
|
存储 Java 开发者
Java中的集合框架深入解析
【10月更文挑战第32天】本文旨在为读者揭开Java集合框架的神秘面纱,通过深入浅出的方式介绍其内部结构与运作机制。我们将从集合框架的设计哲学出发,探讨其如何影响我们的编程实践,并配以代码示例,展示如何在真实场景中应用这些知识。无论你是Java新手还是资深开发者,这篇文章都将为你提供新的视角和实用技巧。
7 0
|
1天前
|
SQL 安全 算法
网络安全与信息安全:漏洞、加密与意识的交织
【10月更文挑战第35天】在数字化时代,网络安全不再是可选项,而是每个网民的必修课。本文旨在深入探讨网络安全的核心要素,包括常见的安全漏洞、先进的加密技术以及不可或缺的安全意识。通过分析这些方面,我们将揭示如何保护个人和组织免受网络攻击的策略,同时提供实用的代码示例,以增强读者的实践能力。文章将引导您思考如何在日益复杂的网络环境中保持警惕,并采取积极措施以确保数据的安全。
13 4
|
1天前
|
SQL 安全 网络安全
网络安全与信息安全:漏洞、加密与安全意识的交织
在数字化时代,网络安全和信息安全的重要性日益凸显。本文深入探讨了网络安全漏洞、加密技术以及安全意识等关键要素,分析了它们之间的相互作用和对维护网络安全的影响。通过实例和代码示例,揭示了网络攻击的常见手段,展示了如何利用加密技术保护数据,以及提升个人和组织的安全意识。本文旨在为读者提供有价值的信息和建议,帮助在复杂的网络环境中更好地保护自己的数字资产。
下一篇
无影云桌面