Akka事件驱动新选择

简介: 在高并发场景解决方案中,多从线程角度出发,以解决线程安全问题,锁范围又需要多业务场景考虑,何时上锁,何时解锁,何时自动过期等,而事件驱动是从执行什么操作驱动的,在软件系统的设计层面,两者关联性不大,一个强调安全,一个强调策略,那么有没有两者结合解决并发编程难的事件驱动解决方案呢?带着场景解决方案我们走进Akka。

在高并发场景解决方案中,多从线程角度出发,以解决线程安全问题,锁范围又需要多业务场景考虑,何时上锁,何时解锁,何时自动过期等,而事件驱动是从执行什么操作驱动的,在软件系统的设计层面,两者关联性不大,一个强调安全,一个强调策略,那么有没有两者结合解决并发编程难的事件驱动解决方案呢?带着场景解决方案我们走进Akka。

什么是Akka
官网:https://guobinhit.github.io/akka-guide/
image.png

添加描述

Akka 是一个用 Scala 编写的库,用于在 JVM 平台上简化编写具有可容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用,其同时提供了Java 和 Scala 的开发接口。Akka 允许我们专注于满足业务需求,而不是编写初级代码。在 Akka 中,Actor 之间通信的唯一机制就是消息传递。Akka 对 Actor 模型的使用提供了一个抽象级别,使得编写正确的并发、并行和分布式系统更加容易。Actor 模型贯穿了整个 Akka 库,为我们提供了一致的理解和使用它们的方法。

事件驱动
image.png

Actor 模型 VS Reactor模型
以Netty的Reactor模型为例(redis同理),本身的Reactor模型即是从事件驱动(NeetyEventLoop)的设计模式,Netty从io角度出发,分发请求,以Reactor对象分发调用链接,结合线程池以此提高多线程高并发的吞吐量。

Actor模型

image.png

而Akka的Actor模型重在消息传递,但是第一个特性仍然是事件驱动模型。注意这个多次出现的词,说明Akka的侧重点在于事件驱动
事件驱动模型:Event-driven model,Actor 通过响应消息来执行工作。Actor 之间的通信是异步的,允许 Actor 发送消息并继续自己的工作,而不是阻塞等待响应。
强隔离原则:Strong isolation principles,与 Java 中的常规对象不同,Actor 在调用的方法方面,没有一个公共 API。相反,它的公共 API 是通过 Actor 处理的消息来定义的。这可以防止 Actor 之间共享状态;观察另一个 Actor 状态的唯一方法是向其发送请求状态的消息。
位置透明:Location transparency,系统通过工厂方法构造 Actor 并返回对实例的引用。因为位置无关紧要,所以 Actor 实例可以启动、停止、移动和重新启动,以向上和向下扩展以及从意外故障中恢复。
轻量级:Lightweight,每个实例只消耗几百个字节,这实际上允许数百万并发 Actor 存在于一个应用程序中。

image.png
第一个Akka的java程序
在官网下创建第一个Akkademo,点击网站-->create a project for me即可

https://developer.lightbend.com/start/?group=akka&project=akka-quickstart-java
Greet:向Greeter执行问候的指令;
Greeted:Greeter用来确认问候发生时回复的消息;
SayHello:GreeterMain开始执行问候进程的指令;

image.png

这样看的话不如直接进入test

@Test
public void testGreeterActorSendingOfGreeting() {
      //testKit为单元测试提前加入SpringBean而准备的对象     
      TestProbe<Greeter.Greeted> testProbe = testKit.createTestProbe();
      ActorRef<Greeter.Greet> underTest = testKit.spawn(Greeter.create(), "greeter");
      得到Greet问候(tell)回复消息对象
      underTest.tell(new Greeter.Greet("Charles", testProbe.getRef()));
      //发送消息
      testProbe.expectMessage(new Greeter.Greeted("Charles", underTest));
}
com.example.Greeter#createReceive在本类的方法中我们记录日志
@Override
  public Receive<Greet> createReceive() {
    log.info("在此接收.....");
    return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
  }
  
 private Behavior<Greet> onGreet(Greet command) {
    getContext().getLog().info("Hello {}!", command.whom);
    //#greeter-send-message
    command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
    //#greeter-send-message
    return this;
  }
  
