编写一个简易的Java NIO Reactor库

简介: 开源地址https://github.com/sea-boat/net-reactor源码设计接收器Acceptor/** * * @author seaboat * @date 2016-08-25 * @version 1.

开源地址

https://github.com/sea-boat/net-reactor

源码设计

接收器Acceptor

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This Acceptor provides a NIO mode to accept client sockets.</p>
 */
public final class Acceptor extends Thread {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Acceptor.class);
    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private long acceptCount;
    private static final AcceptIdGenerator IdGenerator = new AcceptIdGenerator();
    private ReactorPool reactorPool;

    public Acceptor(ReactorPool reactorPool, String name, String bindIp,
            int port) throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
        this.serverChannel.bind(new InetSocketAddress(bindIp, port), 100);
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        this.reactorPool = reactorPool;
    }

    public int getPort() {
        return port;
    }

    public long getAcceptCount() {
        return acceptCount;
    }

    @Override
    public void run() {
        final Selector selector = this.selector;
        for (;;) {
            ++acceptCount;
            try {
                selector.select(1000L);
                Set<SelectionKey> keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Throwable e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

    /**
     * Accept client sockets.
     */
    private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
            channel.setOption(StandardSocketOptions.SO_RCVBUF, 1024);
            channel.setOption(StandardSocketOptions.SO_SNDBUF, 1024);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            reactorPool.getNextReactor().postRegister(
                    new FrontendConnection(channel, IdGenerator.getId()));
        } catch (Throwable e) {
            closeChannel(channel);
            LOGGER.warn(getName(), e);
        }
    }

    /**
     * Close a channel.
     * 
     * @param channel
     */
    private static void closeChannel(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        Socket socket = channel.socket();
        if (socket != null) {
            try {
                socket.close();
                LOGGER.info("channel close.");
            } catch (IOException e) {
                LOGGER.warn("IOException happens when closing socket : ", e);
            }
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.warn("IOException happens when closing channel : ", e);
        }
    }

    /**
     * ID Generator.
     */
    private static class AcceptIdGenerator {
        private static final long MAX_VALUE = 0xffffffffL;
        private long acceptId = 0L;
        private final Object lock = new Object();

        private long getId() {
            synchronized (lock) {
                if (acceptId >= MAX_VALUE) {
                    acceptId = 0L;
                }
                return ++acceptId;
            }
        }
    }
}

Reactor类

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Reactor reacts all sockets.</p>
 */
public final class Reactor extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);
    private final String name;
    private final Selector selector;
    private final ConcurrentLinkedQueue<FrontendConnection> queue;
    private long doCount;
    private Handler handler;

    public Reactor(String name, Handler handler) throws IOException {
        this.name = name;
        this.selector = Selector.open();
        this.queue = new ConcurrentLinkedQueue<FrontendConnection>();
        this.handler = handler;
    }

    final void postRegister(FrontendConnection frontendConnection) {
        queue.offer(frontendConnection);
        this.selector.wakeup();
    }

    @Override
    public void run() {
        final Selector selector = this.selector;
        Set<SelectionKey> keys = null;
        for (;;) {
            ++doCount;
            try {
                selector.select(500L);
                register(selector);
                keys = selector.selectedKeys();
                for (SelectionKey key : keys) {
                    FrontendConnection connection = null;
                    Object attach = key.attachment();
                    if (attach != null && key.isValid()) {
                        connection = (FrontendConnection) attach;
                        if (key.isReadable()) {
                            try {
                                connection.read();
                                handler.handle(connection);
                            } catch (IOException e) {
                                connection.close();
                                LOGGER.warn("IOException happens : ", e);
                                continue;
                            } catch (Throwable e) {
                                LOGGER.warn("Throwable happens : ", e);
                                continue;
                            }
                        }
                        if (key.isValid() && key.isWritable()) {
                            connection.write();
                        }
                    } else {
                        key.cancel();
                    }
                }
            } catch (Throwable e) {
                LOGGER.warn("exception happens selecting : ", e);
            } finally {
                if (keys != null) {
                    keys.clear();
                }
            }
        }
    }

    private void register(Selector selector) {
        FrontendConnection c = null;
        if (queue.isEmpty()) {
            return;
        }
        while ((c = queue.poll()) != null) {
            try {
                c.register(selector);
            } catch (Throwable e) {
                LOGGER.warn("ClosedChannelException happens : ", e);
            }
        }
    }

    final Queue<FrontendConnection> getRegisterQueue() {
        return queue;
    }

    final long getReactCount() {
        return doCount;
    }

}

