版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010741376/article/details/46367701
通过一个实例来说明:Netty服务端接收到客户端的用户订购请求消息后,对用户名进行合法性校验,如果合法,则构造订购成功的应答消息返回给客户端。使用Netty的ObjectEncoder和ObjectDecoder对订购请求和应答消息进行序列化。
订购请求POJO类的定义:
import java.io.Serializable;
/**
 * Subscription (order) request message sent by the client.
 *
 * <p>A plain mutable bean that implements {@link Serializable} so it can be
 * carried over the wire with Java serialization.
 *
 * @author Administrator
 */
public class SubscribeReq implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Identifier of this request. */
    private int subReqID;

    /** Name of the user placing the order. */
    private String userName;

    /** Name of the ordered product. */
    private String productName;

    /** Contact phone number. */
    private String phoneNumber;

    /** Delivery address. */
    private String address;

    /** @return the identifier of this request */
    public int getSubReqID() {
        return subReqID;
    }

    /** @param subReqID the identifier of this request */
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    /** @return the name of the ordering user, or {@code null} if unset */
    public String getUserName() {
        return userName;
    }

    /** @param userName the name of the ordering user */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the product name, or {@code null} if unset */
    public String getProductName() {
        return productName;
    }

    /** @param productName the product name */
    public void setProductName(String productName) {
        this.productName = productName;
    }

    /** @return the phone number, or {@code null} if unset */
    public String getPhoneNumber() {
        return phoneNumber;
    }

    /** @param phoneNumber the phone number */
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    /** @return the delivery address, or {@code null} if unset */
    public String getAddress() {
        return address;
    }

    /** @param address the delivery address */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders all fields joined by {@code '-'} in the order
     * id, user name, product name, phone number, address.
     * Unset string fields appear as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(subReqID).append('-');
        text.append(userName).append('-');
        text.append(productName).append('-');
        text.append(phoneNumber).append('-');
        text.append(address);
        return text.toString();
    }
}
订购应答POJO类:
/**
 * Subscription (order) response message returned by the server.
 *
 * <p>A plain mutable bean that implements {@link Serializable} so it can be
 * carried over the wire with Java serialization.
 *
 * @author Administrator
 */
public class SubscribeResp implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Identifier of the request this response answers. */
    private int subReqID;

    /** Numeric result code of the order. */
    private int respCode;

    /** Human-readable description of the result. */
    private String desc;

    /** @return the identifier of the answered request */
    public int getSubReqID() {
        return subReqID;
    }

    /** @param subReqID the identifier of the answered request */
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }

    /** @return the result code */
    public int getRespCode() {
        return respCode;
    }

    /** @param respCode the result code */
    public void setRespCode(int respCode) {
        this.respCode = respCode;
    }

    /** @return the result description, or {@code null} if unset */
    public String getDesc() {
        return desc;
    }

    /** @param desc the result description */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Renders the fields joined by {@code '-'} in the order
     * request id, result code, description.
     * An unset description appears as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(subReqID).append('-');
        text.append(respCode).append('-');
        text.append(desc);
        return text.toString();
    }
}
订购服务端主函数:
/**
 * Entry point of the subscription server.
 *
 * <p>Starts a Netty NIO server whose child pipeline decodes incoming Java-serialized
 * objects, encodes outgoing ones, and passes requests to {@link SubReqServerHandler}.
 */
public class SubReqServer {

    /** Port used when no valid port is given on the command line. */
    private static final int DEFAULT_PORT = 8088;

    /**
     * Binds the server to the given port and blocks until the listening channel is closed.
     *
     * @param port the TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void bind(int port) throws Exception {
        // NIO thread groups for the server (parent group and child group).
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // NOTE(review): ObjectDecoder uses Java-native deserialization on bytes
                        // read from the network. That is unsafe for untrusted peers; keep this
                        // to trusted environments or switch to a safer codec.
                        // The first argument caps a single serialized object at 1 MiB.
                        ch.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                                ClassResolvers.weakCachingConcurrentResolver(
                                        this.getClass().getClassLoader())));
                        ch.pipeline().addLast(new ObjectEncoder());
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            // Bind the port and wait synchronously until the bind succeeds.
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Always release both thread groups, even when bind fails.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port to listen on
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        new SubReqServer().bind(resolvePort(args));
    }

    /**
     * Reads the port from the first command-line argument.
     *
     * <p>An unparsable value is reported on stderr instead of being silently ignored,
     * and the default port is used.
     *
     * @param args the command-line arguments, may be {@code null} or empty
     * @return the parsed port, or {@link #DEFAULT_PORT} when absent or not a number
     */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
            return DEFAULT_PORT;
        }
    }
}
上述代码中,我们首先创建一个新的ObjectDecoder,它负责对实现Serializable的POJO对象进行解码。它有多个构造函数,支持不同的ClassResolver,在此我们使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap,对类加载器加载的类进行缓存。它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄漏。
订单处理类:
/**
 * Server-side handler for subscription requests.
 *
 * <p>Each inbound message is cast to {@link SubscribeReq}. When the user name equals
 * {@code "yxs"} (ignoring case) the request is printed and a success response is written
 * back; requests from any other user name are dropped without a reply.
 */