控制台输出
[2023-03-07 14:49:45,909] [INFO] [akka.event.slf4j.Slf4jLogger] [AkkaQuickstartTest-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[2023-03-07 14:49:46,119] [INFO] [com.example.Greeter] [AkkaQuickstartTest-akka.actor.default-dispatcher-5] [] - 在此接收.....
[2023-03-07 14:49:46,123] [INFO] [com.example.Greeter] [AkkaQuickstartTest-akka.actor.default-dispatcher-5] [akka://AkkaQuickstartTest/user/greeter] - Hello Charles!
[2023-03-07 14:49:46,226] [INFO] [akka.actor.CoordinatedShutdown] [AkkaQuickstartTest-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://AkkaQuickstartTest)] - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
Disconnected from the target VM, address: '127.0.0.1:64809', transport: 'socket'

这个消息的发送接收,由消息接收方去处理业务逻辑的方式,与MQ中间件思路无疑,而我们应用Akka时应该考虑到应用场景,既然应用于事件驱动,那么其处理的消息必须要考虑到非实时性场景的。即最终一致性的消息数据适配,如集成到Springboot应考虑到接收的Actro是多例的
集成Springboot
1.引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-slf4j_2.11</artifactId>
   <version>2.5.16</version>
</dependency>

2.Actor生产者

public class ActorProducer implements IndirectActorProducer {
    private ApplicationContext context;
    private String beanName;

    public ActorProducer(ApplicationContext context,String beanName){
        this.context=context;
        this.beanName=beanName;
    }

    @Override
    public Actor produce() {
        return (Actor) context.getBean(beanName);
    }

    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) context.getType(beanName);
    }
}

3.构造Props创建ActorRef

public class SpringExt implements Extension {
    private ApplicationContext context;

    public void init(ApplicationContext context) {
        System.out.println("applicationContext初始化...");
        this.context = context;
    }
 
    public Props create(String beanName) {
        return Props.create(ActorProducer.class, this.context, beanName);
    }
}

4.创建Provider继承AbstractExtensionId


public class SpringExtProvider extends AbstractExtensionId<SpringExt> {
    private static SpringExtProvider provider = new SpringExtProvider();
 
    public static SpringExtProvider getInstance() {
        return provider;
    }
 
    @Override
    public SpringExt createExtension(ExtendedActorSystem extendedActorSystem) {
        return new SpringExt();
    }
}

5.初始化ActorSystem

@Configuration
public class ScanConfig {
    private final ApplicationContext context;
 
    @Autowired
    public ScanConfig(ApplicationContext context) {
        this.context = context;
    }
 
    @Bean
    public ActorSystem createSystem() {
        ActorSystem system = ActorSystem.create("system");
        SpringExtProvider.getInstance().get(system).init(context);
        return system;
    }
}

6.消息的接收者

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TestActor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(o -> {
            System.out.println("接受到消息:" + o);
        }).build();
    }
}

@SpringBootTest
@RunWith(SpringRunner.class)
public class LeetcodeApplicationTests {

    @Autowired
    private ActorSystem actorSystem;

    @Test
    public void contextLoads() {
        ActorRef ref = actorSystem.actorOf(SpringExtProvider.getInstance().get(actorSystem).create("testActor"), "testActor");
        ref.tell("Hello,Akka", ActorRef.noSender());
    }  
}

image.png
image.png

End to End Argument
在End to End Argument论文中指出:

