Spring与Akka的集成

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/53783936 概述       近年来随着Spark的火热,Spark本身使用的开发语言Scala、用到的分布式内存文件系统Tachyon(现已更名为Alluxio)以及基于Actor并发编程模型的Akka都引起了大家的注意。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/53783936

概述

       近年来随着Spark的火热,Spark本身使用的开发语言Scala、用到的分布式内存文件系统Tachyon(现已更名为Alluxio)以及基于Actor并发编程模型的Akka都引起了大家的注意。了解过Akka或者Actor的人应该知道,这的确是一个很不错的框架,按照Akka官网的描述——使用Akka使得构建强有力的并发与分布式应用将更加容易。由于历史原因,很多Web系统在开发分布式服务时首先会选择RMI(Remote Method Invoke ,远程方法调用)、RPC(Remote Procedure Call Protocol,远程过程调用)或者使用JMS(Java Messaging Service,Java消息服务)。

       但是使用RMI只能使用java语言,而且开发、执行效率都不高;RPC框架虽然可以通过匹配方法签名的方式比RMI更灵活,但是其存在调用超时、调用丢失等缺点;JMS方式虽然可以通过At Least Delivery Once、消息持久化等机制保证消息不会丢失,但是只能作为一种跨服务的生产者、消费者编程模型使用。Akka不但处理了以上问题,而且还可以使用Actor作为并发编程模型,减少java多线程编程的阻塞、调度、上下文开销甚至死锁等问题。此外,Akka还提供了集群Sharding、流处理等功能的支持,更易于实现有限状态自动机等功能。所以有心的开发者势必会关心如何在最常见的Java系统中使用它,如何与Spring集成?

       本文参考Akka官方使用文档,根据自身的经验和理解,提供Akka与Spring集成的方案。本文不说明Spring框架的具体使用,并从Spring已经配置完备的情况开始叙述。

Actor系统——ActorSystem

       什么是ActorSystem?根据Akka官网的描述——ActorSystem是一个重量级的结构体,可以用于分配1到N个线程,所以每个应用都需要创建一个ActorSystem。通常而言,使用以下代码来创建ActorSystem。

ActorSystem system = ActorSystem.create("Hello");

不过对于接入Spring而言,由IOC(Inversion of Control,控制反转)方式会更接地气,你可以这样:

        <!-- AKKA System Setup -->
	<bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
	    <constructor-arg value="helloAkkaSystem"/>
	</bean>
然后在你需要的地方依赖注入即可。

Actor编程模型

       有关Actor编程模型的具体介绍可以看我的另一篇博文——《Spark如何使用Akka实现进程、节点通信的简明介绍》,里面有更多的介绍。需要补充的是,在最新的Scala官方网站上已经决定废弃Scala自身的Actor编程模型,转而全面拥抱Akka提供的Actor编程模型。

       我们可以通过以下代码(代码片段借用了Akka官网的例子)创建一个简单的Actor例子。

       Greeter是代表问候者的Actor:

public class Greeter extends UntypedActor {

  public static enum Msg {
    GREET, DONE;
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Msg.GREET) {
      System.out.println("Hello World!");
      getSender().tell(Msg.DONE, getSelf());
    } else
      unhandled(msg);
  }

}

一般情况下我们的Actor都需要继承自UntypedActor,并实现其onReceive方法。onReceive用于接收消息,你可以在其中实现对消息的匹配并做不同的处理。

HelloWorld是用于向Greeter发送问候消息的访客:

public class HelloWorld extends UntypedActor {

  @Override
  public void preStart() {
    // create the greeter actor
    final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
    // tell it to perform the greeting
    greeter.tell(Greeter.Msg.GREET, getSelf());
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Greeter.Msg.DONE) {
      // when the greeter is done, stop this actor and with it the application
      getContext().stop(getSelf());
    } else
      unhandled(msg);
  }
}

有了Actor之后,我们可以这样使用它:

ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");

       在HelloWorld的preStart实现中,获取了Greeter的ActorRef(Actor的引用)并向Greeter发送了问候的消息,Greeter收到问候消息后,会先打印Hello World!,然后向HelloWorld回复完成的消息,HelloWorld得知Greeter完成了向世界问好这个伟大的任务后,就结束了自己的生命。HelloWorld的例子用编程API的方式告诉了我们如何使用Actor及发送、接收消息。为了便于描述与Spring的集成,下面再介绍一个例子。

       CountingActor(代码主体借用自Akka官网)是用于计数的Actor,见代码清单1所示。

代码清单1

@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {

	public static class Count {
	}

	public static class Get {
	}

	// the service that will be automatically injected
	@Resource
	private CountingService countingService;

	private int count = 0;

	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof Count) {
			count = countingService.increment(count);
		} else if (message instanceof Get) {
			getSender().tell(count, getSelf());
		} else {
			unhandled(message);
		}
	}
}

CountingActor用于接收Count消息进行计数,接收Get消息回复给发送者当前的计数值。CountingService是用于计数的接口,其定义如下:
public interface CountingService {
	
	/**
	 * 计数
	 * @param count
	 * @return
	 */
	int increment(int count);

}
CountingService的具体实现是CountingServiceImpl,其实现如下:
@Service("countingService")
public class CountingServiceImpl implements CountingService {

