package com.baoy.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.baoy.util.Constants; public class ServerSocket { private static Logger log = Logger.getLogger(ServerSocket.class); private IoAcceptor acceptor; private ServerSocket(){} private static ServerSocket instance = new ServerSocket(); public static ServerSocket getServerSocket(){ return instance; } public void start(){ acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "logger", new LoggingFilter() ); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8")))); ServerHandler handler = new ServerHandler(); acceptor.setHandler(handler); try { acceptor.getSessionConfig().setReadBufferSize(Constants.SERVER_SOCKET_BUFFERSIZE); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, Constants.SERVER_SOCKET_HEARTBEAT_TIMEOUT); acceptor.bind( new InetSocketAddress(Constants.SERVER_SOCKET_PORT)); } catch (IOException e) { log.error("Cannot start server", e); } } public void stop(){ if(acceptor != null){ acceptor.unbind(); acceptor.dispose(); } } }
package com.baoy.server; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import com.baoy.message.Message; import com.baoy.message.MessageFactory; import com.baoy.queue.QueueHolder; import com.baoy.queue.ServerQueue; public class ServerHandler extends IoHandlerAdapter{ private static Logger log = Logger.getLogger(ServerHandler.class); @Override public void messageReceived(IoSession session, Object json) throws Exception { super.messageReceived(session, json); log.debug("server recieve msg:" + json.toString()); if (ServerQueue.getSession() != session) ServerQueue.setSession(session); MessageFactory facotry = new MessageFactory(); Message message = facotry.build(json.toString()); QueueHolder.getHolder().getQueue(QueueHolder.STC).send(message); } }
package com.baoy.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.baoy.util.Constants; public class ClientSocket { private static Logger log = Logger.getLogger(ClientSocket.class); private IoAcceptor acceptor; private ClientSocket(){} private static ClientSocket instance = new ClientSocket(); public static ClientSocket getServerSocket(){ return instance; } public void start(){ acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast( "logger", new LoggingFilter() ); acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8")))); ClientHandler handler = new ClientHandler(); acceptor.setHandler(handler); try { acceptor.getSessionConfig().setReadBufferSize(Constants.CLIENT_SOCKET_BUFFERSIZE); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, Constants.CLIENT_SOCKET_HEARTBEAT_TIMEOUT); acceptor.bind( new InetSocketAddress(Constants.CLIENT_SOCKET_PORT)); } catch (IOException e) { log.error("Cannot start client", e); } } public void stop(){ if(acceptor != null){ acceptor.unbind(); acceptor.dispose(); } } }
package com.baoy.client; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import com.baoy.message.Message; import com.baoy.message.MessageFactory; import com.baoy.queue.QueueHolder; import com.baoy.queue.SessionWrap; import com.baoy.queue.UserSessionHolder; public class ClientHandler extends IoHandlerAdapter{ private static Logger log = Logger.getLogger(ClientHandler.class); @Override public void messageReceived(IoSession session, Object json) throws Exception { super.messageReceived(session, json); log.debug("client recieve msg:" + json.toString()); MessageFactory facotry = new MessageFactory(); Message message = facotry.build(json.toString()); switch(message.getTagEnum()){ case connection: SessionWrap sessionWrapConn = new SessionWrap(message.getUserKey(), session); UserSessionHolder.getHolder().putSession(sessionWrapConn); session.write(message.successMessage()); break; case heartbreat: SessionWrap sessionWrapHB = UserSessionHolder.getHolder().getSession(message.getUserKey()); if(sessionWrapHB == null){ sessionWrapHB = new SessionWrap(message.getUserKey(), session); UserSessionHolder.getHolder().putSession(sessionWrapHB); }else{ sessionWrapHB.updateHeartbreatTime(); } QueueHolder.getHolder().getQueue(QueueHolder.CTS).send(message); break; default: QueueHolder.getHolder().getQueue(QueueHolder.CTS).send(message); } } }
package com.baoy.queue; import org.apache.log4j.Logger; import org.apache.mina.core.session.IoSession; import com.baoy.message.Message; import com.baoy.message.MessageFactory; import com.baoy.message.ShortMessage; import com.baoy.server.ServerSocket; public class ServerQueue extends MessageQueue{ private static Logger log = Logger.getLogger(ServerSocket.class); public ServerQueue(){ resendSleepInterval = 1000 * 60 ; maxFailedCount = 15; } private static IoSession session = null; public static IoSession getSession(){ return ServerQueue.session; } public static void setSession(IoSession session){ if (session == ServerQueue.session) return; if (ServerQueue.session != null){ ServerQueue.session.close(false); } ServerQueue.session = session; log.info("server connected"); } public boolean doSend(Message message){ log.debug(" STC -> ServerQueue doSend :"+message.toString()); try { SessionWrap session; String to =""; if(message instanceof ShortMessage){ to = ((ShortMessage) message).getTo(); session = UserSessionHolder.getHolder().getSession(to); }else{ session = UserSessionHolder.getHolder().getSession(message.getUserKey()); } if(session == null || session.isSessionClosed()){ return false; } return session.send(message); } catch (Exception e) { return false; } } }
package com.baoy.queue; import org.apache.log4j.Logger; import org.apache.mina.core.session.IoSession; import com.baoy.message.Message; public class ClientQueue extends MessageQueue{ private static Logger log = Logger.getLogger(ClientQueue.class); public ClientQueue(){ resendSleepInterval = 1000 * 60 * 5; maxFailedCount = 3; } public boolean doSend(Message message){ log.debug(" CTS -> ClientQueue doSend :"+message.toString()); try { IoSession session = ServerQueue.getSession(); if (session == null){ return false; } session.write(message); return true; } catch (Exception e) { } return false; } }
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!