Apache Mina使用手记(四)

简介: Apache Mina使用手记(四) 分类: JAVA 2009-03-13 20:46 13809人阅读 评论(19) 收藏 举报 minaapacheexceptionsessionstringbyte 上一篇中,我们介绍了如何在mina中编写自己的日志过滤器,这一篇我们自己实现一个编解器。

Apache Mina使用手记(四)

分类: JAVA 13809人阅读 评论(19) 收藏 举报

上一篇中,我们介绍了如何在mina中编写自己的日志过滤器,这一篇我们自己实现一个编解器。

实际应用当,很多应用系统应用的都不是标准的web service或XML等,比如象中国移动/联通/电信的短信网关程序,都有自己不同的协议实现,并且都是基于TCP/IP的字节流。Mina自带的编解码器实现了TextLineEncoder和TextLineDecoder,可以进行按行的字符串处理,对于象短信网关程序,就要自己实现编解码过滤器了。

我们定义一个简单的基于TCP/IP字节流的协议,实现在客户端和服务端之间的数据包传输。数据包MyProtocalPack有消息头和消息体组成,消息头包括:length(消息包的总长度,数据类型int),flag(消息包标志位,数据类型byte),消息体content是一个字符串,实际实现的时候按byte流处理。源代码如下:

package com.gftech.mytool.mina;
import com.gftech.util.GFCommon;
public class MyProtocalPack {
    private int length;
    private byte flag;
    private String content;
    
    public MyProtocalPack(){
        
    }
    
    public MyProtocalPack(byte flag,String content){
        this.flag=flag;
        this.content=content;
        int len1=content==null?0:content.getBytes().length;
        this.length=5+len1;
    }
    
    public MyProtocalPack(byte[] bs){
        if(bs!=null && bs.length>=5){
            length=GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
            flag=bs[4];
            content=new String(GFCommon.bytesCopy(bs, 5, length-5));
        }
    }
    
    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public byte getFlag() {
        return flag;
    }
    public void setFlag(byte flag) {
        this.flag = flag;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    
    public String toString(){
        StringBuffer sb=new StringBuffer();
        sb.append(" Len:").append(length);
        sb.append(" flag:").append(flag);
        sb.append(" content:").append(content);
        return sb.toString();
    }
}

回过头来,我们先看一下在MinaTimeServer中,如何使用一个文本的编解码过滤器,它是在过滤器链中添加了一个叫ProtocalCodecFilter的类,其中它调用了一个工厂方法TextLineCodecFactory的工厂类,创建具休的TextLineEncoder和TextLineDecoder编码和解码器。我们看一下具体的源代码:

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));

package org.apache.mina.filter.codec.textline;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
/**
 * A {@link ProtocolCodecFactory} that performs encoding and decoding between
 * a text line data and a Java string object.  This codec is useful especially
 * when you work with a text-based protocols such as SMTP and IMAP.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public class TextLineCodecFactory implements ProtocolCodecFactory {
    private final TextLineEncoder encoder;
    private final TextLineDecoder decoder;
    /**
     * Creates a new instance with the current default {@link Charset}.
     */
    public TextLineCodecFactory() {
        this(Charset.defaultCharset());
    }
    /**
     * Creates a new instance with the specified {@link Charset}.  The
     * encoder uses a UNIX {@link LineDelimiter} and the decoder uses
     * the AUTO {@link LineDelimiter}.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     */
    public TextLineCodecFactory(Charset charset) {
        encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
        decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
    }
    /**
     * Creates a new instance of TextLineCodecFactory.  This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimeter for the encoder
     * @param decodingDelimiter
     *  The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            String encodingDelimiter, String decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }
    /**
     * Creates a new instance of TextLineCodecFactory.  This constructor
     * provides more flexibility for the developer.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimeter for the encoder
     * @param decodingDelimiter
     *  The line delimeter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
        encoder = new TextLineEncoder(charset, encodingDelimiter);
        decoder = new TextLineDecoder(charset, decodingDelimiter);
    }
    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }
    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }
       /**
     * Returns the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}.  The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
     */
    public int getEncoderMaxLineLength() {
        return encoder.getMaxLineLength();
    }
    /**
     * Sets the allowed maximum size of the encoded line.
     * If the size of the encoded line exceeds this value, the encoder
     * will throw a {@link IllegalArgumentException}.  The default value
     * is {@link Integer#MAX_VALUE}.
     * <p>
     * This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
     */
    public void setEncoderMaxLineLength(int maxLineLength) {
        encoder.setMaxLineLength(maxLineLength);
    }
    /**
     * Returns the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}.  The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
     */
    public int getDecoderMaxLineLength() {
        return decoder.getMaxLineLength();
    }
    /**
     * Sets the allowed maximum size of the line to be decoded.
     * If the size of the line to be decoded exceeds this value, the
     * decoder will throw a {@link BufferDataException}.  The default
     * value is <tt>1024</tt> (1KB).
     * <p>
     * This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
     */
    public void setDecoderMaxLineLength(int maxLineLength) {
        decoder.setMaxLineLength(maxLineLength);
    }
}

