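In the previous post we saw how to write our own logging filter in MINA. In this post we implement our own codec.
In practice, many systems do not use standard web services or XML. SMS gateway software for China Mobile, China Unicom, and China Telecom, for example, each implements its own protocol, and all of them are byte streams over TCP/IP. MINA ships with TextLineEncoder and TextLineDecoder, which handle line-delimited strings; for something like an SMS gateway you have to write your own codec filter.
We define a simple byte-stream protocol over TCP/IP to carry packets between client and server. A packet, MyProtocalPack, consists of a header and a body. The header contains length (the total packet length including the 5-byte header, an int) and flag (a packet flag, a byte). The body, content, is a string, which the implementation handles as a byte stream. The source code is as follows: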
package com.gftech.mytool.mina;

import com.gftech.util.GFCommon;

public class MyProtocalPack {
    private int length;     // total packet length: 5-byte header + content bytes
    private byte flag;      // packet flag
    private String content; // message body

    public MyProtocalPack() {
    }

    public MyProtocalPack(byte flag, String content) {
        this.flag = flag;
        this.content = content;
        // Note: getBytes() uses the platform default charset
        int len1 = content == null ? 0 : content.getBytes().length;
        this.length = 5 + len1;
    }

    // Build a packet from raw bytes: 4-byte length, 1-byte flag, then the content
    public MyProtocalPack(byte[] bs) {
        if (bs != null && bs.length >= 5) {
            length = GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
            flag = bs[4];
            content = new String(GFCommon.bytesCopy(bs, 5, length - 5));
        }
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(" Len:").append(length);
        sb.append(" flag:").append(flag);
        sb.append(" content:").append(content);
        return sb.toString();
    }
}
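To make the wire layout concrete, here is a minimal sketch that uses only java.nio.ByteBuffer and no MINA classes. The class FrameLayoutSketch and its method names are hypothetical and not part of the article's code; the sketch assumes the length field is a big-endian int that counts the 5-byte header plus the content bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only: shows the byte layout of one packet.
final class FrameLayoutSketch {
    static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag

    // Serialize one packet as [length:int32 big-endian][flag:byte][content bytes].
    static byte[] toBytes(byte flag, String content, Charset charset) {
        byte[] body = content == null ? new byte[0] : content.getBytes(charset);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(HEADER_LENGTH + body.length); // total length, header included
        buf.put(flag);
        buf.put(body);
        return buf.array();
    }

    // Read the total-length field from the first four bytes.
    static int readLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt();
    }

    // Decode the content that follows the 5-byte header.
    static String readContent(byte[] frame, Charset charset) {
        int length = readLength(frame);
        return new String(frame, HEADER_LENGTH, length - HEADER_LENGTH, charset);
    }

    public static void main(String[] args) {
        byte[] frame = toBytes((byte) 7, "hello", StandardCharsets.UTF_8);
        System.out.println(frame.length);                               // 10
        System.out.println(readLength(frame));                          // 10
        System.out.println(frame[4]);                                   // 7
        System.out.println(readContent(frame, StandardCharsets.UTF_8)); // hello
    }
}
```

ByteBuffer, DataOutputStream.writeInt, and MINA's IoBuffer all default to big-endian byte order, which is why the same four length bytes can be produced and read by any of them.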
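Looking back at MinaTimeServer, let's first see how it uses a text codec filter. It adds a ProtocolCodecFilter to the filter chain and passes it a TextLineCodecFactory, a factory class that creates the concrete TextLineEncoder and TextLineDecoder. Here is the relevant source code: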
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
package org.apache.mina.filter.codec.textline;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * A {@link ProtocolCodecFactory} that performs encoding and decoding between
 * a text line data and a Java string object. This codec is useful especially
 * when you work with a text-based protocols such as SMTP and IMAP.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public class TextLineCodecFactory implements ProtocolCodecFactory {
    private final TextLineEncoder encoder;
    private final TextLineDecoder decoder;

    /**
     * Creates a new instance with the current default {@link Charset}.
     */
    public TextLineCodecFactory() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified {@link Charset}. The
     * encoder uses a UNIX {@link LineDelimiter} and the decoder uses
     * the AUTO {@link LineDelimiter}.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     */
    public TextLineCodecFactory(Charset charset) {
        encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
        decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
    }

    /**
     * Creates a new instance of TextLineCodecFactory. This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *            The line delimeter for the encoder
     * @param decodingDelimiter
     *            The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset, String encodingDelimiter, String decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    /**
     * Creates a new instance of TextLineCodecFactory. This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *            The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *            The line delimeter for the encoder
     * @param decodingDelimiter
     *            The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset, LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }

    /**
     * Returns the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}. The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
     */
    public int getEncoderMaxLineLength() {
        return encoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}. The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
     */
    public void setEncoderMaxLineLength(int maxLineLength) {
        encoder.setMaxLineLength(maxLineLength);
    }

    /**
     * Returns the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}. The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
     */
    public int getDecoderMaxLineLength() {
        return decoder.getMaxLineLength();
    }

    /**
     * Sets the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}. The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
     */
    public void setDecoderMaxLineLength(int maxLineLength) {
        decoder.setMaxLineLength(maxLineLength);
    }
}
TextLineCodecFactory implements the ProtocolCodecFactory interface, which has two main methods: getEncoder(), which returns the encoder, and getDecoder(), which returns the decoder:
package org.apache.mina.filter.codec;

import org.apache.mina.core.session.IoSession;

/**
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
 * binary or protocol specific data into message object and vice versa.
 * <p>
 * Please refer to
 * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
 * example.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
     * encodes message objects into binary or protocol-specific data.
     */
    ProtocolEncoder getEncoder(IoSession session) throws Exception;

    /**
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
     * decodes binary or protocol-specific data into message objects.
     */
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
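What we mainly need to do is implement encode() along the lines of TextLineEncoder and decode() along the lines of TextLineDecoder; those two classes implement the ProtocolEncoder and ProtocolDecoder interfaces respectively. We will write three classes, MyProtocalCodecFactory, MyProtocalEncoder, and MyProtocalDecoder, corresponding to TextLineCodecFactory, TextLineEncoder, and TextLineDecoder.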
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyProtocalCodecFactory implements ProtocolCodecFactory {
    private final MyProtocalEncoder encoder;
    private final MyProtocalDecoder decoder;

    public MyProtocalCodecFactory(Charset charset) {
        encoder = new MyProtocalEncoder(charset);
        decoder = new MyProtocalDecoder(charset);
    }

    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }
}
package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyProtocalEncoder extends ProtocolEncoderAdapter {
    private static final int PACK_HEAD_LENGTH = 5;

    private final Charset charset;

    public MyProtocalEncoder(Charset charset) {
        this.charset = charset;
    }

    // Encode a MyProtocalPack here and write it to the output
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        MyProtocalPack value = (MyProtocalPack) message;

        // Encode the body with the codec's charset, so the length field always
        // matches the bytes on the wire and the decoder reads them back with the same charset
        byte[] body = value.getContent() == null ? new byte[0] : value.getContent().getBytes(charset);
        int length = PACK_HEAD_LENGTH + body.length;

        IoBuffer buf = IoBuffer.allocate(length);
        buf.setAutoExpand(true);
        buf.putInt(length);
        buf.put(value.getFlag());
        buf.put(body);
        buf.flip();
        out.write(buf);
    }

    // dispose(IoSession) is inherited from ProtocolEncoderAdapter as a no-op
}
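The length field counts bytes, not characters, so the charset chosen for the body directly affects what goes into the header. The sketch below is a hypothetical helper (ByteLengthSketch is not part of the article's code) that uses two charsets every JVM is guaranteed to provide. GBK, the charset used in this article, encodes common Chinese characters as two bytes each.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only: the same text yields different
// byte counts, and therefore different length fields, under different charsets.
final class ByteLengthSketch {
    static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag

    // Number of bytes the content occupies once encoded.
    static int bodyLength(String content, Charset charset) {
        return content == null ? 0 : content.getBytes(charset).length;
    }

    // Value that belongs in the packet's length field.
    static int frameLength(String content, Charset charset) {
        return HEADER_LENGTH + bodyLength(content, charset);
    }

    public static void main(String[] args) {
        String text = "\u6d4b\u8bd5"; // two Chinese characters, written as escapes
        System.out.println(text.length());                                 // 2
        System.out.println(bodyLength(text, StandardCharsets.UTF_8));      // 6
        System.out.println(bodyLength(text, StandardCharsets.UTF_16BE));   // 4
        System.out.println(frameLength(text, StandardCharsets.UTF_8));     // 11
    }
}
```

If the sender computes the length with one charset and writes the bytes with another, the receiver's framing drifts, so it is safest to encode the body once and derive the length from that byte array.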
package com.gftech.mytool.mina;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyProtocalDecoder implements ProtocolDecoder {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
    private final Charset charset;
    private int maxPackLength = 100;

    public MyProtocalDecoder() {
        this(Charset.defaultCharset());
    }

    public MyProtocalDecoder(Charset charset) {
        this.charset = charset;
    }

    public int getMaxLineLength() {
        return maxPackLength;
    }

    public void setMaxLineLength(int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
        }
        this.maxPackLength = maxLineLength;
    }

    // The decoder instance is shared, so per-connection state lives in the session
    private Context getContext(IoSession session) {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        final int packHeadLength = 5;
        // Fetch the context from the previous call; it may hold unprocessed data
        Context ctx = getContext(session);
        // Append the incoming data to the context's buffer
        ctx.append(in);
        // Flip: limit moves to the current position, position moves to 0
        IoBuffer buf = ctx.getBuffer();
        buf.flip();
        // Read packets according to the protocol
        while (buf.remaining() >= packHeadLength) {
            buf.mark();
            // Read the header
            int length = buf.getInt();
            byte flag = buf.get();
            // A length below the header size or above the maximum means the stream
            // is corrupt: discard everything buffered so far and stop
            if (length < packHeadLength || length > maxPackLength) {
                buf.clear();
                return;
            }
            if (length - packHeadLength <= buf.remaining()) {
                // A complete packet: decode it and pass it on to the IoHandler
                int oldLimit = buf.limit();
                buf.limit(buf.position() + length - packHeadLength);
                String content = buf.getString(ctx.getDecoder());
                buf.limit(oldLimit);
                MyProtocalPack pack = new MyProtocalPack(flag, content);
                out.write(pack);
            } else {
                // The packet is incomplete: move back to the start of its header
                buf.reset();
                break;
            }
        }
        // Move any leftover bytes to the front of the buffer and switch back to
        // write mode; if nothing is left, this simply empties the buffer
        buf.compact();
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }

    public void dispose(IoSession session) throws Exception {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            session.removeAttribute(CONTEXT);
        }
    }

    // Per-session context. Data arrives in chunks of arbitrary size, so we may
    // receive only half a packet; the context accumulates bytes until a whole
    // packet can be processed
    private class Context {
        private final CharsetDecoder decoder;
        private final IoBuffer buf;

        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(80).setAutoExpand(true);
        }

        public CharsetDecoder getDecoder() {
            return decoder;
        }

        public IoBuffer getBuffer() {
            return buf;
        }

        public void append(IoBuffer in) {
            getBuffer().put(in);
        }
    }
}
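The accumulate-then-split idea behind the decoder can be tried out without MINA. The following is a minimal sketch built on java.nio.ByteBuffer; FrameAccumulatorSketch and its methods are hypothetical names, and the sketch only illustrates the framing logic (peek at the length, wait for the rest, compact the leftovers), not MINA's own implementation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only: splits an arbitrary sequence of
// chunks into complete [length:int32][flag:byte][content] frames.
final class FrameAccumulatorSketch {
    private static final int HEADER_LENGTH = 5;
    private final int maxFrameLength;
    private ByteBuffer pending = ByteBuffer.allocate(64); // stays in write mode between calls

    FrameAccumulatorSketch(int maxFrameLength) {
        this.maxFrameLength = maxFrameLength;
    }

    // Append a chunk and return every frame that is now complete.
    List<byte[]> feed(byte[] chunk) {
        ensureCapacity(chunk.length);
        pending.put(chunk);
        pending.flip(); // switch to read mode
        List<byte[]> frames = new ArrayList<>();
        while (pending.remaining() >= HEADER_LENGTH) {
            int length = pending.getInt(pending.position()); // peek, position unchanged
            if (length < HEADER_LENGTH || length > maxFrameLength) {
                pending.clear(); // corrupt stream: drop everything buffered
                throw new IllegalStateException("bad frame length: " + length);
            }
            if (pending.remaining() < length) {
                break; // incomplete frame: wait for more data
            }
            byte[] frame = new byte[length];
            pending.get(frame);
            frames.add(frame);
        }
        pending.compact(); // keep leftovers at the front, back to write mode
        return frames;
    }

    // Grow the buffer when the next chunk would not fit.
    private void ensureCapacity(int extra) {
        if (pending.remaining() < extra) {
            int needed = pending.position() + extra;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(pending.capacity() * 2, needed));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
    }

    // Build one frame for the demo.
    static byte[] frame(byte flag, byte[] body) {
        return ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .putInt(HEADER_LENGTH + body.length).put(flag).put(body).array();
    }

    public static void main(String[] args) {
        FrameAccumulatorSketch acc = new FrameAccumulatorSketch(100);
        byte[] a = frame((byte) 1, new byte[] {'a', 'b', 'c'});
        System.out.println(acc.feed(java.util.Arrays.copyOfRange(a, 0, 6)).size());        // 0
        System.out.println(acc.feed(java.util.Arrays.copyOfRange(a, 6, a.length)).size()); // 1
    }
}
```

Peeking at the length with an absolute getInt avoids the mark/reset pair, and compact() replaces the copy through a temporary buffer; both are design choices of this sketch rather than requirements of the protocol.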
In MyProtocalServer, add the Log4jFilter we wrote ourselves and the codec filter:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MyProtocalServer {
    private static final int PORT = 2500;
    static Logger logger = Logger.getLogger(MyProtocalServer.class);

    public static void main(String[] args) throws IOException {
        PropertyConfigurator.configure("conf/log4j.properties");

        IoAcceptor acceptor = new NioSocketAcceptor();
        // Logging filter first, then the custom codec filter
        Log4jFilter lf = new Log4jFilter(logger);
        acceptor.getFilterChain().addLast("logger", lf);
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));

        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        acceptor.setHandler(new MyHandler());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("start server ...");
    }
}

class MyHandler extends IoHandlerAdapter {
    static Logger logger = Logger.getLogger(MyHandler.class);

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // The codec filter has already turned the bytes into a MyProtocalPack
        MyProtocalPack pack = (MyProtocalPack) message;
        logger.debug("Rec:" + pack);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("IDLE " + session.getIdleCount(status));
    }
}
Write a client program to test it:
package com.gftech.mytool.mina;

import java.io.DataOutputStream;
import java.net.Socket;
import java.nio.charset.Charset;

public class MyProtocalClient {
    public static void main(String[] args) {
        // Encode with the same charset the server's codec is configured with
        Charset charset = Charset.forName("GBK");
        try (Socket socket = new Socket("127.0.0.1", 2500);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            for (int i = 0; i < 1000; i++) {
                MyProtocalPack pack = new MyProtocalPack((byte) i, i + "测试MyProtocalaaaaaaaaaaaaaa");
                byte[] body = pack.getContent().getBytes(charset);
                out.writeInt(5 + body.length); // total length: 5-byte header + body
                out.write(pack.getFlag());
                out.write(body);
                out.flush();
                System.out.println(i + " sent");
            }
            // Give the server a moment to read before the socket is closed
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
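The server in this article only logs what it receives, but if it were to send packets back, a blocking client could read them with DataInputStream. The sketch below assumes the same wire layout as above; BlockingFrameReaderSketch, its Frame class, and the maxLength parameter are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only: reads one packet from a blocking stream.
final class BlockingFrameReaderSketch {
    static final int HEADER_LENGTH = 5; // 4-byte length + 1-byte flag

    static final class Frame {
        final byte flag;
        final String content;

        Frame(byte flag, String content) {
            this.flag = flag;
            this.content = content;
        }
    }

    // Blocks until a whole packet has arrived; throws EOFException if the stream ends early.
    static Frame readFrame(DataInputStream in, Charset charset, int maxLength) throws IOException {
        int length = in.readInt(); // big-endian, matches writeInt and IoBuffer.putInt
        if (length < HEADER_LENGTH || length > maxLength) {
            throw new IOException("bad frame length: " + length);
        }
        byte flag = in.readByte();
        byte[] body = new byte[length - HEADER_LENGTH];
        in.readFully(body); // loops internally until the body is complete
        return new Frame(flag, new String(body, charset));
    }

    // Convenience overload for in-memory bytes; wraps the checked exception.
    static Frame readFrame(byte[] bytes, Charset charset, int maxLength) {
        try {
            return readFrame(new DataInputStream(new ByteArrayInputStream(bytes)), charset, maxLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
        byte[] wire = ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .putInt(HEADER_LENGTH + body.length).put((byte) 3).put(body).array();
        Frame f = readFrame(wire, StandardCharsets.UTF_8, 100);
        System.out.println(f.flag + " " + f.content); // 3 ok
    }
}
```

With a real connection, the first overload would be called in a loop on new DataInputStream(socket.getInputStream()); readFully takes care of partial reads, which is the work the MINA decoder has to do by hand with its per-session buffer.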
You can also implement the client with IoConnector:
package com.gftech.mytool.mina;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MyProtocalClient2 {
    private static final String HOST = "192.168.10.8";
    private static final int PORT = 2500;
    static long counter = 0;
    final static int FC1 = 100;
    static long start = 0;

    /**
     * Test using the MINA framework.
     *
     * @param args
     */
    public static void main(String[] args) throws IOException {
        start = System.currentTimeMillis();
        IoConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        connector.setHandler(new TimeClientHandler2());
        connector.getSessionConfig().setReadBufferSize(100);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
        // Start sending once the connection has been established
        connFuture.addListener(new IoFutureListener<ConnectFuture>() {
            public void operationComplete(ConnectFuture future) {
                try {
                    if (future.isConnected()) {
                        IoSession session = future.getSession();
                        sendData(session);
                    } else {
                        System.out.println("Connection failed");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("start client ...");
    }

    public static void sendData(IoSession session) throws IOException {
        for (int i = 0; i < FC1; i++) {
            String content = "afdjkdafk张新波测试" + i;
            MyProtocalPack pack = new MyProtocalPack((byte) i, content);
            // The codec filter encodes the pack into bytes on the way out
            session.write(pack);
            System.out.println("send data:" + pack);
        }
    }
}

class TimeClientHandler2 extends IoHandlerAdapter {
    @Override
    public void sessionOpened(IoSession session) {
        // Set reader idle time to 60 seconds.
        // sessionIdle(...) method will be invoked when no data is read
        // for 60 seconds.
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
    }

    @Override
    public void sessionClosed(IoSession session) {
        // Print out total number of bytes read from the remote peer.
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        // Close the connection if reader is idle.
        if (status == IdleStatus.READER_IDLE) {
            session.close(true);
        }
    }

    @Override
    public void messageReceived(IoSession session, Object message) {
        MyProtocalPack pack = (MyProtocalPack) message;
        System.out.println("rec:" + pack);
    }
}