JAVA并发处理经验(四)并行模式与算法6:NIO网络编程

简介: 一、前言 首先我们必须了解NIO的一些基本概念 channel:是NIO中的一个通道,类似我们说的流。

一、前言

首先我们必须了解NIO的一些基本概念

channel:是NIO中的一个通道,类似我们说的流。---管道

Buffer:理解为byte数组。与channel交流。----水流

Selector:有一个SelectableChancel实现,用线程管理------选择器

二、NIO编程

2.1 NIO服务端

package pattern.nio;


import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by ycy on 16/1/19.
 */
public class NioServer {
    //选择器
    private Selector selector;
    //线程池
    private ExecutorService tp = Executors.newCachedThreadPool();
    //给定大小的map
    public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);

    public void startServer() throws Exception {
        //1'由selectorPrivider返回一个创建者,并打开一个选择器
        selector = SelectorProvider.provider().openSelector();
        //2'打开套接字通道
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //block - 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
        ssc.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress(65500);
        // InetSocketAddress isa=new InetSocketAddress(8000);
        //获取与此通道关联的服务器套接字。将 ServerSocket 绑定到特定地址(IP 地址和端口号)
        ssc.socket().bind(isa);
        //将通道注册到选择器,
        SelectionKey accpetKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
        try {
            for (; ; ) {
                selector.select();//消费阻塞
                Set readykeys = selector.selectedKeys();//获取已经准备好的keys
                Iterator i = readykeys.iterator();//迭代
                long e = 0;
                while (i.hasNext()) {
                    SelectionKey sk = (SelectionKey) i.next();
                    i.remove();//必须消除,防止重复消费
                    if (sk.isAcceptable()) {
                        doAccept(sk);//如果为接受状态,接受
                    } else if (sk.isValid() && sk.isReadable()) {//如果是可读
                        if (!time_stat.containsKey(((SocketChannel) sk.channel()).socket())) {
                            //将socket方法如map
                            time_stat.put(((SocketChannel) sk.channel()).socket(),
                                    System.currentTimeMillis());//增加一个时间戳
                            //读取
                            doRead(sk);
                        }
                    } else if (sk.isValid() && sk.isWritable()) {
                        //写
                        doWrite(sk);
                        e = System.currentTimeMillis();
                        long b = time_stat.remove(((SocketChannel) sk.channel()).socket());
                        System.out.println("spend:" + (e - b) + "ms");//输入处理写入耗时
                    }
                }

            }
        } catch (ClosedSelectorException e) {
            System.out.println("外面捕捉不做事");
        }

    }

    /*
    与客户端建立连接
     */
    private void doAccept(SelectionKey sk) {

        try {
            ServerSocketChannel server = (ServerSocketChannel) sk.channel();
            SocketChannel clientChannel;
            clientChannel = server.accept();//生成一个channel表示与客户端通信
            clientChannel.configureBlocking(false);//非阻塞模式
            //Register this channel for reading
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            //Allocate an Echoclient instance adn attach it to this selction key
            EchoClient echoClient = new EchoClient();//回复给客服端口的全部信息
            clientKey.attach(echoClient);//附加实例,整个连接共享实例


            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Acceprted connection form " + clientAddress.getHostAddress() + ".")
            ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //执行读得操作
    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;
        try {
            len = channel.read(bb);
            if (len < 0) {
                disconnect(sk);
                return;
            }
        } catch (Exception e) {
            System.out.println("faild to read from client");
            e.printStackTrace();
            disconnect(sk);
            return;
        }

        bb.flip();
        tp.execute(new HanldeMsg(sk, bb));
    }

    private void disconnect(SelectionKey sk) {
        try {

            SocketChannel channel = (SocketChannel) sk.channel();
            channel.close();
            //sk.cancel();
            sk.selector().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //执行写操作
    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClient echoClient = (EchoClient) sk.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQuquq();

        ByteBuffer bb = outq.getLast();

        try {
            int len = channel.write(bb);
            if (len == -1) {
                disconnect(sk);
                return;
            }
            if (bb.remaining() == 0) {
                //已经完成
                outq.removeLast();
            }

        } catch (Exception e) {
            System.out.println("Faild to write to client");
            e.printStackTrace();
            disconnect(sk);
        }
        if (outq.size() == 0) {//很重要
            sk.interestOps(SelectionKey.OP_READ);
        }
    }

    /////////////////内部匿名类/////////////////////
    class EchoClient {
        private LinkedList<ByteBuffer> outq;

        EchoClient() {
            outq = new LinkedList<ByteBuffer>();
        }

        public LinkedList<ByteBuffer> getOutputQuquq() {
            return outq;
        }

        public void enqueue(ByteBuffer bb) {
            outq.addFirst(bb);
        }
    }

    //将接受的数据压入EchClient,需要处理业务在这u处理,处理完成之后重新注册事件op_write
    class HanldeMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        public HanldeMsg(SelectionKey sk, ByteBuffer bb) {
            this.sk = sk;
            this.bb = bb;
        }

        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            echoClient.enqueue(bb);
            sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            //强迫selector立即返回
            selector.wakeup();
        }
    }

}

2.2NIO main方法

package pattern.nio;

/**
 * Created by ycy on 16/1/20.
 */
public class NioMain {
    public static void main(String[] args) throws Exception {
        NioServer nioServer=new NioServer();
        nioServer.startServer();
    }
}

2.3 NIO客户端

package pattern.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

/**
 * Created by ycy on 16/1/20.
 */
