Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)

简介: 本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。

单线程Reactor

package org.example.utils.echo.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class EchoServerReactor implements Runnable{
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    EchoServerReactor() throws IOException {
        //Reactor初始化
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress("localhost",
                        8848);

        //非阻塞
        serverSocketChannel.configureBlocking(false);


        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocketChannel.register(selector,0,new AcceptorHandler());

        // SelectionKey.OP_ACCEPT
        serverSocketChannel.socket().bind(address);
        System.out.println("服务端已经开始监听:"+address);


        sk.interestOps(SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
       try {
           while (!Thread.interrupted()){
               selector.select();
               Set<SelectionKey>  selected=selector.selectedKeys();
               Iterator<SelectionKey> it=selected.iterator();
               while (it.hasNext()){
                   SelectionKey sk=it.next();
                   dispatch(sk);
               }
               selected.clear();
            }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
    }

    private void dispatch(SelectionKey sk) {
        Runnable handler=(Runnable) sk.attachment();
        if (handler!=null){
            handler.run();
        }
    }

    class AcceptorHandler implements Runnable{


        @Override
        public void run() {
            try {
                SocketChannel channel=serverSocketChannel.accept();
                if (channel!=null)
                    new EchoHandler(selector,channel);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}
package org.example.utils.echo.single;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class EchoHandler implements Runnable{
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
    static final int RECIEVING=0,SENDING=1;
    int state=RECIEVING;

    EchoHandler(Selector selector,SocketChannel c) throws IOException {
        channel=c;
        c.configureBlocking(false);
        sk=channel.register(selector,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        try {
            if (state==SENDING){
                channel.write(byteBuffer);
                byteBuffer.clear();
                sk.interestOps(SelectionKey.OP_READ);
                state=RECIEVING;
            }else if (state==RECIEVING){
                int length=0;
                while ((length=channel.read(byteBuffer))>0)
                {
                    System.out.println(new String(byteBuffer.array(),0,length));
                }
                byteBuffer.flip();
                sk.interestOps(SelectionKey.OP_WRITE);
                state=SENDING;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

结果:

原理无非就是:

多线程,无非就是搞多个Reactor , 一个专门接受accept , 一个专门dispatch , 再搞一个多线程池处理handle

这里面最主要的就是

handle类,sk.attach(this);把对象传回reactor

参考文献:

java高并发核心编程. 卷1,NIO、Netty、Redis、ZooKeeper (尼恩)

目录
相关文章
|
2月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
70 1
|
12天前
|
数据采集 负载均衡 安全
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
本文提供了多个多线程编程问题的解决方案,包括设计有限阻塞队列、多线程网页爬虫、红绿灯路口等,每个问题都给出了至少一种实现方法,涵盖了互斥锁、条件变量、信号量等线程同步机制的使用。
LeetCode刷题 多线程编程九则 | 1188. 设计有限阻塞队列 1242. 多线程网页爬虫 1279. 红绿灯路口
|
19天前
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
87 10
spring多线程实现+合理设置最大线程数和核心线程数
|
28天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
51 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
10天前
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
29 1
|
13天前
|
Python
5-5|python开启多线程入口必须在main,从python线程(而不是main线程)启动pyQt线程有什么坏处?...
5-5|python开启多线程入口必须在main,从python线程(而不是main线程)启动pyQt线程有什么坏处?...
|
21小时前
|
设计模式 Java 物联网
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
【多线程-从零开始-玖】内核态,用户态,线程池的参数、使用方法详解
8 0
|
21小时前
|
Java 调度
【多线程-从零开始-贰】线程的构造方法和常见属性
【多线程-从零开始-贰】线程的构造方法和常见属性
10 0
|
1月前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
47 10
|
12天前
|
Java
COMATE插件实现使用线程池高级并发模型简化多线程编程
本文介绍了COMATE插件的使用,该插件通过线程池实现高级并发模型,简化了多线程编程的过程,并提供了生成结果和代码参考。