vertx学习总结5之回调函数及其限制,如网关/边缘服务示例所示未来和承诺——链接异步操作的简单模型响应式扩展——一个更强大的模型,特别适合组合异步事件流Kotlin协程

简介: 本文是Vert.x学习系列的第五部分,讨论了回调函数的限制、Future和Promise在异步操作中的应用、响应式扩展以及Kotlin协程,并通过示例代码展示了如何在Vert.x中使用这些异步编程模式。

这章我们讲回调,英文名:Beyond callbacks

一、章节覆盖:

回调函数及其限制,如网关/边缘服务示例所示
未来和承诺——链接异步操作的简单模型
响应式扩展——一个更强大的模型,特别适合组合异步事件流
Kotlin协程——对异步代码执行流的语言级支持

二、Vert.x中的回调函数是一种非阻塞的异步编程模式,用于处理异步操作的结果。在Vert.x中,回调函数通常用于处理事件,如HTTP请求和数据库查询等,其限制包括:

  1. 回调函数是非阻塞的,不能使用同步代码块或阻塞I/O操作。

  2. 回调函数的执行顺序不可预测,因为它们是异步执行的。

  3. 回调函数需要明确处理错误,不能简单地忽略异常或错误。

  4. 回调函数应该尽量轻量级,避免太复杂的逻辑和操作,以免影响整个应用程序的性能。

  5. 回调函数应该采用良好的编程约定,如命名规范和注释,以便于维护和管理。

回调地狱是指使用嵌套回调来链接异步操作,由于嵌套很深,导致代码更难以理解。对于嵌套回调,错误处理尤其困难。
虽然这是真的,但是可以很容易地为每个异步操作回调使用一个方法来缓解回调地狱,就像我们使用handleRequest、sendToSnapshot和sendResponse方法一样。每个方法只做一件事,我们避免嵌套回调。

二、Futures and promises in Vert.x

类似于:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

public class Test extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        Promise<String> promise=Promise.promise();
        vertx.setTimer(5000,id->{
            if (System.currentTimeMillis()%2L==0L)
                 promise.complete("ok");
            else
                 promise.fail(new RuntimeException("Bad luck...."));

        });
    }
}

这里的异步操作是一个5秒的计时器,之后承诺就完成了。根据当前时间是奇数还是偶数,承诺以一个值完成或以一个异常失败。这很好,但我们如何从承诺中获得价值呢?
想要在结果可用时做出反应的代码需要一个future对象。

完整代码:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

public class Test extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        Promise<String> promise=Promise.promise();
        vertx.setTimer(5000,id->{
            if (System.currentTimeMillis()%2L==0L)
                 promise.complete("ok");
            else
                 promise.fail(new RuntimeException("Bad luck...."));

        });
        Future<String> future = promise.future();
        future.onSuccess(System.out::println)
                .onFailure(err -> System.out.println(err.getMessage()));

    }

    public static void main(String[] args) {
        Vertx vertx1=Vertx.vertx();
        vertx1.deployVerticle(new Test());
    }
}

用未来的方法启动HTTP服务器:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.impl.Http1xServerRequestHandler;

public class Test001 extends AbstractVerticle {
    @Override
    public void start(Promise<Void> promise) {
        vertx.createHttpServer()
                .requestHandler(requestHandler())
                .listen(8080)
                .onFailure(fail->{
                    promise.fail("失败");
                })
                .onSuccess(ok -> {
                    System.out.println("http://localhost:8080/");
                    promise.complete();
                });
    }

    Handler<HttpServerRequest> requestHandler(){
        return request -> {
            // 处理HTTP请求的逻辑
            // ...
        };
    }


    public static void main(String[] args) {
        Vertx vertx1 = Vertx.vertx();
        vertx1.deployVerticle(new Test001());

    }
}

与CompletionStage api的互操作性:

package furtueAndprimise;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

import java.util.concurrent.CompletionStage;

public class Test002 extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        Promise<String> promise=Promise.promise();
        CompletionStage<String> cs = promise.future().toCompletionStage();
        cs
                .thenApply(String::toUpperCase)
                .thenApply(str -> "~~~ " + str)
                .whenComplete((str, err) -> {
                    if (err == null) {
                        System.out.println(str);
                    } else {
                        System.out.println("Oh... " + err.getMessage());
                    }
                });
    }
}

