AIO实现TimeServer

简介: 目标- 实现返回当前服务器时间的功能- 采用半双工模式(主要是因为采用telnet为客户端,然后telnet默认情况下是半双工)- 客户端连接后,可以发送 time,stop命令. 只有客户端发送stop命令后,服务段才主动断开链路。

目标

- 实现返回当前服务器时间的功能
- 采用半双工模式(主要是因为采用telnet为客户端,然后telnet默认情况下是半双工)
- 客户端连接后,可以发送 time,stop命令. 只有客户端发送stop命令后,服务段才主动断开链路。
- 不考虑读半包和写半包的情况

代码和注释

废话不多说,尽在代码中。直接拷贝即可运行(jdk7或以上)

package com.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

/**
 * Author :  Rocky
 * Date : 21/12/2016 15:17
 * Description :
 * Test :
 */
public class TimeServer {

    public static void main(String[] args) throws InterruptedException {
        AsynchronousServerSocketChannel assc = null;
        try {
            assc = AsynchronousServerSocketChannel.open();
            assc.bind(new InetSocketAddress(8888));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

        doAccept(assc);

        CountDownLatch latch = new CountDownLatch(1);
        latch.await();
    }

    private static void doAccept(AsynchronousServerSocketChannel assc) {
        assc.accept(assc, new AcceptCompletionHandle());
    }

    private static class AcceptCompletionHandle implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

        @Override
        public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel assc) {
            //继续监听accept事件
            assc.accept(assc, this);

            //开始监听可读时间
            ByteBuffer readBuf = ByteBuffer.allocate(1024);
            result.read(readBuf, readBuf, new ReadCompletionHandler(result));
        }

        @Override
        public void failed(Throwable exc, AsynchronousServerSocketChannel assc) {
            System.out.println("accept异常,继续accept");
            assc.accept(assc, this);
        }
    }

    private static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

        private AsynchronousSocketChannel asc;

        public ReadCompletionHandler(AsynchronousSocketChannel asc) {
            this.asc = asc;
        }

        @Override
        public void completed(Integer result, ByteBuffer readedData) {
            //如果对端链路关闭
            if (result < 0) {
                try {
                    asc.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return;
            }
            //如果读取到对端发送过来的数据
            if (result > 0) {
                readedData.flip();
                byte[] data = new byte[readedData.remaining()];
                readedData.get(data);
                String command = null;
                try {
                    command = new String(data, "UTF-8");
                    if ("time\r\n".equalsIgnoreCase(command)) {
                        doWrite(new Date().toString() + "\r\n");
                    } else if ("stop\r\n".equalsIgnoreCase(command)) {
                        doWriteAndClose("bye.\r\n");
                    } else if ("\r\n".equalsIgnoreCase(command)) {
                        doWrite("\r\n");
                    } else {
                        doWrite("unknown command\r\n");
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    doWrite("server error\r\n");
                }
            }
            //如果未读取到数据
            else {
                //继续尝试读取对端发送的数据
                ByteBuffer readBuf = ByteBuffer.allocate(1024);
                asc.read(readBuf, readBuf, this);
            }
        }

        private void doWriteAndClose(String response) {
            ByteBuffer repBuf = null;
            try {
                repBuf = ByteBuffer.wrap(response.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            if (repBuf != null) {
                asc.write(repBuf, repBuf, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer repBuf) {
                        if (repBuf.hasRemaining()) {
                            asc.write(repBuf, repBuf, this);
                        }
                        //写完成后,关闭链路
                        else {
                            try {
                                asc.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer repBuf) {
                        exc.printStackTrace();
                        try {
                            asc.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }

        private void doWrite(String response) {
            ByteBuffer repBuf = null;
            try {
                repBuf = ByteBuffer.wrap(response.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            if (repBuf != null) {
                asc.write(repBuf, repBuf, new WriteCompletionHandler(asc, this));
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer readedData) {
            exc.printStackTrace();
            try {
                asc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }


    private static class WriteCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

        private AsynchronousSocketChannel asc;

        private ReadCompletionHandler rch;

        public WriteCompletionHandler(AsynchronousSocketChannel asc, ReadCompletionHandler rch) {
            this.asc = asc;
            this.rch = rch;
        }

        @Override
        public void completed(Integer result, ByteBuffer repBuf) {
            if (repBuf.hasRemaining()) {
                asc.write(repBuf, repBuf, this);
            }
            //写完成后(对端读取完成),再尝试读(半双工模式)
            else {
                //继续尝试读取对端发送的数据
                ByteBuffer readBuf = ByteBuffer.allocate(1024);
                asc.read(readBuf, readBuf, rch);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer repBuf) {
            exc.printStackTrace();
            try {
                asc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

测试

telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.


time
Wed Dec 21 17:05:44 CST 2016
time
Wed Dec 21 17:07:44 CST 2016


s
unknown command
f
unknown command


stop
bye.
Connection closed by foreign host.
目录
相关文章
|
5月前
|
Java
BIO,NIO,AIO 有什么区别?
BIO,NIO,AIO 有什么区别? BIO:Block IO 同步阻塞式 IO,就是我们平常使用的传统 IO,它的特点是模式简单使用方便,并发处理能力低。
29 0
|
12天前
|
消息中间件 网络协议 Java
一文彻底理解BIO、NIO、AIO
一文彻底理解BIO、NIO、AIO
17 0
|
9月前
|
消息中间件 存储 网络协议
Linux五种I/O模式 NIO BIO AIO IO多路复用 信号驱动 I/O
Linux五种I/O模式 NIO BIO AIO IO多路复用 信号驱动 I/O
128 0
|
Java Linux API
BIO&NIO&AIO
BIO&NIO&AIO
75 0
|
Java
简论BIO NIO AIO
简论BIO NIO AIO
79 0
|
NoSQL 搜索推荐 网络协议
Java NIO、BIO、 AIO 与 同步、阻塞、非阻塞、异步IO 简析
我相信大部分人看到这些名词,都是一头雾水的,如果你去搜索引擎搜索,那么恭喜你,你又会被各种文章中的高大上的名词搞得云里雾里。那么,我们应该怎么理清这么名词之间的关系呢? 所谓 同步/异步/阻塞/非阻塞 IO ,是指操作系统中的对 IO 处理的不同方法,而 Java 对这些不同操作方法做了一些包装,由此有了 BIO / NIO / AIO 几种操作接口。 我不想复制一些高大上的概念,只是想尽量好好说话,说清楚他们之间的关系。 需求 有 A、B、C、D 四个线程可以生产文件,假设他们的返回的文件是一样的,对应我们的服务端 有 E、F、G、H 四个线程在随机时间向服务端上传一个文本,并且要求
|
Java 网络架构
BIO、NIO、AIO的区别
BIO、NIO、AIO的区别
BIO、NIO、AIO的区别
|
存储 弹性计算 Java
|
JSON 前端开发 安全
Java网络编程IO模型 --- BIO、NIO、AIO详解
Java网络编程IO模型 --- BIO、NIO、AIO详解
282 0
Java网络编程IO模型 --- BIO、NIO、AIO详解
|
存储 Java Linux
Linux异步IO(AIO)
异步输入/输出 (AIO) 接口允许并行提交许多 I/O 请求,而不会产生每个请求的线程开销。 本文档的目的是解释如何使用 Linux AIO 接口,即函数家族 `io_setup`、`io_submit`、`io_getevents`、`io_destroy`。 目前,AIO 接口最适合直接“O_DIRECT”访问原始块设备,如磁盘、闪存驱动器或存储阵列。(访问裸盘)
801 2
Linux异步IO(AIO)