Reactor池

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Reactor pool. Socket connections are polling to the reactor of this pool. </p>
 */
public class ReactorPool {
    private final Reactor[] reactors;
    private volatile int nextReactor;
    private String name = "reactor";

    public ReactorPool(int poolSize, Handler handler) throws IOException {
        reactors = new Reactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            Reactor reactor = new Reactor(name + "-" + i,handler);
            reactors[i] = reactor;
            reactor.start();
        }
    }

    public Reactor getNextReactor() {
        if (++nextReactor == reactors.length) {
            nextReactor = 0;
        }
        return reactors[nextReactor];
    }
}

前端连接抽象

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This is a abstraction of frontend.</p>
 */
public class FrontendConnection {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(FrontendConnection.class);
    private long id;
    private SocketChannel channel;
    private SelectionKey selectionKey;
    private ByteBuffer readBuffer;
    private static int BYFFERSIZE = 1024;
    protected ConcurrentLinkedQueue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<ByteBuffer>();

    public FrontendConnection(SocketChannel channel, long id) {
        this.id = id;
        this.channel = channel;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public long getId() {
        return id;
    }

    public void read() throws IOException {
        readBuffer = ByteBuffer.allocate(BYFFERSIZE);
        channel.read(readBuffer);
    }

    public void close() throws IOException {
        channel.close();
    }

    public void write() throws IOException {
        ByteBuffer buffer;
        while ((buffer = writeQueue.poll()) != null) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                int len = channel.write(buffer);
                if (len < 0) {
                    throw new EOFException();
                }
                if (len == 0) {
                    selectionKey.interestOps(selectionKey.interestOps()
                            | SelectionKey.OP_WRITE);
                    selectionKey.selector().wakeup();
                    break;
                }
            }
        }
        selectionKey.interestOps(selectionKey.interestOps()
                & ~SelectionKey.OP_WRITE);
    }

    public ByteBuffer getReadBuffer() {
        return readBuffer;
    }

    public ConcurrentLinkedQueue<ByteBuffer> getWriteQueue() {
        return writeQueue;
    }

    public void register(Selector selector) throws Throwable {
        selectionKey = channel.register(selector, SelectionKey.OP_READ, this);
    }

}

处理

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>This Handler will be call when there is a data having be ready.</p>
 */
public interface Handler {

    public void handle(FrontendConnection connection);

}

定义自己的处理

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>Demo.</p>
 */
public class MyHandler implements Handler {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(MyHandler.class);
    private long readSize;

    /**
     * The logic to deal with the received data.
     *  
     * It means that reactor will trigger this function once the data is received.
     */
    public void handle(FrontendConnection connection) {
        Buffer buff = connection.getReadBuffer();
        readSize = +readSize + buff.position();
        LOGGER.info(connection.getId() + " connection has receive " + readSize);
        if (readSize % 5 == 0) {
            ByteBuffer sendBuffer = ByteBuffer.allocate(10);;
            sendBuffer.wrap("hello".getBytes());
            connection.getWriteQueue().add(sendBuffer);
            try {
                connection.write();
            } catch (IOException e) {
                LOGGER.warn("IOException", e);
            }
        }
    }

}

启动

/**
 * 
 * @author seaboat
 * @date 2016-08-25
 * @version 1.0
 * <pre><b>email: </b>849586227@qq.com</pre>
 * <pre><b>blog: </b>http://blog.csdn.net/wangyangzhizhou</pre>
 * <p>The reactor bootstrap.</p>
 */