public class SubReqServerHandler extends ChannelHandlerAdapter {

    /**
     * Handles one decoded inbound message.
     *
     * @param ctx the channel context used to write the response
     * @param msg the decoded message; cast to {@link SubscribeReq}
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        SubscribeReq request = (SubscribeReq) msg;
        boolean accepted = "yxs".equalsIgnoreCase(request.getUserName());
        if (!accepted) {
            // Unknown user: no response is sent.
            return;
        }
        System.out.println("Server accept client subscript req:[" + request.toString() + "]");
        SubscribeResp reply = successResponse(request.getSubReqID());
        ctx.writeAndFlush(reply);
    }

    /**
     * Builds the success response for a request.
     *
     * @param subReqID the identifier of the request being answered
     * @return a response with result code 0 and a fixed description
     */
    private SubscribeResp successResponse(int subReqID) {
        SubscribeResp reply = new SubscribeResp();
        reply.setDesc("Netty book order succeed,3 days later ,sent to the designated address");
        reply.setRespCode(0);
        reply.setSubReqID(subReqID);
        return reply;
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   the channel context to close
     * @param cause the error that was raised
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
客户端开发:将Netty对象解码器和编码器添加到ChannelPipeline中,链路被激活的时候构造订购请求消息发送。为了检验Netty的Java序列化功能是否支持TCP粘包/拆包,客户端一次构造10条订购请求,最后一次性发送给服务端,客户端订购处理handler将接收到的订购响应消息打印出来。
产品订购客户端:
/**
 * Entry point of the subscription client.
 *
 * <p>Connects to the server with a pipeline that decodes incoming Java-serialized
 * objects, encodes outgoing ones, and delegates to {@link SubReqClientHandler}.
 */
public class SubReqClient {

    /** Port used when no valid port is given on the command line. */
    private static final int DEFAULT_PORT = 8088;

    /**
     * Connects to the given host and port and blocks until the connection is closed.
     *
     * @param port the TCP port of the server
     * @param host the host name or address of the server
     * @throws Exception if the connection fails or the waiting thread is interrupted
     */
    public void connect(int port, String host) throws Exception {
        // NIO thread group for the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel arg0) throws Exception {
                        // NOTE(review): ObjectDecoder uses Java-native deserialization on bytes
                        // read from the network; only connect to trusted servers.
                        // The first argument caps a single serialized object at 1024 bytes.
                        arg0.pipeline().addLast(new ObjectDecoder(1024,
                                ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                        arg0.pipeline().addLast(new ObjectEncoder());
                        arg0.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // Start the asynchronous connect and wait for it to complete.
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group.
            group.shutdownGracefully();
        }
    }

    /**
     * Starts the client against {@code 127.0.0.1}.
     *
     * @param args optional; {@code args[0]} is the server port
     * @throws Exception if the client cannot connect
     */
    public static void main(String[] args) throws Exception {
        new SubReqClient().connect(resolvePort(args), "127.0.0.1");
    }

    /**
     * Reads the port from the first command-line argument.
     *
     * <p>An unparsable value is reported on stderr instead of being silently ignored,
     * and the default port is used.
     *
     * @param args the command-line arguments, may be {@code null} or empty
     * @return the parsed port, or {@link #DEFAULT_PORT} when absent or not a number
     */
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Invalid port '" + args[0] + "', using default " + DEFAULT_PORT);
            return DEFAULT_PORT;
        }
    }
}
客户端处理类:
/**
 * Client-side handler of the subscription example.
 *
 * <p>When the channel becomes active it writes a batch of subscription requests and
 * flushes them in one go; every message received afterwards is printed to stdout.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Number of requests written before the single flush. */
    private static final int REQUEST_COUNT = 10;

    public SubReqClientHandler() {
    }

    /**
     * Writes {@link #REQUEST_COUNT} requests and then flushes them together.
     *
     * @param ctx the channel context used for writing
     */
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < REQUEST_COUNT; i++) {
            ctx.write(subReq(i)); // queue the request without flushing
        }
        ctx.flush(); // send all queued requests at once
    }

    /**
     * Builds one subscription request.
     *
     * @param i the request identifier
     * @return a populated request
     */
    private SubscribeReq subReq(int i) {
        SubscribeReq req = new SubscribeReq();
        req.setAddress("武汉东湖新技术开发区");
        req.setPhoneNumber("121324343535");
        req.setProductName("Netty权威指南");
        req.setSubReqID(i);
        // Must be the user name the server handler accepts ("yxs", case-insensitive);
        // any other name is dropped by the server and no response would ever arrive.
        req.setUserName("yxs");
        return req;
    }

    /**
     * Prints each received message.
     *
     * @param ctx the channel context (unused)
     * @param msg the decoded message
     * @throws Exception declared for compatibility with the handler contract
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive server response:[" + msg + "]");
    }

    /**
     * Flushes the channel after a read batch has completed.
     *
     * @param ctx the channel context to flush
     * @throws Exception declared for compatibility with the handler contract
     */
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * <p>The method name must be spelled exactly {@code exceptionCaught}; the earlier
     * misspelling meant the framework never invoked it, so errors were not handled here
     * and the channel was not closed.
     *
     * @param ctx   the channel context to close
     * @param cause the error that was raised
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}