netty案例,netty4.1源码分析篇六《Netty异步架构监听类Promise源码分析》

简介: Netty是一个异步网络处理框架,在实现中大量使用了Future机制,并在Java自带Future的基础上,增加了Promise机制。这两个实现类的目的都是为了使异步编程更加方便使用。

前言介绍

分析Promise之前我们先来看两个单词;Promise、Future

Promise v. 许诺;承诺;答应;保证;使很可能;预示

Future n. 将来;未来;未来的事;将来发生的事;前景;前途;前程

他们的含义都是对未来即将要发生的事情做相应的处理,这也是在异步编程中非常常见的类名。

Netty是一个异步网络处理框架,在实现中大量使用了Future机制,并在Java自带Future的基础上,增加了Promise机制。这两个实现类的目的都是为了使异步编程更加方便使用。

源码分析

1、了解Java并发包中的Future

java的并发包中提供java.util.concurrent.Future类,用于处理异步操作。在Java中Future是一个未来完成的异步操作,可以获得未来返回的值。如下案例,调用一个获取用户信息的方法,该方法会立刻返回Future对象,调用Future.get()可以同步等待耗时方法的返回,也可以通过调用future的cancel()取消Future任务。

1class TestFuture {
 2
 3    public static void main(String[] args) throws ExecutionException, InterruptedException {
 4        TestFuture testFuture = new TestFuture();
 5        Future<String> future = testFuture.queryUserInfo("10001"); //返回future
 6        String userInfo = future.get();
 7        System.out.println("查询用户信息:" + userInfo);
 8    }
 9
10    private Future<String> queryUserInfo(String userId) {
11        FutureTask<String> future = new FutureTask<>(() -> {
12            try {
13                Thread.sleep(1000);
14                return "微信公众号:bugstack虫洞栈 | 用户ID:" + userId;
15            } catch (InterruptedException ignored) {}
16            return "error";
17        });
18        new Thread(future).start();
19        return future;
20    }
21
22}

2、Netty实现了自己的Future

Netty通过继承java并发包的Future来定义自己的Future接口,为Future加入的功能主要有添加、删除监听事件接口,最后由Promise实现。

io.netty.util.concurrent.Future.java中定义了一些列的异步编程方法 | 经常会使用的>b.bind(port).sync();

1// 只有IO操作完成时才返回true
 2boolean isSuccess();
 3// 只有当cancel(boolean)成功取消时才返回true
 4boolean isCancellable();
 5// IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回null
 6Throwable cause();
 7// 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件
 8Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
 9Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
10// 移除监听事件,future完成时,不会触发
11Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
12Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
13// 等待future done
14Future<V> sync() throws InterruptedException;
15// 等待future done,不可打断
16Future<V> syncUninterruptibly();
17// 等待future完成
18Future<V> await() throws InterruptedException;
19// 等待future 完成,不可打断
20Future<V> awaitUninterruptibly();
21boolean await(long timeout, TimeUnit unit) throws InterruptedException;
22boolean await(long timeoutMillis) throws InterruptedException;
23boolean awaitUninterruptibly(long timeout, TimeUnit unit);
24boolean awaitUninterruptibly(long timeoutMillis);
25// 立刻获得结果,如果没有完成,返回null
26V getNow();
27// 如果成功取消,future会失败,导致CancellationException
28@Override
29boolean cancel(boolean mayInterruptIfRunning);

3、Promise机制

Netty的Future与Java的Future虽然类名相同,但功能上略有不同,Netty中引入了Promise机制。在Java的Future中,业务逻辑为一个Callable或Runnable实现类,该类的call()或run()执行完毕意味着业务逻辑的完结;而在Promise机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。

io.netty.util.concurrent.Promise.java |

