netty案例,netty4.1中级拓展篇七《Netty请求响应同步通信》

简介: 在我们实现开发RPC框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们RPC框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。这里我们选择netty作为我们的socket框架,采用future方式进行通信。

0.jpg

前言介绍

在我们实现开发RPC框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们RPC框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。这里我们选择netty作为我们的socket框架,采用future方式进行通信。

Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。

Motan:微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。

Tars:腾讯内部使用的 RPC 框架,于 2017 年对外开源,仅支持 C++ 语言。

Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC 框架,仅支持 Java 语言

gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。

Thrift:最初是由 Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为 Apache 开源项目之一,支持多种语言。

hprose:一个MIT开源许可的新型轻量级跨语言跨平台的面向对象的高性能远程动态通讯中间件。它支持众多语言:nodeJs, C++, .NET, Java, Delphi, Objective-C, ActionScript, JavaScript, ASP, PHP, Python, Ruby, Perl, Golang 。

环境准备

1、jdk1.8【jdk1.7以下只能部分支持netty】

2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

代码示例

itstack-demo-rpc-2-07
└── src
    └── main
    │    └── java
    │        └── org.itstack.demo.netty
    │             ├── client
    │             │   ├── ClientSocket.java
    │             │   └── MyClientHandler.java
    │             ├── codec
    │             │   ├── RpcDecoder.java
    │             │   └── RpcEncoder.java
    │             ├── future
    │             │   ├── SyncWrite.java
    │             │   ├── SyncWriteFuture.java
    │             │   ├── SyncWriteMap.java
    │             │   └── WriteFuture.java
    │             ├── msg
    │             │   ├── Request.java
    │             │   └── Response.java
    │             ├── server
    │             │   ├── MyServerHandler.java
    │             │   └── ServerSocket.java
    │             └── util
    │                 └── SerializationUtil.java
    └── test
         └── java
             └── org.itstack.demo.test
                 ├── StartClient.java
                 └── StartServer.java

展示部分重要代码块,完整代码可以关注公众号获取;bugstack虫洞栈

MyClientHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        if (future != null) {
            future.setResponse(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

SyncWrite.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class SyncWrite {
    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }
        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);
        WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
        SyncWriteMap.syncKey.put(request.getRequestId(), future);
        Response response = doWriteAndSync(channel, request, timeout, future);
        SyncWriteMap.syncKey.remove(request.getRequestId());
        return response;
    }
    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                //失败移除
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });
        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.isTimeout()) {
                throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }
}

MyServerHandler.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj){
        Request msg = (Request) obj;
        //反馈
        Response request = new Response();
        request.setRequestId(msg.getRequestId());
        request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。");
        ctx.writeAndFlush(request);
        //释放
        ReferenceCountUtil.release(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}

StartClient.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartClient {
    private static ChannelFuture future;
    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();
        while (true) {
            try {
                //获取future,线程有等待处理时间
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                //构建发送参数
                Request request = new Request();
                request.setResult("查询{bugstack虫洞栈}用户信息");
                SyncWrite s = new SyncWrite();
                Response response = s.writeAndSync(future.channel(), request, 1000);
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

StartServer.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈  {关注获取学习源码}
 * 虫洞群:①群5398358 ②群5360692
 * Create by fuzhengwei on 2019
 */
public class StartServer {
    public static void main(String[] args) {
        new Thread(new ServerSocket()).start();
        System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
    }
}

测试结果

启动StartServer

itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}

启动StartClient

调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"188ba00e-3a0d-4094-9475-c7ee93104011"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"bc9d63d4-9d37-406a-9c0f-a68211ac466f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"cceb510b-8179-46ab-abc6-eb7d5b6c0ac2"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f65aafd0-07b7-4bdb-be80-a57b4c58ad2d"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"e6700074-380b-441f-ae0d-f71dcd7f84c9"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"25a0a9d5-46d5-4da6-ad3f-1496ca20bb17"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"052dce21-dfb9-42d7-bbcf-46137b9933df"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4fd968fa-3171-4e4f-838d-4a215a90da00"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"97f6eb6e-8c5d-49f4-beba-2ba7e1ff953f"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"ffc48115-8e62-43a8-b3f7-035390427d37"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"4bafefc9-4beb-49f6-8126-29e0e03a55d1"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"10367786-24fd-4f11-9465-bcd2c87b4027"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"82293ad3-a694-405c-ab03-01624f38b1ad"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0c163cb8-be5f-4697-931f-61f5bf487bae"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"0f1e7611-4fd0-43b4-86dc-fce09965046e"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"f22ac0dc-974c-4de1-bcdf-1566ca0b2305"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"1907456b-2507-4ddd-8c1e-a86c990e3e72"}
调用结果:{"param":"查询{bugstack虫洞栈}用户信息 请求成功,反馈结果请接受处理{公众号:bugstack虫洞栈 博客栈:https://bugstack.cn}。","requestId":"6a99b1f2-5859-4ed7-9d17-98229c13250f"}
Process finished with exit code -1


1.jpg

目录
相关文章
Netty Http服务器接收请求
Netty Http服务器接收请求
77 0
|
6月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
359 0
|
网络协议
由浅入深Netty聊天室案例
由浅入深Netty聊天室案例
57 0
|
消息中间件 分布式计算 NoSQL
由浅入深Netty入门案例
由浅入深Netty入门案例
132 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12655 12
Netty入门到超神系列-聊天室案例
对于服务端而言需要做如下事情 selector监听客户端的链接 如果有“读”事件,就从通道读取数据 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发 对于客户端而言需要做如下事情 selector监听服务端的“读”事件 如果有数据从通道中读取数据,打印到控制台 监听键盘输入,向服务端发送消息
115 0
Netty(七)之聊天室小小小案例
Netty(七)之聊天室小小小案例
90 0
|
存储 Java
netty框架的学习笔记 + 一个netty实现websocket通信案例
一、前言 1.什么是netty? 2.netty的使用场景。 3.学习目录 二.java io通信 三.netty入门 四.websocket入门 五.netty实现websocket通信案例。 1.
287 0
|
机器学习/深度学习 存储 Java
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
251 0
【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )
|
Java Maven
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)
517 0
【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )(二)