基于java.nio.channels的编程实践-I

简介:

服务端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NIOSocketServer extends Thread {
	private static final Logger LOG = LoggerFactory
			.getLogger(NIOSocketServer.class);
	private static final String CHARSET = "UTF-8";
	private static final int BUFFER_SIZE = 1024;
	private static final int FAIL_TRY_NUM = 3;

	private Selector selector;
	private ServerSocketChannel ssc;
	private static NIOSocketServer server;

	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		server = new NIOSocketServer();
		try {
			// server.setDaemon(true);
			server.initServer();
			server.start();
		} catch (Exception e) {
			// 如果出现异常,则直接关闭客户端
			server.stopServer();
			System.exit(1);
		}
	}

	@Override
	public void run() {
		int failNum = 0;
		while (true) {
			try {
				int select = selector.select();
				if (select > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext()) {
						SelectionKey key = iter.next();
						if (key.isAcceptable()) {
							doAcceptable(key);
						}
						if (key.isWritable()) {
							doWriteMessage(key);
						}
						if (key.isReadable()) {
							doReadMessage(key);
						}
						if (key.isConnectable()) {
							doConnectable(key);
						}
						iter.remove();
					}
				}
			} catch (Exception e) {
				failNum++;
				if (failNum > FAIL_TRY_NUM) {
					server.stopServer();
				}
			}
		}

	}

	/**
	 * 初始化服务器端程序,开始监听端口
	 * 
	 * @throws IOException
	 */
	private void initServer() throws IOException {
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(2181));
		ssc.register(selector, SelectionKey.OP_ACCEPT);
	}

	/**
	 * 停止服务器
	 * 
	 * @throws IOException
	 */
	private void stopServer() {
		try {
			if (selector != null && selector.isOpen()) {
				selector.close();
			}
			if (ssc != null && ssc.isOpen()) {
				ssc.close();
			}
		} catch (IOException e) {
			LOG.info("关闭服务端失败:" + e.getMessage());
		}
	}

	/**
	 * 对新的客户端连接进行处理
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doAcceptable(SelectionKey keythrows IOException {
		ServerSocketChannel tmpSsc = (ServerSocketChannel) key.channel();
		SocketChannel ss = tmpSsc.accept();
		ss.configureBlocking(false);
		ss.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

	}

	/**
	 * 已连接
	 * 
	 * @param key
	 */
	private void doConnectable(SelectionKey key) {
		LOG.info("connect is ok");
	}

	/**
	 * 写消息到客户端
	 * 
	 * @param key
	 * @throws IOException
	 */
	private void doWriteMessage(SelectionKey keythrows Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer buffer = ByteBuffer.wrap("server write msg to client"
				.getBytes(CHARSET));
		while (buffer.hasRemaining()) {
			sc.write(buffer);
		}
		TimeUnit.SECONDS.sleep(1);
	}

	/**
	 * @param key
	 * @throws IOException
	 */
	private void doReadMessage(SelectionKey keythrows Exception {
		SocketChannel sc = (SocketChannel) key.channel();
		ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
		int read = sc.read(bb);
		while (read > 0) {
			bb.flip();
			byte[] barr = new byte[bb.limit()];
			bb.get(barr);
			LOG.info("server read msg from client:" + new String(barr, CHARSET));
			bb.clear();
			read = sc.read(bb);
		}
		TimeUnit.SECONDS.sleep(1);
	}

}