public class NioClient {
    private Selector selector;
    public void init(String ip,int port) throws IOException {
        SocketChannel channel=SocketChannel.open();
        channel.configureBlocking(false);

        this.selector= SelectorProvider.provider().openSelector();
        channel.connect(new InetSocketAddress(ip,port));
        channel.register(selector,SelectionKey.OP_CONNECT);

    }
    public void working() throws IOException {
        while (true){
            if (!selector.isOpen()){
                break;
            }
                selector.select();
                Iterator<SelectionKey> ite=this.selector.selectedKeys().iterator();
                while (ite.hasNext()){
                    SelectionKey key=ite.next();
                    ite.remove();
                    //连接事件
                    if (key.isConnectable()){
                        connect(key);
                    }else if(key.isReadable()){
                        read(key);
                    }
                }

        }
    }
    /*
    连接
     */
    public void connect(SelectionKey key) throws IOException {
       SocketChannel channel=(SocketChannel)key.channel();
        //如果正在连接,则连接完成
        if(channel.isConnectionPending()){
            channel.finishConnect();

        }
        channel.configureBlocking(false);
        channel.write(ByteBuffer.wrap(new String("HELLO" ).getBytes()));
        channel.register(this.selector,SelectionKey.OP_READ);
    }

    public void read(SelectionKey key) throws IOException {
        SocketChannel channel=(SocketChannel)key.channel();
        //读取缓冲区
        ByteBuffer bb=ByteBuffer.allocate(1000);
        channel.read(bb);
        byte[] data=bb.array();
        String msg=new String(data).trim();
        System.out.println("客户端收到信息:"+msg);
        channel.close();
        key.selector().close();
    }

    public static void main(String[] args) throws IOException {
        NioClient nioClient=new NioClient();
        nioClient.init("127.0.0.1",65500);
        nioClient.working();
    }
}



目录
相关文章
|
1月前
|
存储 算法 Java
解锁“分享文件”高效密码:探秘 Java 二叉搜索树算法
在信息爆炸的时代,文件分享至关重要。二叉搜索树(BST)以其高效的查找性能,为文件分享优化提供了新路径。本文聚焦Java环境下BST的应用,介绍其基础结构、实现示例及进阶优化。BST通过有序节点快速定位文件,结合自平衡树、多线程和权限管理,大幅提升文件分享效率与安全性。代码示例展示了文件插入与查找的基本操作,适用于大规模并发场景,确保分享过程流畅高效。掌握BST算法,助力文件分享创新发展。
|
25天前
|
安全 网络协议 Java
Java网络编程封装
Java网络编程封装原理旨在隐藏底层通信细节,提供简洁、安全的高层接口。通过简化开发、提高安全性和增强可维护性,封装使开发者能更高效地进行网络应用开发。常见的封装层次包括套接字层(如Socket和ServerSocket类),以及更高层次的HTTP请求封装(如RestTemplate)。示例代码展示了如何使用RestTemplate简化HTTP请求的发送与处理,确保代码清晰易维护。
|
2月前
|
存储 人工智能 算法
解锁分布式文件分享的 Java 一致性哈希算法密码
在数字化时代,文件分享成为信息传播与协同办公的关键环节。本文深入探讨基于Java的一致性哈希算法,该算法通过引入虚拟节点和环形哈希空间,解决了传统哈希算法在分布式存储中的“哈希雪崩”问题,确保文件分配稳定高效。文章还展示了Java实现代码,并展望了其在未来文件分享技术中的应用前景,如结合AI优化节点布局和区块链增强数据安全。
|
2月前
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
95 16
|
15天前
|
网络协议 测试技术 Linux
Golang 实现轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库
gev 是一个基于 epoll 和 kqueue 实现的高性能事件循环库,适用于 Linux 和 macOS(Windows 暂不支持)。它支持多核多线程、动态扩容的 Ring Buffer 读写缓冲区、异步读写和 SO_REUSEPORT 端口重用。gev 使用少量 goroutine,监听连接并处理读写事件。性能测试显示其在不同配置下表现优异。安装命令:`go get -u github.com/Allenxuxu/gev`。
|
15天前
|
缓存 网络协议 Java
JAVA网络IO之NIO/BIO
本文介绍了Java网络编程的基础与历史演进,重点阐述了IO和Socket的概念。Java的IO分为设备和接口两部分,通过流、字节、字符等方式实现与外部的交互。
|
2月前
|
存储 监控 算法
剖析基于Java算法驱动的智能局域网管控之道
本文探讨了基于Java语言的局域网控制方案,结合链表数据结构与令牌桶算法,解决设备管理和流量调度难题。通过链表灵活存储网络设备信息,实现高效设备管理;令牌桶算法则精准控制流量,确保网络平稳运行。二者相辅相成,为校园、企业等局域网提供稳固高效的控制体系,保障业务连续性和数据安全。
|
2月前
|
算法 搜索推荐 Java
【潜意识Java】深度解析黑马项目《苍穹外卖》与蓝桥杯算法的结合问题
本文探讨了如何将算法学习与实际项目相结合,以提升编程竞赛中的解题能力。通过《苍穹外卖》项目,介绍了订单配送路径规划(基于动态规划解决旅行商问题)和商品推荐系统(基于贪心算法)。这些实例不仅展示了算法在实际业务中的应用,还帮助读者更好地准备蓝桥杯等编程竞赛。结合具体代码实现和解析,文章详细说明了如何运用算法优化项目功能,提高解决问题的能力。
77 6
|
2月前
|
算法 Java C++
【潜意识Java】蓝桥杯算法有关的动态规划求解背包问题
本文介绍了经典的0/1背包问题及其动态规划解法。
58 5
|
8月前
|
网络协议 安全 Java
Java中的网络编程:Socket编程详解
Java中的网络编程:Socket编程详解