JDK11的新特性:HTTP API和reactive streams

简介: JDK11的新特性:HTTP API和reactive streams

目录



JDK11的新特性:HTTP API和reactive streams


简介


JDK11的新特性:新的HTTP API中,我们介绍了通过新的HTTP API,我们可以发送同步或者异步的请求,并获得的返回的结果。


今天我们想探讨一下这些同步或者异步请求和响应和reactive streams的关系。


更多内容请访问www.flydean.com


怎么在java中使用reactive streams


reactive streams的介绍大家可以参考reactive stream协议详解,使用reactive streams的目的就是为了解决发送者和消费者之间的通信问题,发送者不会发送超出消费者能力的信息。


我们再回顾一下reactive streams中的几个关键概念:


  • Publisher 负责产生消息或者事件,并提供了一个subscribed接口来和Subscriber进行连接。
  • Subscriber 用来subscribe一个Publisher,并提供了onNext方法来处理新的消息,onError来处理异常,onComplete提供给Publisher调用来结束监听。
  • Subscription 负责连接Publisher和Subscriber,可以用来请求消息或者取消收听。

更进一步,如果我们想要自己实现一个reactive streams,我们需要做这些事情:


  1. 创建Publisher和Subscriber
  • 创建Publisher和Subscriber。
  • 调用Publisher.subscribe(Subscriber)建立Publisher和Subscriber之间的连接。
  • Publisher创建一个Subscription,并调用Subscriber.onSubscription(Subscription)方法。
  • Subscriber将Subscription保存起来,供后面使用。


  1. 发送和接收信息
  • Subscriber调用Subscription.request(n) 方法请求n个消息。
  • Publisher调用Subscriber.onNext(item) 将请求的消息发送给Subscriber。
  • 按照需要重复上诉过程。


  1. 取消或者结束
  • Publisher调用Subscriber.OnError(err) 或者 Subscriber.onComplete()方法。
  • Subscriber调用Subscription.cancel()方法。


POST请求的例子


还记得上篇文章我们讲HTTP API新特性的时候,我们使用的例子吗?


例子中,我们使用了一个HttpRequest.BodyPublisher,用来构建Post请求,而BodyPublisher就是一个Flow.Publisher:


public interface BodyPublisher extends Flow.Publisher<ByteBuffer>


也就是说从BodyPublisher开始,就已经在使用reactive streams了。


为了能够更好的了解reactive streams的工作原理,我们创建几个wrapper类将Publisher,Subscriber,Subscription包装起来,输出相应的日志。


代码有点多我们就不一一列出来了,这里只列一个CustBodyPublisher的具体实现:


public class CustBodyPublisher implements HttpRequest.BodyPublisher {
    private final HttpRequest.BodyPublisher bodyPublisher;
    public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){
        this.bodyPublisher=bodyPublisher;
    }
    @Override
    public long contentLength() {
        long contentLength=bodyPublisher.contentLength();
        log.info("contentLength:{}",contentLength);
        return contentLength;
    }
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        log.info("CustBodyPublisher subscribe {}",subscriber);
        bodyPublisher.subscribe(new CustSubscriber(subscriber));
    }
}


wrapper类很简单,通过构造函数传入要wrapper的类,然后在相应的方法中调用实际wrapper类的方法。


最后,我们将之前使用的调用HTTP API的例子改造一下:


public void testCustPost() throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().build();
        HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers
                .ofString("{ 我是body }");
        CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody);
        HttpRequest postRequest = HttpRequest.newBuilder()
                .POST(custBodyPublisher)
                .uri(URI.create("http://www.flydean.com"))
                .build();
        HttpResponse<String> response = client
                .send(postRequest, HttpResponse.BodyHandlers.ofString());
        log.info("response {}",response);
    }


注意这里CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody),我们创建了一个新的wrapper类。


运行它,观察输出结果:


[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete
[main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200


可以看到reactive stream的具体工作流程。首先创建了CustBodyPublisher,然后调用了subscribe方法。


接着CustSubscriber调用onSubscribe创建了Subscription。


每次CustSubscription的request方法都会导致CustSubscriber的onNext方法被调用。


最后当CustSubscription再次请求无结果的时候,CustSubscriber调用onComplete方法结束整个流程。


注意,上面的例子中,我们wrapper调用的是BodyPublishers.ofString,其实BodyPublishers中内置了多种BodyPublisher的实现。感兴趣的朋友可以自行探索。


总结


本文讲解了新的HTTP API中reactive Streams的使用。


相关文章
|
1月前
|
容器
jdk8新特性-详情查看文档
jdk8新特性-详情查看文档
48 7
|
3月前
|
存储 安全 Java
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
本文介绍了JDK 21中引入的外部函数和内存API(MemorySegment),这些API使得Java程序能够更安全、高效地与JVM外部的代码和数据进行互操作,包括调用外部函数、访问外部内存,以及使用不同的Arena竞技场来分配和管理MemorySegment。
84 1
jdk21的外部函数和内存API(MemorySegment)(官方翻译)
|
4月前
|
容器
jdk8新特性-详情查看文档
jdk8新特性-详情查看文档
50 3
|
3月前
|
存储 安全 Java
JDK1.8 新的特性
JDK1.8 新的特性
32 0
|
4月前
|
安全 Java API
【性能与安全的双重飞跃】JDK 22外部函数与内存API:JNI的继任者,引领Java新潮流!
【9月更文挑战第7天】JDK 22外部函数与内存API的发布,标志着Java在性能与安全性方面实现了双重飞跃。作为JNI的继任者,这一新特性不仅简化了Java与本地代码的交互过程,还提升了程序的性能和安全性。我们有理由相信,在外部函数与内存API的引领下,Java将开启一个全新的编程时代,为开发者们带来更加高效、更加安全的编程体验。让我们共同期待Java在未来的辉煌成就!
84 11
|
4月前
|
安全 Java API
【本地与Java无缝对接】JDK 22外部函数和内存API:JNI终结者,性能与安全双提升!
【9月更文挑战第6天】JDK 22的外部函数和内存API无疑是Java编程语言发展史上的一个重要里程碑。它不仅解决了JNI的诸多局限和挑战,还为Java与本地代码的互操作提供了更加高效、安全和简洁的解决方案。随着FFM API的逐渐成熟和完善,我们有理由相信,Java将在更多领域展现出其强大的生命力和竞争力。让我们共同期待Java编程新纪元的到来!
131 11
|
4月前
|
监控 Java 大数据
【Java内存管理新突破】JDK 22:细粒度内存管理API,精准控制每一块内存!
【9月更文挑战第9天】虽然目前JDK 22的确切内容尚未公布,但我们可以根据Java语言的发展趋势和社区的需求,预测细粒度内存管理API可能成为未来Java内存管理领域的新突破。这套API将为开发者提供前所未有的内存控制能力,助力Java应用在更多领域发挥更大作用。我们期待JDK 22的发布,期待Java语言在内存管理领域的持续创新和发展。
|
3月前
|
API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
49 0
|
4月前
|
编解码 安全 Java
jdk8新特性-接口和日期处理
jdk8新特性-接口和日期处理
|
5月前
|
Oracle Java 关系型数据库
JDK8到JDK29版本升级的新特性问题之未来JDK的升级是否会成为必然趋势,如何理解
JDK8到JDK29版本升级的新特性问题之未来JDK的升级是否会成为必然趋势,如何理解