TextLineFactory实现了ProtocalCodecFactory接口,该接口主要有一个编码的方法getEncoder()和一个解码的方法getDecoder():


package org.apache.mina.filter.codec;
import org.apache.mina.core.session.IoSession;
/**
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
 * binary or protocol specific data into message object and vice versa.
 * <p>
 * Please refer to
 * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html" mce_href="xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
 * example.
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
     * encodes message objects into binary or protocol-specific data.
     */
    ProtocolEncoder getEncoder(IoSession session) throws Exception;
    /**
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
     * decodes binary or protocol-specific data into message objects.
     */
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
}

我们主要是仿照TextLineEncoder实现其中的encode()方法,仿照TextLineDecoder实现其中的decode()即可,它们分别实现了ProtocalEncoder和ProtocalDecoder接口。我们要编写三个类分别是:MyProtocalCodecFactory,MyProtocalEncoder,MyProtocalDecoder对应TextLineCodecFactory,TextLineEncoder,TextLineDecoder。

package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class MyProtocalCodecFactory   implements ProtocolCodecFactory {
        private final MyProtocalEncoder encoder;
        private final MyProtocalDecoder decoder;
        
        public MyProtocalCodecFactory(Charset charset) {
            encoder=new MyProtocalEncoder(charset);
            decoder=new MyProtocalDecoder(charset);
        }
         
        public ProtocolEncoder getEncoder(IoSession session) {
            return encoder;
        }
        public ProtocolDecoder getDecoder(IoSession session) {
            return decoder;
        }
        
}

package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
public class MyProtocalEncoder extends ProtocolEncoderAdapter {
    private final Charset charset;
    public MyProtocalEncoder(Charset charset) {
        this.charset = charset;
    }
    //在此处实现对MyProtocalPack包的编码工作,并把它写入输出流中
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        MyProtocalPack value = (MyProtocalPack) message;
        IoBuffer buf = IoBuffer.allocate(value.getLength());
        buf.setAutoExpand(true);
        buf.putInt(value.getLength());
        buf.put(value.getFlag());
        if (value.getContent() != null)
            buf.put(value.getContent().getBytes());
        buf.flip();
        out.write(buf);
    }
    public void dispose() throws Exception {
    }
}

package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
public class MyProtocalDecoder implements ProtocolDecoder {
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
    private final Charset charset;
    private int maxPackLength = 100;
    public MyProtocalDecoder() {
        this(Charset.defaultCharset());
    }
    public MyProtocalDecoder(Charset charset) {
        this.charset = charset;
    }
    public int getMaxLineLength() {
        return maxPackLength;
    }
    public void setMaxLineLength(int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
        }
        this.maxPackLength = maxLineLength;
    }
    private Context getContext(IoSession session) {
        Context ctx;
        ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx); 
        } 
        return ctx;
    }
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        final int packHeadLength = 5;
        //先获取上次的处理上下文,其中可能有未处理完的数据
        Context ctx = getContext(session);
        // 先把当前buffer中的数据追加到Context的buffer当中 
        ctx.append(in); 
        //把position指向0位置,把limit指向原来的position位置
        IoBuffer buf = ctx.getBuffer();
        buf.flip(); 
        // 然后按数据包的协议进行读取
        while (buf.remaining() >= packHeadLength) {
            buf.mark();
            // 读取消息头部分
            int length = buf.getInt();
            byte flag = buf.get();
            //检查读取的包头是否正常,不正常的话清空buffer
            if (length<0 ||length > maxPackLength) {
                buf.clear(); 
                break;
            } 
            //读取正常的消息包,并写入输出流中,以便IoHandler进行处理
            else if (length >= packHeadLength && length - packHeadLength <= buf.remaining()) {
                int oldLimit2 = buf.limit();
                buf.limit(buf.position() + length - packHeadLength);
                String content = buf.getString(ctx.getDecoder());
                buf.limit(oldLimit2);
                MyProtocalPack pack = new MyProtocalPack(flag, content);
                out.write(pack);
            } else {
                // 如果消息包不完整
                // 将指针重新移动消息头的起始位置 
                buf.reset(); 
                break;
            }
        }
        if (buf.hasRemaining()) {
            // 将数据移到buffer的最前面 
                IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
                temp.put(buf);
                temp.flip();
                buf.clear();
                buf.put(temp);
                 
        } else {// 如果数据已经处理完毕,进行清空
            buf.clear(); 
        }
        
        
    }
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }
    public void dispose(IoSession session) throws Exception { 
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            session.removeAttribute(CONTEXT);
        }
    }
    //记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
    //所以,需要上下文拼起来才能完整的处理
    private class Context {
        private final CharsetDecoder decoder;
        private IoBuffer buf;
        private int matchCount = 0;
        private int overflowPosition = 0;
        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(80).setAutoExpand(true);
        }
        public CharsetDecoder getDecoder() {
            return decoder;
        }
        public IoBuffer getBuffer() {
            return buf;
        }
        public int getOverflowPosition() {
            return overflowPosition;
        }
        public int getMatchCount() {
            return matchCount;
        }
        public void setMatchCount(int matchCount) {
            this.matchCount = matchCount;
        }
        public void reset() {
            overflowPosition = 0;
            matchCount = 0;
            decoder.reset();
        }
        public void append(IoBuffer in) { 
            getBuffer().put(in);
          
        }
 
    }
}

