package com.netty.core; import java.security.cert.CertificateException; import javax.net.ssl.SSLException; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.apache.log4j.Logger; import org.springframework.beans.factory.InitializingBean; import com.netty.util.NettyPropertiesUtil; public class NettyServer implements InitializingBean { Logger logger = Logger.getLogger(NettyServer.class); public boolean isSSL = true; public int port = 8443; public int threadSize = 1; public int connnecttimeout = 10000; public int sotimeout = 10000; public void init() { isSSL = Integer .parseInt(NettyPropertiesUtil.getProperty("netty.isSSL")) == 1 ? true : false; port = Integer.parseInt(NettyPropertiesUtil.getProperty("netty.port")); threadSize = Integer.parseInt(NettyPropertiesUtil .getProperty("netty.nioeventloopgroup.size")); connnecttimeout = Integer.parseInt(NettyPropertiesUtil .getProperty("netty.serverbootstrap.connnecttimeout")); sotimeout = Integer.parseInt(NettyPropertiesUtil .getProperty("netty.serverbootstrap.sotimeout")); } @SuppressWarnings({ "deprecation" }) public void start() throws CertificateException, SSLException, InterruptedException { logger.info("netty server start"); final SslContext sslCtx; if (isSSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(threadSize); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new NettyServerInitializer(sslCtx)); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connnecttimeout); b.option(ChannelOption.SO_TIMEOUT, sotimeout); Channel ch = b.bind(port).sync().channel(); logger.info("netty server end"); ch.closeFuture().sync(); logger.info("netty end "); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void afterPropertiesSet() throws Exception { try { logger.info(" netty conf init start"); init(); logger.info(" netty conf init end"); } catch (Exception e) { logger.info(" netty conf init fail"); e.printStackTrace(); } } }
package com.netty.core; import org.apache.log4j.Logger; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpHeaders.Names; public class NettyServerHandler extends ChannelInboundHandlerAdapter { Logger logger = Logger.getLogger(NettyServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpRequest){ HttpRequest req = (HttpRequest) msg; logger.info("RECV MSG: "+req.getUri()); if(HttpHeaders.is100ContinueExpected(req)){ ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); } String resp="ok"; FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(resp.getBytes())); response.headers().set(Names.CONTENT_TYPE, "text/html; charset=utf-8"); response.headers().set(Names.CONTENT_LENGTH, response.content().readableBytes()); ctx.write(response).addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } }
package com.netty.core; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.ssl.SslContext; public class NettyServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public NettyServerInitializer() { sslCtx=null; } public NettyServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (null!=sslCtx) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new HttpServerCodec()); //完整参数封装 p.addLast("aggregator", new HttpObjectAggregator(1048576)); p.addLast(new NettyServerHandler()); } }
package test.netty; import java.net.URI; import org.junit.Test; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpVersion; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponse; public class NettyClientTest { public void connect(String host, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码 ch.pipeline().addLast(new HttpResponseDecoder()); // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码 ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpClientInboundHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); URI uri = new URI("http://127.0.0.1:8443"); String msg = "Are you ok?"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 构建http请求 request.headers().set(HttpHeaders.Names.HOST, host); request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 f.channel().write(request); f.channel().flush(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } @Test public void Test(){ try{ NettyClientTest client = new NettyClientTest(); client.connect("127.0.0.1", 8443); }catch(Exception e){ e.printStackTrace(); } } } class HttpClientInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE)); } if(msg instanceof HttpContent) { HttpContent content = (HttpContent)msg; ByteBuf buf = content.content(); System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); buf.release(); } } }