NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题..NIO并不能显著的提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应的并发量情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理的使用了平台(OS/VM/Http协议)的特性并提供了高效的便捷的编程级别的API.
为了展示,NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负载client的数据传输和处理.此程序实例并没有太多的关注异常处理和业务性处理,也没有使用线程池作为server端socket句柄管理,不过你可以简单的修改代码也实现它.
- TestMain.java:引导类
- ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄.
- Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性.
- ServerControllor.java:server端连接处理类,负责接收连接和数据处理
- ServerHandler.java:server端连接维护类.
TestMain.java:
- package com.test.web;
- public class TestMain {
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception{
- int port = 30008;
- ServerControllor sc = new ServerControllor(port);
- sc.start();
- Thread.sleep(2000);
- ClientControllor cc = new ClientControllor("127.0.0.1", port);
- cc.start();
- Packet p1 = Packet.wrap("Hello,I am first!");
- cc.put(p1);
- Packet p2 = Packet.wrap("Hello,I am second!");
- cc.put(p2);
- Packet p3 = Packet.wrap("Hello,I am thread!");
- cc.put(p3);
- }
- }
ClientControllor.java
- package com.test.web;
- import java.net.InetSocketAddress;
- import java.net.SocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.zip.Adler32;
- import java.util.zip.Checksum;
- public class ClientControllor {
- private BlockingQueue<Packet> inner = new LinkedBlockingQueue<Packet>(100);//no any more
- private Object lock = new Object();
- private InetSocketAddress remote;
- private Thread thread = new ClientThread(remote);
- public ClientControllor(String host,int port){
- remote = new InetSocketAddress(host, port);
- }
- public void start(){
- if(thread.isAlive() || remote == null){
- return;
- }
- synchronized (lock) {
- thread.start();
- }
- }
- public boolean put(Packet packet){
- return inner.offer(packet);
- }
- public void clear(){
- inner.clear();
- }
- class ClientThread extends Thread {
- SocketAddress remote;
- SocketChannel channel;
- ClientThread(SocketAddress remote){
- this.remote = remote;
- }
- @Override
- public void run(){
- try{
- try{
- channel = SocketChannel.open();
- channel.configureBlocking(true);
- boolean isSuccess = channel.connect(new InetSocketAddress(30008));
- if(!isSuccess){
- while(!channel.finishConnect()){
- System.out.println("Client is connecting...");
- }
- }
- System.out.println("Client is connected.");
- // Selector selector = Selector.open();
- // channel.register(selector, SelectionKey.OP_WRITE);
- // while(selector.isOpen()){
- // selector.select();
- // Iterator<SelectionKey> it = selector.selectedKeys().iterator();
- // while(it.hasNext()){
- // SelectionKey key = it.next();
- // it.remove();
- // if(!key.isValid()){
- // continue;
- // }
- // if(key.isWritable()){
- // write();
- // }
- // }
- // }
- while(channel.isOpen()){
- write();
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- if(channel != null){
- try{
- channel.close();
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- inner.clear();
- }
- }
- private void write() throws Exception{
- Packet packet = inner.take();
- synchronized (lock) {
- ByteBuffer body = packet.getBuffer();//
- ByteBuffer head = ByteBuffer.allocate(4);
- head.putInt(body.limit());
- head.flip();
- while(head.hasRemaining()){
- channel.write(head);
- }
- Checksum checksum = new Adler32();
- while(body.hasRemaining()){
- checksum.update(body.get());
- }
- body.rewind();
- while(body.hasRemaining()){
- channel.write(body);
- }
- long cks = checksum.getValue();
- ByteBuffer tail = ByteBuffer.allocate(8);
- tail.putLong(cks);
- tail.flip();
- while(tail.hasRemaining()){
- channel.write(tail);
- }
- }
- }
- }
- }
Handler.java(接口,面向设计):
- package com.test.web;
- import java.nio.channels.SocketChannel;
- public interface Handler {
- public void handle(SocketChannel channel);
- }
Packet.java
- package com.test.web;
- import java.io.Serializable;
- import java.nio.ByteBuffer;
- import java.nio.charset.Charset;
- public class Packet implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 7719389291885063462L;
- private ByteBuffer buffer;
- private static Charset charset = Charset.defaultCharset();
- private Packet(ByteBuffer buffer){
- this.buffer = buffer;
- }
- public String getDataAsString(){
- return charset.decode(buffer).toString();
- }
- public byte[] getData(){
- return buffer.array();
- }
- public ByteBuffer getBuffer(){
- return this.buffer;
- }
- public static Packet wrap(ByteBuffer buffer){
- return new Packet(buffer);
- }
- public static Packet wrap(String data){
- ByteBuffer source = charset.encode(data);
- return new Packet(source);
- }
- }
ServerControllor.java
- package com.test.web;
- import java.net.InetSocketAddress;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- public class ServerControllor {
- private int port;
- private Thread thread = new ServerThread();;
- private Object lock = new Object();
- public ServerControllor(){
- this(0);
- }
- public ServerControllor(int port){
- this.port = port;
- }
- public void start(){
- if(thread.isAlive()){
- return;
- }
- synchronized (lock) {
- thread.start();
- System.out.println("Server starting....");
- }
- }
- class ServerThread extends Thread {
- private static final int TIMEOUT = 3000;
- private ServerHandler handler = new ServerHandler();
- @Override
- public void run(){
- try{
- ServerSocketChannel channel = null;
- try{
- channel = ServerSocketChannel.open();
- channel.configureBlocking(false);
- channel.socket().setReuseAddress(true);
- channel.socket().bind(new InetSocketAddress(port));
- Selector selector = Selector.open();
- channel.register(selector, SelectionKey.OP_ACCEPT);
- while(selector.isOpen()){
- System.out.println("Server is running,port:" + channel.socket().getLocalPort());
- if(selector.select(TIMEOUT) == 0){
- continue;
- }
- Iterator<SelectionKey> it = selector.selectedKeys().iterator();
- while(it.hasNext()){
- SelectionKey key = it.next();
- it.remove();
- if(!key.isValid()){
- continue;
- }
- if(key.isAcceptable()){
- accept(key);
- }else if(key.isReadable()){
- read(key);
- }
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- if(channel != null){
- try{
- channel.close();
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- private void accept(SelectionKey key) throws Exception{
- SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
- socketChannel.configureBlocking(true);
- //socketChannel.register(key.selector(), SelectionKey.OP_READ);
- handler.handle(socketChannel);
- }
- private void read(SelectionKey key) throws Exception{
- SocketChannel channel = (SocketChannel)key.channel();
- //handler.handle(channel);
- }
- }
- }
ServerHandler.java
- package com.test.web;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.Semaphore;
- import java.util.zip.Adler32;
- import java.util.zip.Checksum;
- class ServerHandler implements Handler {
- private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);
- private static Map<SocketChannel,Thread> holder = new HashMap<SocketChannel,Thread>(32);
- @Override
- public void handle(SocketChannel channel) {
- synchronized (holder) {
- if(holder.containsKey(channel)){
- return;
- }
- Thread t = new ReadThread(channel);
- holder.put(channel, t);
- t.start();
- }
- }
- static class ReadThread extends Thread{
- SocketChannel channel;
- ReadThread(SocketChannel channel){
- this.channel = channel;
- }
- @Override
- public void run(){
- try{
- semaphore.acquire();
- boolean eof = false;
- while(channel.isOpen()){
- //ByteBuffer byteBuffer = new ByteBuffer(1024);
- ByteBuffer head = ByteBuffer.allocate(4);//int for data-size
- while(true){
- int cb = channel.read(head);
- if(cb == -1){
- throw new RuntimeException("EOF error,data lost!");
- }
- if(isFull(head)){
- break;
- }
- }
- head.flip();
- int dataSize = head.getInt();
- if(dataSize <= 0){
- throw new RuntimeException("Data format error,something lost???");
- }
- ByteBuffer body = ByteBuffer.allocate(dataSize);
- while(true){
- int cb = channel.read(body);
- if(cb == -1){
- throw new RuntimeException("EOF error,data lost!");
- }else if(cb == 0 && this.isFull(body)){
- break;
- }
- }
- ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size
- while(true){
- int cb = channel.read(tail);
- if(cb == -1){
- eof = true;
- }
- if(isFull(tail)){
- break;
- }
- }
- tail.flip();
- long sck = tail.getLong();
- Checksum checksum = new Adler32();
- checksum.update(body.array(), 0, dataSize);
- long cck = checksum.getValue();
- if(sck != cck){
- throw new RuntimeException("Sorry,some data lost or be modified,please check!");
- }
- body.flip();
- Packet packet = Packet.wrap(body);
- System.out.println(packet.getDataAsString());
- if(eof){
- break;
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }finally{
- if(channel != null){
- try{
- channel.close();
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- holder.remove(channel);
- semaphore.release();
- }
- }
- private boolean isFull(ByteBuffer byteBuffer){
- return byteBuffer.position() == byteBuffer.capacity() ? true : false;
- }
- }
- }
原文链接:[http://wely.iteye.com/blog/2227865]