public class Bootstrap {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Bootstrap.class);
    private static String acceptorName = "acceptor-thread";
    private static String host = "localhost";
    private static int port = 6789;

    public static void main(String[] args) {
        try {
            LOGGER.info("starting up ......");
            Handler handler = new MyHandler();
            ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
            new Acceptor(reactorPool, acceptorName, host, port).start();
            LOGGER.info("started up successfully.");
            while (true) {
                Thread.sleep(300 * 1000);
            }
        } catch (Throwable e) {
            LOGGER.error(" launch error", e);
            System.exit(-1);
        }
    }
}

net-reactor

it’s a simple and easy net framework with nio mode written by java

how-to

just simply like:

Handler handler = new MyHandler();
ReactorPool reactorPool = new ReactorPool(Runtime.getRuntime().availableProcessors(), handler);
new Acceptor(reactorPool, acceptorName, host, port).start();

========广告时间========

鄙人的新书《Tomcat内核设计剖析》已经在京东销售了,有需要的朋友可以到 https://item.jd.com/12185360.html 进行预定。感谢各位朋友。

为什么写《Tomcat内核设计剖析》

=========================

目录
相关文章
|
6月前
|
JavaScript 前端开发 Java
通义灵码 Rules 库合集来了,覆盖Java、TypeScript、Python、Go、JavaScript 等
通义灵码新上的外挂 Project Rules 获得了开发者的一致好评:最小成本适配我的开发风格、相当把团队经验沉淀下来,是个很好功能……
1205 103
|
缓存 Java Maven
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
548 1
|
9月前
|
安全 druid Java
Java 访问数据库的奇妙之旅
本文介绍了Java访问数据库的几种常见方式
126 12
|
8月前
|
缓存 网络协议 Java
JAVA网络IO之NIO/BIO
本文介绍了Java网络编程的基础与历史演进,重点阐述了IO和Socket的概念。Java的IO分为设备和接口两部分,通过流、字节、字符等方式实现与外部的交互。
247 0
|
10月前
|
监控 Java API
探索Java NIO:究竟在哪些领域能大显身手?揭秘原理、应用场景与官方示例代码
Java NIO(New IO)自Java SE 1.4引入,提供比传统IO更高效、灵活的操作,支持非阻塞IO和选择器特性,适用于高并发、高吞吐量场景。NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector),能实现多路复用和异步操作。其应用场景涵盖网络通信、文件操作、进程间通信及数据库操作等。NIO的优势在于提高并发性和性能,简化编程;但学习成本较高,且与传统IO存在不兼容性。尽管如此,NIO在构建高性能框架如Netty、Mina和Jetty中仍广泛应用。
260 3
|
10月前
|
存储 监控 Java
Java的NIO体系
通过本文的介绍,希望您能够深入理解Java NIO体系的核心组件、工作原理及其在高性能应用中的实际应用,并能够在实际开发中灵活运用这些知识,构建高效的Java应用程序。
290 5
|
11月前
|
Java BI API
Java Excel报表生成:JXLS库的高效应用
在Java应用开发中,经常需要将数据导出到Excel文件中,以便于数据的分析和共享。JXLS库是一个强大的工具,它基于Apache POI,提供了一种简单而高效的方式来生成Excel报表。本文将详细介绍JXLS库的使用方法和技巧,帮助你快速掌握Java中的Excel导出功能。
340 6
|
11月前
|
Java API Apache
|
11月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
存储 网络协议 Java
Java NIO 开发
本文介绍了Java NIO(New IO)及其主要组件,包括Channel、Buffer和Selector,并对比了NIO与传统IO的优势。文章详细讲解了FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel及Pipe.SinkChannel和Pipe.SourceChannel等Channel实现类,并提供了示例代码。通过这些示例,读者可以了解如何使用不同类型的通道进行数据读写操作。
208 0
Java NIO 开发

热门文章

最新文章

下一篇
oss教程