	private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);

	/*
	 * (non-Javadoc)
	 * 
	 * @see com.elong.sentosa.metadata.service.CountingService#increment(int)
	 */
	@Override
	public int increment(int count) {
		logger.info("increase " + count + "by 1.");
		return count + 1;
	}

}

CountingActor通过注解方式注入了CountingService,CountingActor的计数实际是由CountingService完成。
        细心的同学可能发现了CountingActor使用了注解Named,这里为什么没有使用@Service或者@Component等注解呢?由于Akka的Actor在初始化的时候必须使用System或者Context的工厂方法actorOf创建新的Actor实例,不能使用构造器来初始化,而使用Spring的Service或者Component注解,会导致使用构造器初始化Actor,所以会抛出以下异常:
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.

如果我们不能使用@Service或者@Component,也不能使用XML配置的方式使用(与注解一个道理),那么我们如何使用CountingActor提供的服务呢?

IndirectActorProducer接口

        IndirectActorProducer是Akka提供的Actor生成接口,从其名字我们知道Akka给我们指出了另一条道路——石头大了绕着走!通过实现IndirectActorProducer接口我们可以定制一些Actor的生成方式,与Spring集成可以这样实现它,见代码清单2所示。

代码清单2

public class SpringActorProducer implements IndirectActorProducer {
	private final ApplicationContext applicationContext;
	private final String actorBeanName;
	private final Object[] args;

	public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
		this.applicationContext = applicationContext;
		this.actorBeanName = actorBeanName;
		this.args = args;
	}

	public Actor produce() {
		return (Actor) applicationContext.getBean(actorBeanName, args);
	}

	public Class<? extends Actor> actorClass() {
		return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
	}
}

SpringActorProducer的实现主要借鉴了Akka官方文档,我这里对其作了一些扩展以便于支持构造器带有多个参数的情况。从其实现看到实际是利用了ApplicationContext提供的getBean方式实例化Actor。
       这里还有两个问题:一、ApplicationContext如何获取和设置?二、如何使用SpringActorProducer生成Spring需要的Actor实例?

       对于第一个问题,我们可以通过封装SpringActorProducer并实现ApplicationContextAware接口的方式获取ApplicationContext;对于第二个问题,我们知道Akka中的所有Actor实例都是以Props作为配置参数开始的,这里以SpringActorProducer为代理生成我们需要的Actor的Props。

       SpringExt实现了以上思路,见代码清单3所示。

代码清单3

@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {

	private ApplicationContext applicationContext;

	/**
	 * Create a Props for the specified actorBeanName using the
	 * SpringActorProducer class.
	 *
	 * @param actorBeanName
	 *            The name of the actor bean to create Props for
	 * @return a Props that will create the named actor bean using Spring
	 */
	public Props props(String actorBeanName, Object ... args) {
		return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
	}

	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}
}

应用例子

        经过了以上的铺垫,现在你可以使用创建好的CountingActor了,首先你需要在你的业务类中注入ActorSystem和SpringExt。

        @Autowired
	private ActorSystem actorSystem;

	@Autowired
	private SpringExt springExt;

然后我们使用CountingActor进行计数,代码如下:

	ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");

	// Create the "actor-in-a-box"
        final Inbox inbox = Inbox.create(system);
		
	// tell it to count three times
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());

	// print the result
	FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
	Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
	try {
		System.out.println("Got back " + Await.result(result, duration));
	} catch (Exception e) {
		System.err.println("Failed getting result: " + e.getMessage());
		throw e;
	}

输出结果为:

Got back 3

总结

       本文只是最简单的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等机制都可以应用。

其它Akka应用的博文如下:

  1. Spring与Akka的集成》;
  2. 使用Akka的远程调用》;
  3. 使用Akka构建集群(一)》;
  4. 使用Akka构建集群(二)》;
  5. 使用Akka持久化——持久化与快照》;
  6. 使用Akka持久化——消息发送与接收》;


后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。


京东:http://item.jd.com/11846120.html 

当当:http://product.dangdang.com/23838168.html 



相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
122 1
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
84 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
262 11
|
1月前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
90 0
|
4月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14933 30
|
3月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
136 4
|
3月前
|
人工智能 Java API
JeecgBoot 低代码平台快速集成 Spring AI
Spring 通过 Spring AI 项目正式启用了 AI(人工智能)生成提示功能。本文将带你了解如何在 Jeecg Boot 应用中集成生成式 AI,以及 Spring AI 如何与模型互动,包含 RAG 功能。
123 3
|
3月前
|
XML Java 数据库连接
Spring Boot集成MyBatis
主要系统的讲解了 Spring Boot 集成 MyBatis 的过程,分为基于 xml 形式和基于注解的形式来讲解,通过实际配置手把手讲解了 Spring Boot 中 MyBatis 的使用方式,并针对注解方式,讲解了常见的问题已经解决方式,有很强的实战意义。在实际项目中,建议根据实际情况来确定使用哪种方式,一般 xml 和注解都在用。
|
3月前
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
59 0

热门文章

最新文章