vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程

简介: 本文是Vert.x学习系列的第五部分,讨论了回调函数的限制、Future和Promise在异步操作中的应用、响应式扩展以及Kotlin协程,并通过示例代码展示了如何在Vert.x中使用这些异步编程模式。

这章我们讲回调,英文名:Beyond callbacks

一、章节覆盖:

回调函数及其限制,如网关/边缘服务示例所示
未来和承诺——链接异步操作的简单模型
响应式扩展——一个更强大的模型,特别适合组合异步事件流
Kotlin协程——对异步代码执行流的语言级支持

二、Vert.x中的回调函数是一种非阻塞的异步编程模式,用于处理异步操作的结果。在Vert.x中,回调函数通常用于处理事件,如HTTP请求和数据库查询等,其限制包括:

  1. 回调函数是非阻塞的,不能使用同步代码块或阻塞I/O操作。

  2. 回调函数的执行顺序不可预测,因为它们是异步执行的。

  3. 回调函数需要明确处理错误,不能简单地忽略异常或错误。

  4. 回调函数应该尽量轻量级,避免太复杂的逻辑和操作,以免影响整个应用程序的性能。

  5. 回调函数应该采用良好的编程约定,如命名规范和注释,以便于维护和管理。

回调地狱是指使用嵌套回调来链接异步操作,由于嵌套很深,导致代码更难以理解。对于嵌套回调,错误处理尤其困难。
虽然这是真的,但是可以很容易地为每个异步操作回调使用一个方法来缓解回调地狱,就像我们使用handleRequest、sendToSnapshot和sendResponse方法一样。每个方法只做一件事,我们避免嵌套回调。

二、Futures and promises in Vert.x

类似于:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

public class Test extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        Promise<String> promise=Promise.promise();
        vertx.setTimer(5000,id->{
            if (System.currentTimeMillis()%2L==0L)
                 promise.complete("ok");
            else
                 promise.fail(new RuntimeException("Bad luck...."));

        });
    }
}

这里的异步操作是一个5秒的计时器,之后承诺就完成了。根据当前时间是奇数还是偶数,承诺以一个值完成或以一个异常失败。这很好,但我们如何从承诺中获得价值呢?
想要在结果可用时做出反应的代码需要一个future对象。

完整代码:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public class Test extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        Promise<String> promise=Promise.promise();
        vertx.setTimer(5000,id->{
            if (System.currentTimeMillis()%2L==0L)
                 promise.complete("ok");
            else
                 promise.fail(new RuntimeException("Bad luck...."));

        });
        Future<String> future = promise.future();
        future.onSuccess(System.out::println)
                .onFailure(err -> System.out.println(err.getMessage()));

    }

    public static void main(String[] args) {
        Vertx vertx1=Vertx.vertx();
        vertx1.deployVerticle(new Test());
    }
}

用未来的方法启动HTTP服务器:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.Http1xServerRequestHandler;

public class Test001 extends AbstractVerticle {
    @Override
    public void start(Promise<Void> promise) {
        vertx.createHttpServer()
                .requestHandler(requestHandler())
                .listen(8080)
                .onFailure(fail->{
                    promise.fail("失败");
                })
                .onSuccess(ok -> {
                    System.out.println("http://localhost:8080/");
                    promise.complete();
                });
    }

    Handler<HttpServerRequest> requestHandler(){
        return request -> {
            // 处理HTTP请求的逻辑
            // ...
        };
    }


    public static void main(String[] args) {
        Vertx vertx1 = Vertx.vertx();
        vertx1.deployVerticle(new Test001());

    }
}

与CompletionStage api的互操作性:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

import java.util.concurrent.CompletionStage;

public class Test002 extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        Promise<String> promise=Promise.promise();
        CompletionStage<String> cs = promise.future().toCompletionStage();
        cs
                .thenApply(String::toUpperCase)
                .thenApply(str -> "~~~ " + str)
                .whenComplete((str, err) -> {
                    if (err == null) {
                        System.out.println(str);
                    } else {
                        System.out.println("Oh... " + err.getMessage());
                    }
                });
    }
}

CompositeFuture 是一种特殊的 Future,它可以包装一个 Future 列表,从而让一组异步操作并行执行;然后协调这一组操作的结果,作为 CompositeFuture 的结果

三、接下来讲响应式的扩展:

ReactiveX计划为后端和前端项目提供了一个通用的API和多种语言的实现(http://reactivex.io/) 。RxJS项目为浏览器中的JavaScript应用程序提供响应式扩展,而像RxJava这样的项目则为Java生态系统提供通用的响应式扩展实现。
vertx提供了RxJava版本1和2的绑定。建议使用版本2,因为它支持背压,而版本1不支持。

单纯讲rxjava,rxjava的五种观察源:

你有时可能会读到热源和冷源。热点源是无论是否存在订阅者都要发出事件的源。冷源是在第一次订阅后才开始发出事件的源。周期计时器是热源,而要读取的文件是冷源。使用冷源,您可以获得所有事件,但使用热源,您只能获得订阅后发出的事件

四、rxjava在vertx里面集成

1.引入依赖

  implementation("io.vertx:vertx-rx-java2:version")
package furtueAndprimise;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;

import java.util.concurrent.TimeUnit;

public class Test003 extends AbstractVerticle {
    @Override
    public Completable rxStart() {
        Observable
                .interval(1, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
                .subscribe(n -> System.out.println("tick"));
        return vertx.createHttpServer()
                .requestHandler(r -> r.response().end("Ok"))
                .rxListen(8080)
                .ignoreElement();
    }
}

rxStart使用Completable而不是Future来通知部署成功。通过 ignoreElement()方法 返回一个Completable。这个例子打开一个经典的HTTP服务器,对任何请求都回复Ok。有趣的部分是AbstractVerticle的RxJava变体具有通知部署成功的rxStart(和rxStop)方法。在我们的示例中,当HTTP服务器启动时,垂直已经成功部署,因此我们返回一个Completable对象。
您可以检查前缀为rx的方法是否与为支持RxJava而生成的方法相对应。如果您检查RxJava api,您将注意到原始方法(包括回调)仍然存在。

这个例子中另一个有趣的部分是每秒发出事件的可观察对象。它本质上表现为一个Vert.X定时器可以。RxJava api中有几个操作符方法接受调度器对象,因为它们需要延迟异步任务。默认情况下,它们从自己管理的内部工作线程池回调,这就打破了vertx线程模型假设。我们总是可以经过vertx调度器,以确保事件仍在原始上下文事件循环中被回调。

五、Kotlin coroutines

引入依赖

plugins {
kotlin("jvm") version "kotlinVersion"
}
dependencies {
// (...)
implementation("io.vertx:vertx-lang-kotlin:${vertxVersion}")
implementation("io.vertx:vertx-lang-kotlin-coroutines:${vertxVersion}")
implementation(kotlin("stdlib-jdk8"))
}

一直都在讲vertx,所以Kotlin coroutines没有怎么讲,算了,我还是搞一个例子吧

import kotlin.coroutines.*;
suspend fun hello():String{
    delay(1000)
    return "Hello!"
}

fun main(){
  runBlocking{
      println(hello())    //这个runBlocking  会一直等协程完成
}

注:这个系列的截图一直都是vertx in Action 那本英文书里面的。。

目录
相关文章
|
3月前
|
安全 网络安全 网络虚拟化
配置总部采用冗余网关与分支建立IPSec隧道示例
本文介绍了企业总部与分支间通过公网通信的组网需求及配置思路。为提高可靠性,分支网关AR5可接入两台总部网关(AR2和AR3),并建立IPSec隧道保障通信安全。配置步骤包括:1) 配置接口IP地址与静态路由;2) 定义ACL保护数据流;3) 创建IPSec安全提议;4) 配置IKE对等体;5) 创建安全策略;6) 在接口应用安全策略组。最终通过ping测试与查看隧道状态验证配置结果,确保流量安全传输。
配置总部采用冗余网关与分支建立IPSec隧道示例
|
4月前
|
人工智能 负载均衡 API
长连接网关技术专题(十二):大模型时代多模型AI网关的架构设计与实现
随着 AI 技术快速发展,业务对 AI 能力的渴求日益增长。当 AI 服务面对处理大规模请求和高并发流量时,AI 网关从中扮演着至关重要的角色。AI 服务通常涉及大量的计算任务和设备资源占用,此时需要一个 AI 网关负责协调这些请求来确保系统的稳定性与高效性。因此,与传统微服务架构类似,我们将相关 API 管理的功能(如流量控制、用户鉴权、配额计费、负载均衡、API 路由等)集中放置在 AI 网关层,可以降低系统整体复杂度并提升可维护性。 本文要分享的是B站在大模型时代基于多模型AI的网关架构设计和实践总结,希望能带给你启发。
320 4
|
5月前
|
人工智能 算法 网络安全
基于PAI+专属网关+私网连接:构建全链路Deepseek云上私有化部署与模型调用架构
本文介绍了阿里云通过PAI+专属网关+私网连接方案,帮助企业实现DeepSeek-R1模型的私有化部署。方案解决了算力成本高、资源紧张、部署复杂和数据安全等问题,支持全链路零公网暴露及全球低延迟算力网络,最终实现技术可控、成本优化与安全可靠的AI部署路径,满足企业全球化业务需求。
|
2月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
3月前
|
应用服务中间件 网络安全 数据安全/隐私保护
网关服务器配置指南:实现自动DHCP地址分配、HTTP服务和SSH无密码登录。
哇哈哈,道具都准备好了,咱们的魔术秀就要开始了。现在,你的网关服务器已经魔法满满,自动分配IP,提供网页服务,SSH登录如入无人之境。而整个世界,只会知道效果,不会知道是你在幕后操控一切。这就是真正的数字世界魔法师,随手拈来,手到擒来。
186 14
|
2月前
|
网络协议 C++
ASM网关迁移方案示例
本文介绍了在集群拆分过程中,如何通过配置ServiceEntry和流量规则将老网关上的TCP流量转发至新集群的网关,确保部分仍访问老网关的客户端能顺利过渡。内容包括配置Istio组件、回滚方法及内网SLB创建方式,适用于ASM+ACK架构下的平滑迁移场景。
71 0
|
11月前
|
安全 5G 网络性能优化
深入理解5G中的SAEGW:服务网关边界
【10月更文挑战第9天】
369 0
|
9月前
|
NoSQL 前端开发 测试技术
👀探秘微服务:从零开启网关 SSO 服务搭建之旅
单点登录(Single Sign-On,简称SSO)是一种认证机制,它允许用户只需一次登录就可以访问多个应用程序或系统。本文结合网关和SaToken快速搭建可用的Session管理服务。
498 8
|
10月前
|
存储 安全 测试技术
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
571 29
|
10月前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
Gateway服务网关
272 1
Gateway服务网关