Java异步非阻塞编程的几种方式

简介: Java异步非阻塞编程的几种方式

image.png

作者 | 尘定
来源 | 阿里技术公众号

一 从一个同步的Http调用说起

一个很简单的业务逻辑,其他后端服务提供了一个接口,我们需要通过接口调用,获取到响应的数据。

逆地理接口:通过经纬度获取这个经纬度所在的省市区县以及响应的code:

curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
{"adcode":"510722"}

服务端执行,最简单的同步调用方式:

image.png

服务端响应之前,IO会阻塞在:java.net.SocketInputStream#socketRead0 的native方法上:

image.png

通过jstack日志,可以发现,此时这个Thread会一直在runable的状态:

"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
        at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
        at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
        at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
        at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)
        at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
        at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
        at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)
                .......

线程模型示例:

image.png

同步最大的问题是在IO等待的过程中,线程资源没有得到充分的利用,对于大量IO场景的业务吞吐量会有一定限制。

二 JDK NIO & Future

在JDK 1.5 中,JUC提供了Future抽象:

image.png

image.png

当然并不是所有的Future都是这样实现的,如 io.netty.util.concurrent.AbstractFuture 就是通过线程轮询去。

这样做的好处是,主线程可以不用等待IO响应,可以去做点其他的,比如说再发送一个IO请求,可以等到一起返回:

"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)
        at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)
        .....
"AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000]   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)
- locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)
- locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

image.png

主线程在等待结果返回过程中依然需要等待,没有根本解决此问题。

三 使用Callback回调方式

第二节中,依然需要主线程等待,获取结果,那么可不可以在主线程完成发送请求后,再也不用关心这个逻辑,去执行其他的逻辑?那就可以使用Callback机制。

image.png

如此一来,主线程再也不需要关心发起IO后的业务逻辑,发送完请求后,就可以彻底去干其他事情,或者回到线程池中再供调度。如果是HttpServer,那么需要结合Servlet 3.1的异步Servlet。

image.png
image.png

异步Servelt参考资料
https://www.cnblogs.com/davenkin/p/async-servlet.html

使用Callback方式,从线程模型中看,发现线程资源已经得到了比较充分的利用,整个过程中已经没有线程阻塞。

四 Callback hell

回调地狱,当Callback的线程还需要执行下一个IO调用的时候,这个时候进入回调地狱模式。

典型的应用场景如,通过经纬度获取行政区域adcode(逆地理接口),然后再根据获得的adcode,获取当地的天气信息(天气接口)。

在同步的编程模型中,几乎不会涉及到此类问题。

image.png

Callback方式的核心缺陷

五 JDK 1.8 CompletableFuture

那么有没有办法解决Callback Hell的问题?当然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解决这个问题的。

将逆地理的Callback逻辑,封装成一个独立的CompletableFuture,当异步线程回调时,调用 future.complete(T) ,将结果封装。

image.png

将天气执行的Call逻辑,也封装成为一个独立的CompletableFuture ,完成之后,逻辑同上。

image.png

compose衔接,whenComplete输出:

image.png

每一个IO操作,均可以封装为独立的CompletableFuture,从而避免回调地狱。

CompletableFuture,只有两个属性:

  • result:Future的执行结果 (Either the result or boxed AltResult)。
  • stack:操作栈,用于定义这个Future接下来操作的行为 (Top of Treiber stack of dependent actions)。

weatherFuture这个方法是如何被调用的呢?

通过堆栈可以发现,是在 reverseCodeFuture.complete(result) 的时候,并且也将获得的adcode作为参数执行接下来的逻辑。

image.png

这样一来,就完美解决回调地狱问题,在主的逻辑中,看起来像是在同步的进行编码。

六 Vert.x Future

Info-Service中,大量使用的 Vert.x Future 也是类似的解决的方案,不过设计上使用Handler的概念。

image.png

核心执行的逻辑差不多:

image.png

这当然不是Vertx的全部,当然这是题外话了。

七 Reactive Streams

异步编程对吞吐量以及资源有好处,但是有没有统一的抽象去解决此类问题内,答案是 Reactive Streams。

核心抽象:Publisher Subscriber Processor Subscription ,整个包里面,只有这四个接口,没有实现类。

image.png


在JDK 9里面,已经被作为一种规范封装到 java.util.concurrent.Flow :

image.png
image.png

参考资料
https://www.baeldung.com/java-9-reactive-streams
https://www.reactivemanifesto.org/
https://projectreactor.io/learn

一个简单的例子:

image.png

八 Reactor & Spring 5 & Spring WebFlux

Flux & Mono
image.png
image.png

参考资料
https://projectreactor.io/docs/core/3.1.0.M3/reference/index.html
https://speakerdeck.com/simonbasle/projectreactor-dot-io-reactor3-intro
相关文章
|
2月前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
16天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
74 17
|
29天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
1月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
66 12
|
29天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
160 2
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
66 3