CompositeFuture 是一种特殊的 Future,它可以包装一个 Future 列表,从而让一组异步操作并行执行;然后协调这一组操作的结果,作为 CompositeFuture 的结果

三、接下来讲响应式的扩展:

ReactiveX计划为后端和前端项目提供了一个通用的API和多种语言的实现(http://reactivex.io/) 。RxJS项目为浏览器中的JavaScript应用程序提供响应式扩展,而像RxJava这样的项目则为Java生态系统提供通用的响应式扩展实现。
vertx提供了RxJava版本1和2的绑定。建议使用版本2,因为它支持背压,而版本1不支持。

单纯讲rxjava,rxjava的五种观察源:

你有时可能会读到热源和冷源。热点源是无论是否存在订阅者都要发出事件的源。冷源是在第一次订阅后才开始发出事件的源。周期计时器是热源,而要读取的文件是冷源。使用冷源,您可以获得所有事件,但使用热源,您只能获得订阅后发出的事件

四、rxjava在vertx里面集成

1.引入依赖

  implementation("io.vertx:vertx-rx-java2:version")
package furtueAndprimise;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;

import java.util.concurrent.TimeUnit;

public class Test003 extends AbstractVerticle {
    @Override
    public Completable rxStart() {
        Observable
                .interval(1, TimeUnit.SECONDS, RxHelper.scheduler(vertx))
                .subscribe(n -> System.out.println("tick"));
        return vertx.createHttpServer()
                .requestHandler(r -> r.response().end("Ok"))
                .rxListen(8080)
                .ignoreElement();
    }
}

rxStart使用Completable而不是Future来通知部署成功。通过 ignoreElement()方法 返回一个Completable。这个例子打开一个经典的HTTP服务器,对任何请求都回复Ok。有趣的部分是AbstractVerticle的RxJava变体具有通知部署成功的rxStart(和rxStop)方法。在我们的示例中,当HTTP服务器启动时,垂直已经成功部署,因此我们返回一个Completable对象。
您可以检查前缀为rx的方法是否与为支持RxJava而生成的方法相对应。如果您检查RxJava api,您将注意到原始方法(包括回调)仍然存在。

这个例子中另一个有趣的部分是每秒发出事件的可观察对象。它本质上表现为一个Vert.X定时器可以。RxJava api中有几个操作符方法接受调度器对象,因为它们需要延迟异步任务。默认情况下,它们从自己管理的内部工作线程池回调,这就打破了vertx线程模型假设。我们总是可以经过vertx调度器,以确保事件仍在原始上下文事件循环中被回调。

五、Kotlin coroutines

引入依赖

plugins {
kotlin("jvm") version "kotlinVersion"
}
dependencies {
// (...)
implementation("io.vertx:vertx-lang-kotlin:${vertxVersion}")
implementation("io.vertx:vertx-lang-kotlin-coroutines:${vertxVersion}")
implementation(kotlin("stdlib-jdk8"))
}

一直都在讲vertx,所以Kotlin coroutines没有怎么讲,算了,我还是搞一个例子吧

import kotlin.coroutines.*;
suspend fun hello():String{
    delay(1000)
    return "Hello!"
}

fun main(){
  runBlocking{
      println(hello())    //这个runBlocking  会一直等协程完成
}

注:这个系列的截图一直都是vertx in Action 那本英文书里面的。。

目录
相关文章
|
23天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
15天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
19天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2570 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
17天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
1天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
152 2
|
19天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1566 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
2天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
922 14
|
16天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
690 9
|
15天前
|
存储 监控 调度
云迁移中心CMH:助力企业高效上云实践全解析
随着云计算的发展,企业上云已成为创新发展的关键。然而,企业上云面临诸多挑战,如复杂的应用依赖梳理、成本效益分析等。阿里云推出的云迁移中心(CMH)旨在解决这些问题,提供自动化的系统调研、规划、迁移和割接等功能,简化上云过程。CMH通过评估、准备、迁移和割接四个阶段,帮助企业高效完成数字化转型。未来,CMH将继续提升智能化水平,支持更多行业和复杂环境,助力企业轻松上云。