端到端的可靠通信,只能通过通信两端的application层来保证,而中间件(比如SQS, Kinesis, ActiveMQ, 到更底层Netty乃至TCP)只能提高效率,而无法保证通信的可靠性
这里将消息传递的安全保证提升到不可企及的高度,当消息接收到ack之后,仍不能保证此条消息的读取者为本人,为了保证消息为本人接收,还需加入令牌/口令来实现密文的反编译。只要中间件都做不到可靠通信,如果我自己理解的有问题,那么对于中间件来说就失去了原本的意义,中间件本身也不是解决可靠性问题,主要解决的是分布式环境下数据传输、数据访问、应用调度、系统构建和系统集成、流程管理等问题。从全局可靠性来考虑,就要从消息的发送,收,传递,确认等流程来确认,从业务角度出发,而不是强调中间件的不可靠性,而且万事都有不可确定性,如果真的提升到如此角度,那么确实不用开发了。

目录
相关文章
|
存储 人工智能 算法
云计算的第三次浪潮:人工智能与云计算的融合
在2023年的云栖大会主论坛上,中国工程院院士、阿里云创始人王坚发表了题为《云计算的第三次浪潮》的主题演讲,他强调了人工智能与云计算的结合带来的重大影响,并认为这将引发云计算的第三次浪潮。云计算的第三次浪潮正在悄然兴起,其与人工智能的结合引发了前所未有的技术革命。那么本文将聊聊2023年人工智能和云计算的集中体现和爆发,以及云计算与GPT模型的关系。
2248 47
云计算的第三次浪潮:人工智能与云计算的融合
|
数据采集 机器学习/深度学习 自然语言处理
Masked Language Modeling,MLM
Masked Language Modeling(MLM)是一种预训练语言模型的方法,通过在输入文本中随机掩盖一些单词或标记,并要求模型预测这些掩盖的单词或标记。MLM 的主要目的是训练模型来学习上下文信息,以便在预测掩盖的单词或标记时提高准确性。
1512 1
|
存储 安全 算法
使用jotp实现双因子验证
扫盲使用totp增强身份安全性指南,原理看懂也不用自己造轮子呀,最讨厌哪些啥也不懂的搬运工,我这里给大家解惑吧
1389 0
|
安全 JavaScript 前端开发
阿里云先知安全沙龙(西安站) ——浅谈XSS漏洞挖掘与构造思路
本文介绍了DOM-XSS构造、运算符的威力和模板字符串妙用三个主题。通过多个实例图解,详细展示了如何利用DOM特性构造XSS攻击、JavaScript运算符在代码中的巧妙应用,以及模板字符串在开发中的灵活运用。这些内容对提升Web安全意识和编程技巧具有重要参考价值。
|
12月前
|
人工智能
替代你的不是AI,而是会使用AI的人——生成式人工智能(GAI)认证成为职场新宠
在AI技术飞速发展的数字化时代,职场环境正经历深刻变革。生成式人工智能(GAI)认证的出现,为职场人士提供了提升自我、证明能力的新途径。由培生推出的GAI认证,涵盖核心技能与伦理知识,助力求职者脱颖而出。它不仅是职场晋升的加速器,还为企业认可的专业能力背书。拥抱AI、学习AI,通过GAI认证,让自己成为掌握AI技术的领先者,在竞争中保持优势。
|
SQL 监控 关系型数据库
PostgreSQL普通表转换成分区表
如何使用pg_rewrite扩展将普遍表转换成分区表
|
Java 数据格式
Java“EOFException”解决
Java中的“EOFException”通常在读取文件或网络流时遇到意外的文件结束符时抛出。解决方法包括检查输入源是否为空、确保数据格式正确以及增加异常处理逻辑。
1628 3
|
Web App开发 数据可视化 前端开发
Pixi入门第一章:绘制一个小精灵
这篇文章是关于Pixi.js的入门教程第一部分,指导读者如何创建并显示一个基本的2D精灵,适用于开始学习Pixi.js进行2D图形开发的初学者。
499 0
Pixi入门第一章:绘制一个小精灵
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错之遇到报错org.postgresql.util.psqlexception: The connection attempt failed.,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 Kubernetes Kafka
实时计算 Flink版操作报错合集之在Rancher K8s部署时,TaskManager无法正常连接到其他TaskManager,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

热门文章

最新文章