1public interface Promise<V> extends Future<V> {
 2
 3    // 设置future执行结果为成功
 4    Promise<V> setSuccess(V result);
 5
 6    // 尝试设置future执行结果为成功,返回是否设置成功
 7    boolean trySuccess(V result);
 8
 9    // 设置失败
10    Promise<V> setFailure(Throwable cause);
11
12    // 尝试设置future执行结果为失败,返回是否设置成功 
13    boolean tryFailure(Throwable cause);
14
15    // 设置为不能取消
16    boolean setUncancellable();
17
18    // 源码中,以下为覆盖了Future的方法,例如;
19
20    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
21
22    @Override
23    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
24
25}

TestPromise.java | 一个查询用户信息的Promise列子,加入监听再operationComplete完成后,获取查询信息

1class TestPromise {
 2
 3    public static void main(String[] args) throws ExecutionException, InterruptedException {
 4        TestPromise testPromise = new TestPromise();
 5        Promise<String> promise = testPromise.queryUserInfo("10001");
 6        promise.addListener(new GenericFutureListener<Future<? super String>>() {
 7            @Override
 8            public void operationComplete(Future<? super String> future) throws Exception {
 9                System.out.println("addListener.operationComplete > 查询用户信息完成: " + future.get());
10            }
11        });
12    }
13
14    private Promise<String> queryUserInfo(String userId) {
15        NioEventLoopGroup loop = new NioEventLoopGroup();
16        // 创建一个DefaultPromise并返回,将业务逻辑放入线程池中执行
17        DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
18        loop.schedule(() -> {
19            try {
20                Thread.sleep(1000);
21                promise.setSuccess("微信公众号:bugstack虫洞栈 | 用户ID:" + userId);
22                return promise;
23            } catch (InterruptedException ignored) {
24            }
25            return promise;
26        }, 0, TimeUnit.SECONDS);
27        return promise;
28    }
29
30}

通过这个例子可以看到,Promise能够在业务逻辑线程中通知Future成功或失败,由于Promise继承了Netty的Future,因此可以加入监听事件。而Future和Promise的好处在于,获取到Promise对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。

4、Promise类组织结构&常用方法

DefaultChannelPromise类组织结构图 | 承接Java并发包Future并增强实现

35.jpg

DefaultChannelPromise类组织结构图

Netty中DefalutPromise是一个非常常用的类,这是Promise实现的基础。DefaultChannelPromise是DefalutPromise的子类,加入了channel这个属性。

DefaultPromise | 使用

在Netty中使用到Promise的地方会非常多,例如在前面一节《一行简单的writeAndFlush都做了哪些事》分析HeadContext.write中unsafe.write(msg, promise);结合这一章节可以继续深入了解Netty的异步框架原理。另外,服务器/客户端启动时的注册任务,最终会调用unsafe的register,调用过程中会传入一个promise,unsafe进行事件的注册时调用promise可以设置成功/失败。

SingleThreadEventLoop.java | 注册服务事件循环组

1@Override
 2public ChannelFuture register(Channel channel) {
 3    return register(new DefaultChannelPromise(channel, this));
 4}
 5
 6@Override
 7public ChannelFuture register(final ChannelPromise promise) {
 8    ObjectUtil.checkNotNull(promise, "promise");
 9    promise.channel().unsafe().register(this, promise);
10    return promise;
11}

DefaultPromise | 实现

DefaultChannelPromise提供的功能可以分为两个部分;

  • 为调用者提供get()和addListener()用于获取Future任务执行结果和添加监听事件。
  • 为业务处理任务提供setSuccess()等方法设置任务的成功或失败。

AbstractFuture.java | get()方法

1public abstract class AbstractFuture<V> implements Future<V> {
 2
 3    @Override
 4    public V get() throws InterruptedException, ExecutionException {
 5        await();
 6
 7        Throwable cause = cause();
 8        if (cause == null) {
 9            return getNow();
10        }
11        if (cause instanceof CancellationException) {
12            throw (CancellationException) cause;
13        }
14        throw new ExecutionException(cause);
15    }
16
17    @Override
18    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
19        if (await(timeout, unit)) {
20            Throwable cause = cause();
21            if (cause == null) {
22                return getNow();
23            }
24            if (cause instanceof CancellationException) {
25                throw (CancellationException) cause;
26            }
27            throw new ExecutionException(cause);
28        }
29        throw new TimeoutException();
30    }
31}