客户端代码

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NIOSocketClient extends Thread {
	private static final Logger LOG = LoggerFactory
			.getLogger(NIOSocketClient.class);
	private static final String CHARSET = "UTF-8";
	private static final int BUFFER_SIZE = 1024;
	private static final int FAIL_TRY_NUM = 3;

	private SocketChannel socketChannel;
	private Selector selector;
	private static NIOSocketClient client;

	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		client = new NIOSocketClient();
		try {
			client.initClient();
			client.start();
			// client.setDaemon(true);catch (Exception e) {
			// 如果出现异常,则直接关闭客户端
			client.close();
		}
	}

	public void run() {
		int failNum = 0;
		while (true) {
			try {
				writeMessage();
				int select = selector.select();
				System.out.println(select);
				if (select > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext()) {
						SelectionKey sk = iter.next();
						if (sk.isReadable()) {
							readMessage(sk);
						}
						iter.remove();
					}
				}
			} catch (Exception e) {
				// 如果出现三次以上的异常,则关闭客户端
				failNum++;
				if (failNum > FAIL_TRY_NUM) {
					client.close();
					System.exit(1);
				}
			}
		}
	}

	public void readMessage(SelectionKey sk) throws Exception,
			UnsupportedEncodingException {
		SocketChannel curSc = (SocketChannel) sk.channel();
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		while (curSc.read(buffer) > 0) {
			buffer.flip();
			LOG.info("read message from server:"new String(buffer.array(), CHARSET));
			buffer.clear();
		}
		TimeUnit.SECONDS.sleep(1);
	}

	public void writeMessage() throws Exception {
		String ss = "client write msg to server";
		ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes(CHARSET));
		while (buffer.hasRemaining()) {
			socketChannel.write(buffer);
		}

		TimeUnit.SECONDS.sleep(1);
	}

	public void initClient() throws IOException, ClosedChannelException {
		InetSocketAddress addr = new InetSocketAddress(2181);
		socketChannel = SocketChannel.open();

		selector = Selector.open();
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);

		// 连接到server
		socketChannel.connect(addr);

		while (!socketChannel.finishConnect()) {
			LOG.info("check finish connection");
		}
	}

	/**
	 * 停止客户端
	 */
	private void close() {
		try {
			if (selector != null && selector.isOpen()) {
				selector.close();
			}
			if (socketChannel != null && socketChannel.isOpen()) {
				socketChannel.close();
			}
		} catch (IOException e) {
			LOG.info("关闭客户端失败:" + e.getMessage());
		}
	}

}
目录
相关文章
|
1天前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
20 4
Java并发编程学习11-任务执行演示
|
2天前
|
存储 安全 Java
12条通用编程原则✨全面提升Java编码规范性、可读性及性能表现
12条通用编程原则✨全面提升Java编码规范性、可读性及性能表现
|
2天前
|
缓存 Java 程序员
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
|
2天前
|
缓存 Java 数据库
Java并发编程中的锁优化策略
【5月更文挑战第9天】 在高负载的多线程应用中,Java并发编程的高效性至关重要。本文将探讨几种常见的锁优化技术,旨在提高Java应用程序在并发环境下的性能。我们将从基本的synchronized关键字开始,逐步深入到更高效的Lock接口实现,以及Java 6引入的java.util.concurrent包中的高级工具类。文中还会介绍读写锁(ReadWriteLock)的概念和实现原理,并通过对比分析各自的优势和适用场景,为开发者提供实用的锁优化策略。
3 0
|
3天前
|
JavaScript 小程序 Java
基于java的少儿编程网上报名系统
基于java的少儿编程网上报名系统
11 2
|
3天前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
11 0
|
3天前
|
Java 测试技术 图形学
掌握Java GUI编程基础知识
掌握Java GUI编程基础知识
7 0
|
3天前
|
SQL Java 数据库连接
Java数据库编程实践:连接与操作数据库
Java数据库编程实践:连接与操作数据库
9 0
|
3天前
|
安全 Java 程序员
深入探索Java泛型编程
深入探索Java泛型编程
7 0
|
3天前
|
缓存 算法 Java
Java本地高性能缓存实践
本篇博文将首先介绍常见的本地缓存技术,对本地缓存有个大概的了解;其次介绍本地缓存中号称性能最好的Cache,可以探讨看看到底有多好?怎么做到这么好?最后通过几个实战样例,在日常工作中应用高性能的本地缓存。