Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)

简介: 本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。

单线程Reactor

package org.example.utils.echo.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class EchoServerReactor implements Runnable{
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    EchoServerReactor() throws IOException {
        //Reactor初始化
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress("localhost",
                        8848);

        //非阻塞
        serverSocketChannel.configureBlocking(false);


        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocketChannel.register(selector,0,new AcceptorHandler());

        // SelectionKey.OP_ACCEPT
        serverSocketChannel.socket().bind(address);
        System.out.println("服务端已经开始监听:"+address);


        sk.interestOps(SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
       try {
           while (!Thread.interrupted()){
               selector.select();
               Set<SelectionKey>  selected=selector.selectedKeys();
               Iterator<SelectionKey> it=selected.iterator();
               while (it.hasNext()){
                   SelectionKey sk=it.next();
                   dispatch(sk);
               }
               selected.clear();
            }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
    }

    private void dispatch(SelectionKey sk) {
        Runnable handler=(Runnable) sk.attachment();
        if (handler!=null){
            handler.run();
        }
    }

    class AcceptorHandler implements Runnable{


        @Override
        public void run() {
            try {
                SocketChannel channel=serverSocketChannel.accept();
                if (channel!=null)
                    new EchoHandler(selector,channel);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}
package org.example.utils.echo.single;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class EchoHandler implements Runnable{
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
    static final int RECIEVING=0,SENDING=1;
    int state=RECIEVING;

    EchoHandler(Selector selector,SocketChannel c) throws IOException {
        channel=c;
        c.configureBlocking(false);
        sk=channel.register(selector,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        try {
            if (state==SENDING){
                channel.write(byteBuffer);
                byteBuffer.clear();
                sk.interestOps(SelectionKey.OP_READ);
                state=RECIEVING;
            }else if (state==RECIEVING){
                int length=0;
                while ((length=channel.read(byteBuffer))>0)
                {
                    System.out.println(new String(byteBuffer.array(),0,length));
                }
                byteBuffer.flip();
                sk.interestOps(SelectionKey.OP_WRITE);
                state=SENDING;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

结果:

原理无非就是:

多线程,无非就是搞多个Reactor , 一个专门接受accept , 一个专门dispatch , 再搞一个多线程池处理handle

这里面最主要的就是

handle类,sk.attach(this);把对象传回reactor

参考文献:

java高并发核心编程. 卷1,NIO、Netty、Redis、ZooKeeper (尼恩)

目录
相关文章
|
4月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
264 3
|
4月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
198 0
|
5月前
|
存储 SQL 安全
Java 无锁方式实现高性能线程实战操作指南
本文深入探讨了现代高并发Java应用中单例模式的实现方式,分析了传统单例(如DCL)的局限性,并提出了多种无锁实现方案。包括基于ThreadLocal的延迟初始化、VarHandle原子操作、Record不可变对象、响应式编程(Reactor)以及CDI依赖注入等实现方式。每种方案均附有代码示例及适用场景,同时通过JMH性能测试对比各实现的优劣。最后,结合实际案例设计了一个高性能配置中心,展示了无锁单例在实际开发中的应用。总结中提出根据场景选择合适的实现方式,并遵循现代单例设计原则以优化性能和安全性。文中还提供了代码获取链接,便于读者实践与学习。
111 0
|
1月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
212 0
|
6月前
|
设计模式 运维 监控
并发设计模式实战系列(4):线程池
需要建立持续的性能剖析(Profiling)和调优机制。通过以上十二个维度的系统化扩展,构建了一个从。设置合理队列容量/拒绝策略。动态扩容/优化任务处理速度。检查线程栈定位热点代码。调整最大用户进程数限制。CPU占用率100%
438 0
|
3月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
251 1
|
4月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
5月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
359 5

热门文章

最新文章