在MyProtocalServer中,添加自己实现的Log4jFilter和编解码过滤器:

package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MyProtocalServer {
    private static final int PORT = 2500;
    static Logger logger = Logger.getLogger(MyProtocalServer.class);
    public static void main(String[] args) throws IOException {
        PropertyConfigurator.configure("conf//log4j.properties");
        IoAcceptor acceptor = new NioSocketAcceptor();
        Log4jFilter lf = new Log4jFilter(logger);
        acceptor.getFilterChain().addLast("logger", lf);
    
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        acceptor.setHandler(new MyHandler());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("start server ...");
    }
}
class MyHandler extends IoHandlerAdapter {
    static Logger logger = Logger.getLogger(MyHandler.class);
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyProtocalPack pack=(MyProtocalPack)message;
        logger.debug("Rec:" + pack);
    }
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("IDLE " + session.getIdleCount(status));
    }
}

编写一个客户端程序进行测试:

package com.gftech.mytool.mina;
import java.io.DataOutputStream;
import java.net.Socket;
public class MyProtocalClient {
    
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 2500);
            DataOutputStream out =  new DataOutputStream( socket.getOutputStream() ) ;
            for (int i = 0; i < 1000; i++) {
                MyProtocalPack pack=new MyProtocalPack((byte)i,i+"测试MyProtocalaaaaaaaaaaaaaa");
                out.writeInt(pack.getLength());
                out.write(pack.getFlag());
                out.write(pack.getContent().getBytes());
                out.flush();
                System.out.println(i + " sended");
            }
            Thread.sleep(1000 );
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

也可以用IoConnector实现自己的客户端:

package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class MyProtocalClient2 {
	private static final String HOST = "192.168.10.8";
	private static final int PORT = 2500;
	static long counter = 0;
	final static int FC1 = 100;
	static long start = 0;
	/**
	 * 使用Mina的框架结构进行测试
	 * 
	 * @param args
	 */
	public static void main(String[] args) throws IOException {
		start = System.currentTimeMillis();
		IoConnector connector = new NioSocketConnector();
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
		connector.setHandler(new TimeClientHandler2());
		connector.getSessionConfig().setReadBufferSize(100);
		connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 
		ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
		connFuture.addListener(new IoFutureListener<ConnectFuture>() {
			public void operationComplete(ConnectFuture future) {
				try {
					if (future.isConnected()) {
						IoSession session = future.getSession(); 
						sendData(session);  
					} else {
						System.out.println("连接不存在 ");
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("start client ...");
	}
	public static void sendData(IoSession session) throws IOException {
		for (int i = 0; i < FC1; i++) {
			String content = "afdjkdafk张新波测试" + i;
			MyProtocalPack pack = new MyProtocalPack((byte) i, content);
			session.write(pack);
			System.out.println("send data:" + pack);
		}
	}
}
class TimeClientHandler2 extends IoHandlerAdapter {
	@Override
	public void sessionOpened(IoSession session) {
		// Set reader idle time to 10 seconds.
		// sessionIdle(...) method will be invoked when no data is read
		// for 10 seconds.
		session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
	}
	@Override
	public void sessionClosed(IoSession session) {
		// Print out total number of bytes read from the remote peer.
		System.err.println("Total " + session.getReadBytes() + " byte(s)");
	}
	@Override
	public void sessionIdle(IoSession session, IdleStatus status) {
		// Close the connection if reader is idle.
		if (status == IdleStatus.READER_IDLE) {
			session.close(true);
		}
	}
	@Override
	public void messageReceived(IoSession session, Object message) {
		MyProtocalPack pack = (MyProtocalPack) message;
		System.out.println("rec:" + pack);
	}
}



目录
相关文章
|
4月前
|
消息中间件 数据采集 编解码
apache mina
apache mina
39 0
|
12月前
|
网络协议 Java API
Apache Mina高性能通信框架研究邮件列表.
Apache Mina高性能通信框架研究邮件列表.
61 0
|
Apache 编解码 网络协议
|
Apache Java Spring
Apache Mina开发手册之四
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/40544275 Apache Mina开发手册之四 作者:chszs,转载需注明。
888 0
|
Apache 编解码
Apache Mina开发手册之三
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/39617969 Apache Mina开发手册之三 作者:chszs,转载需注明。
941 0
|
Java Apache 容器
Apache Mina开发手册之二
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/39550367 Apache Mina开发手册之二 作者:chszs,转载需注明。
899 0
|
API Apache
Apache Mina v2.0.8 API手册
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/39550809 Apache Mina v2.0.8 API手册 作者:chszs,转载需注明。
1030 0
|
Java Apache 编解码
Apache Mina开发手册
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/39525717 Apache Mina开发手册 作者:chszs,转载需注明。
1193 0

推荐镜像

更多