DefaultPromise父类AbstractFuture提供了两个get方法;1、无参数的get会阻塞等待;2、有参数的get会等待指定事件,若未结束抛出超时异常。


DefaultPromise.java | DefaultPromise.await()方法

1@Override
 2public Promise<V> await() throws Interrupt
 3    // 判断Future任务是否结束,内部根据result是否为null判断,setSuccess或setFailure时会通过CAS修改result
 4    if (isDone()) {
 5        return this;
 6    }
 7    // 线程是否被中断
 8    if (Thread.interrupted()) {
 9        throw new InterruptedException(toS
10    }
11    // 检查当前线程是否与线程池运行的线程是一个
12    checkDeadLock();
13    synchronized (this) {
14        while (!isDone()) {
15           /* waiters计数加1
16            * private void incWaiters() {
17            *   if (waiters == Short.MAX_VALUE) {
18            *       throw new IllegalStateException("too many waiters: " + this);
19            *   }
20            *   ++waiters;
21            * }
22            */
23            incWaiters();
24            try {
25                // Object的方法,让出CPU,加入等待队列
26                wait();
27            } finally {
28                // waiters计数减1
29                decWaiters();
30            }
31        }
32    }
33    return this;
34}

await(long timeout, TimeUnit unit)与awite类似,只是调用了Object对象的wait(long timeout, int nanos)方法awaitUninterruptibly()方法在内部catch住了等待线程的中断异常,因此不会抛出中断异常。


DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()

1private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
 2    if (listeners == null) {
 3        listeners = listener;
 4    } else if (listeners instanceof DefaultFutureListeners) {
 5        ((DefaultFutureListeners) listeners).add(listener);
 6    } else {
 7        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
 8    }
 9}
10private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
11    if (listeners instanceof DefaultFutureListeners) {
12        ((DefaultFutureListeners) listeners).remove(listener);
13    } else if (listeners == listener) {
14        listeners = null;
15    }
16}
  • addListener0方法被调用时,将传入的回调类传入到listeners对象中,如果监听多于1个,会创建DefaultFutureListeners对象将回调方法保存在一个数组中。
  • removeListener0会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。

DefaultPromise.java | DefaultPromise.notifyListener0() 通知侦听器

1@SuppressWarnings({ "unchecked", "rawtypes" })
 2private static void notifyListener0(Future future, GenericFutureListener l) {
 3    try {
 4        l.operationComplete(future);
 5    } catch (Throwable t) {
 6        if (logger.isWarnEnabled()) {
 7            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
 8        }
 9    }
10}
  • 在添加监听器时,如果任务刚好执行完毕,则会立即触发监听事件,触发监听通过notifyListeners()实现。
  • addListener和setSuccess都会调用notifyListeners()和Promise内的线程池当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main线程中调用addListener时任务完成,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行Future任务的线程池中setSuccess()时调用notifyListeners(),会放在当前线程中执行。
  • 内部维护了notifyingListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次便利并调用operationComplete

DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 唤起等待线程通知成功/失败

1// 设置成功后唤醒等待线程
 2private boolean setSuccess0(V result) {
 3    return setValue0(result == null ? SUCCESS : result);
 4}
 5
 6// 设置成功后唤醒等待线程
 7private boolean setFailure0(Throwable cause) {
 8    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
 9}
10
11// 通知成功时将结果保存在变量result,通知失败时,使用CauseHolder包装Throwable赋值给result
12// RESULT_UPDATER 是一个使用CAS更新内部属性result的类,
13// 如果result为null或UNCANCELLABLE,更新为成功/失败结果;UNCANCELLABLE是不可取消状态
14private boolean setValue0(Object objResult) {
15    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
16        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
17        // 检查是否有服务,如果有,通知他们。
18        if (checkNotifyWaiters()) {
19            notifyListeners();  // 通知
20        }
21        return true;
22    }
23    return false;
24}

