IoFilter:过滤器层
这里我们做一个解码的编码的过滤层,这也是mina中最常用的。首先我们需要定义属于我们自己的协议,也就是数据包的格式:别以为这很复杂,其实很简单的。
我们知道数据都是字节类型的,那么我们的协议格式如下:前两位表示数据包的长度(一个short类型正好两个字节),第三位是闲置位,后面的是数据。长度是闲置位和
数据长度的和。这样我们就可以根据前两位确定,我们的数据包到那里结束。那么我们循环这么读,就会取得所有的数据包。是不是很简单啊,这个格式就是我们的协议。
为了更简单,这里我们客户端发往服务端的数据进行编码和解码,服务端发往客户端的就不编码了,客户端也就不用解码。服务端使用mina,客户端我们就使用基本的socket nio。
编码工厂类:
public class CodecFactory extends DemuxingProtocolCodecFactory{ public CodecFactory(){ super.addMessageEncoder(String.class, Encoder.class); super.addMessageDecoder(Decoder.class); } }
解码类:
import java.util.ArrayList; import java.util.List; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class Decoder implements MessageDecoder { private byte[] r_curPkg = null; private int r_pos = -1; // 包计数器 static private final int PKG_SIZE_BYTES = 2;//包长度 public Decoder() { } @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception { List<String> list = new ArrayList<String>(); while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循环接收包,4为一个整型,表示包长度b, 如果上一个包未接收完成时,继续接收 // 如果上个包已收完整,则创建新的包 if (r_pos == -1) { //得到下一个包的长度,长度不包括前两位,即包的长度=压缩位长度+数据位长度 int pkgLen = in.getShort(); //如果包长度小于0,那么此包错误,解码失败,返回。 if (pkgLen < 0) { return MessageDecoderResult.NOT_OK; } in.get(); r_curPkg = new byte[pkgLen-1]; //数组长度为数据长度 r_pos = 0; } int need = r_curPkg.length - r_pos; //需要读取的数据长度 int length = in.remaining();//缓冲区中可读的数据长度 if (length >= need) {// 可以把当前包读完整 in.get(r_curPkg, r_pos, need); // 复制缓冲区中的数据到r_curPkg中 // 处理接收到一个完整的包数据后,把包添加到池中,判断是否需要需要解压 byte[] data = r_curPkg; String str = new String(data); list.add(str); r_curPkg = null; r_pos = -1; } else { // 如果剩下的字节数,不够一个包则 int remainBytes = in.remaining(); in.get(r_curPkg, r_pos, remainBytes); r_pos += remainBytes; return MessageDecoderResult.NEED_DATA; } } for (String protocol : list) { out.write(protocol); } return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) { } }
编码类:(没有进行编码,只进行了数据发送)
import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class Encoder implements MessageEncoder<String>{ public Encoder(){ } @Override public void encode(IoSession session, String message, ProtocolEncoderOutput out) throws Exception { System.out.println("encode.................."); String value = (String) message; IoBuffer buf = IoBuffer.allocate(value.getBytes().length); buf.setAutoExpand(true); if (value != null){ buf.put(value.trim().getBytes()); } buf.flip(); out.write(buf); out.flush(); } }
IoService层:
import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MinaServer { private static final int PORT = 9123; public static void main(String [] args) throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "logger", new LoggingFilter() ); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory())); acceptor.setHandler(new ServerHandler()); acceptor.getSessionConfig().setReadBufferSize( 3 ); acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 ); acceptor.bind( new InetSocketAddress(PORT) ); } }
到这里我们的服务端代码就写完了,
客户端实现
<span style="font-size:12px">public class SocketClient { public static void main(String...args)throws Exception{ SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost",9123)); byte [] bytes = "aaaa".getBytes(); //对数据包进行编码 ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3); buffer.putShort((short)(bytes.length+1)); //包长度 buffer.put((byte)1);//闲置位 buffer.put(bytes);//数据 buffer.flip(); socketChannel.write(buffer); socketChannel.socket().shutdownOutput(); String obj = receive(socketChannel); System.out.println(obj); } private static String receive(SocketChannel socketChannel)throws Exception{ ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = 0; byte [] bytes = null; while((size = socketChannel.read(buffer))>=0){ buffer.flip(); bytes = new byte[size]; buffer.get(bytes); baos.write(bytes); buffer.clear(); } bytes = baos.toByteArray(); baos.close(); return new String(bytes); } } </span>