Future任务在执行完成后调用setSuccess()或setFailure()通知Future执行结果;主要逻辑是:修改result的值,若有等待线程则唤醒,通知监听事件。


DefaultChannelPromise实现

1/**
 2 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
 3 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
 4 */
 5public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
 6
 7    private final Channel channel;
 8    private long checkpoint;
 9
10    ...
11}
  • 从继承关系可以看到DefaultChannelPromise是DefaultPromise的实现类,内部维护了一个通道变量Channel。
  • 另外还实现了FlushCheckpoint接口,给ChannelFlushPromiseNotifier使用,我们可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
1interface FlushCheckpoint {
2    long flushCheckpoint();
3    void flushCheckpoint(long checkpoint)
4    ChannelPromise promise();
5}
目录
相关文章
|
2月前
|
前端开发 JavaScript
如何处理 JavaScript 中的异步操作和 Promise?
如何处理 JavaScript 中的异步操作和 Promise?
28 1
|
2月前
|
JavaScript 前端开发
如何处理 Vue 中的异步操作和 Promise?
如何处理 Vue 中的异步操作和 Promise?
97 1
|
20天前
|
消息中间件 传感器 Cloud Native
事件驱动作为分布式异步服务架构
【6月更文挑战第25天】本文介绍事件驱动架构(EDA)是异步分布式设计的关键模式,适用于高扩展性需求。EDA提升服务韧性,支持CQRS、数据通知、开放式接口和事件流处理。然而,其脆弱性包括组件控制、数据交换、逻辑关系复杂性、潜在死循环和高并发挑战。EDA在云原生环境,如Serverless,中尤其适用。
42 2
事件驱动作为分布式异步服务架构
|
18天前
|
前端开发 JavaScript
Promise是JavaScript解决异步问题的构造器,代表未来的不确定值。
【6月更文挑战第27天】Promise是JavaScript解决异步问题的构造器,代表未来的不确定值。它避免了回调地狱,通过链式调用`.then()`和`.catch()`使异步流程清晰。
18 2
|
1月前
|
JSON 前端开发 JavaScript
ES6引入Promise和async/await解决异步问题
【6月更文挑战第12天】ES6引入Promise和async/await解决异步问题。Promise处理异步操作,有pending、fulfilled、rejected三种状态,支持链式调用和并行处理。async/await是基于Promise的语法糖,使异步代码更同步化,提高可读性。两者都是处理回调地狱的有效工具,开发者应根据需求选择合适的方式。
41 3
|
11天前
|
前端开发 小程序 API
【微信小程序】使用 Promise、async 和 await 将异步API 改写为同步
【微信小程序】使用 Promise、async 和 await 将异步API 改写为同步
20 0
|
2月前
|
前端开发 JavaScript 安全
Promise/A+ 规范详解:打造健壮异步代码的必备知识(上)
Promise/A+ 规范详解:打造健壮异步代码的必备知识(上)
Promise/A+ 规范详解:打造健壮异步代码的必备知识(上)
|
2月前
|
前端开发 JavaScript
掌握 Promise.all:优雅处理多个异步操作
掌握 Promise.all:优雅处理多个异步操作
|
2月前
|
前端开发 JavaScript
Promise的链式调用案例讲解
Promise的链式调用案例讲解
|
2月前
|
前端开发 JavaScript UED
使用Promise或者async/await处理游戏中的异步操作。
JavaScript中的Promise和async/await常用于处理游戏开发中的异步操作,如加载资源、网络请求和动画帧更新。Promise代表异步操作的结果,通过.then()和.catch()处理回调。async/await提供了一种更简洁的语法,使异步代码看起来更同步。在游戏循环中,使用async/await管理资源加载可提高代码可读性,但需注意避免阻塞UI线程,并妥善处理加载顺序、错误和资源管理,以保证游戏性能和稳定性。
25 2