快速开发基于Yarn的分布式应用

简介: MYC 主要源码来自于 Spark的yarn模块。通过对其进行改造,使得其后续可以作为一个通用的Yarn项目开发框架。目前MYC还比较简单,但不影响你基于它非常快的构建出一个基于Yarn的分布式应用。
mammuthus-yarn-client使得基于Yarn开发分布式应用变得非常容易,基本蔽掉了Yarn的API。我们后面会把mammuthus-yarn-client 缩写为MYC。 
基于该库,我们实现了一个容器调度引擎,可以作为MYC的示例程序。项目见 mammuthus-dynamic-deploy

概述

MYC 主要源码来自于 Spark的yarn模块。通过对其进行改造,使得其后续可以作为一个通用的Yarn项目开发框架。目前MYC还比较简单,但不影响你基于它非常快的构建出一个基于Yarn的分布式应用。

基于MYC 开发基本原理

基于Yarn开发的应用大体是一个典型Master-Slaves结构。其中Master 在Yarn中称为 ApplicationMaster. ApplicationMaster代表应用和ResourceManager进行交互。在MYC中提供了一个公用的ApplicationMaster实现,用户的Master则在在AM中的一个单独线程中被启动,启动完成后AM会获取到Master的地址和端口,然后将这些信息传递给应用对象的Slave,Slave从而根据这些信息和Master获取联系。
通常Slave 和 Master的通讯方式有两种:
  1. 通过一个中间组件进行通讯,比如通过zookeeper来完成。
  2. Slave 主动向Master发送心跳,Master维持关系
前面我们提到,Master被AM启动后,AM会获取到Master的地址和端口,这就需要一种途径传递这些信息。MYC定义了一些接口,只要Master/Slave分别实现这些接口,就能完成这些基础功能。

实现Master

为了能够和你的Master进行交互,需要保证你的Master启动类实现
mammuthus.yarn.MammuthusApplication
接口规范。该类具体待实现方法如下:
trait MammuthusApplication {

  def masterPort: Int //应用开启的端口

  def uiAddress: String // Master的管理界面地址

  /*
    args 来自于--args 参数(启动命令中附件的参数)。MYC 会通过args传递给你。 
    maps 参数主要为了后续进一步和MYC进行交互,譬如你可以将一些参数通过maps传递给MYC
  */
  def run(args: Array[String], maps: java.util.Map[String, String]):Unit
}
  1. 提供你的端口号
  2. 管理页面地址
  3. 在run方法里初始化你的应用
实例代码如下来自DynamicDeployMaster.scala:
//启动ServiceFramework框架
object DynamicDeployMaster {
  var  mammuthusMasterClient:TaskService = _
  def main(args: Array[String]): Unit = {
    ServiceFramwork.scanService.setLoader(classOf[DynamicDeployMaster])
    ServiceFramwork.applicaionYamlName("application.master.yml")
    ServiceFramwork.disableDubbo()
    ServiceFramwork.enableNoThreadJoin()
    Application.main(args)
  }
}

//实现MammuthusApplication 接口,保证AM能够拿到端口,UI地址等
class DynamicDeployMaster extends MammuthusApplication {

  var httpPort: Int = 0
  var _uiAddress = ""
  var mammuthusMasterAddress:String = _

  override def masterPort: Int = httpPort

  override def uiAddress: String = _uiAddress

  override def run(args: Array[String], maps: java.util.Map[String, String]): Unit = {
    DynamicDeployMaster.main(args)

    val hp = ServiceFramwork.injector.getInstance(classOf[HttpServer])
    httpPort = hp.getHttpPort
    val containerService = ServiceFramwork.injector.getInstance(classOf[DynamicMasterInfoService])
    containerService.imageUrl = args(0)
    containerService.location = args(1)
    containerService.startCommand = args(2)
    mammuthusMasterAddress = args(3)

    maps.put("httpPort", masterPort + "")
    maps.put("uiAddress", s"http://${InetAddress.getLocalHost.getHostName}:${masterPort}")

    _uiAddress = maps.get("uiAddress")

    Thread.currentThread().join()
  }


  private def driverHostAndPort = {
    val uri = URI.create(mammuthusMasterAddress)
    uri.getHost + ":" + uri.getPort
  }
}
通过该类完成一个常驻程序。

实现你的Slave

同样的,为了能够让MYC能够启动你的Slave程序,你需要遵循一些接口规范。你的启动类需要实现如下接口:
mammuthus.yarn.ExecutorBackend
你的Slave启动类会得到一个如下的对象:
case class ExecutorBackendParam(driverUrl: String,
                                slaveClass: String,
                                executorId: String,
                                hostname: String,
                                cores: Int,
                                executeMemory: String,
                                appId: String,
                                userClassPath: mutable.ListBuffer[URL]
                                 )
具体例子如下 DynamicDeploySlave.scala:
//启动ServiceFramework框架
object DynamicDeploySlave extends ExecutorBackend {
  var applicationMasterArguments: ExecutorBackendParam = null

  def main(args: Array[String]) = {
    applicationMasterArguments = parse(args)
    ServiceFramwork.applicaionYamlName("application.slave.yml")
    ServiceFramwork.scanService.setLoader(classOf[DynamicDeploySlave])
    ServiceFramwork.disableDubbo()
    ServiceFramwork.registerStartWithSystemServices(classOf[HeartBeatService])
    Application.main(args)
  }
}
class DynamicDeploySlave

运行方式

完成你的应用开发后,package成一个jar包,然后就可以通过一个java命令完成向Yarn集群提交任务的步骤了。
参考 mammuthus-yarn-docker-scheduler项目的说明,大体来说如下:
java -cp /Users/allwefantasy/CSDNWorkSpace/mammuthus-yarn-client/target/mammuthus-yarn-client-1.0-SNAPSHOT-SHADED.jar mammuthus.yarn.Client \
--jar /Users/allwefantasy/CSDNWorkSpace/mammuthus-dynamic-deploy/target/mammuthus-dynamic-deploy-1.0-SNAPSHOT-jar-with-dependencies.jar \ 
--driver-memory 256m \
--num-executors 2 \
--executor-memory 512m  \
--class mammuthus.deploy.dynamic.DynamicDeployMaster \
--slave-class mammuthus.deploy.dynamic.DynamicDeploySlave \
--arg "http://appstore/DCS@1.0.tar.gz" \
--arg "/tmp/DCS-dev@1.0.tar.gz" \
--arg "./app.sh launch_dcs.sh start" \
--arg "http://appstore"
其中
--jar,应用程序jar包地址
--dirver-memory, master 数量
--num-executors,Slave数量
--executor-memory, Slave 内存大小
--class, Master 启动类
--slave-class , Slave 启动类 
--args 则是你应用需要接受的参数。
目录
相关文章
|
1月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
1月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
2月前
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决了 Session 共享问题。其特性包括:API 及实现用于管理用户会话、以应用容器中性方式替换 HttpSession、简化集群会话支持、管理单个浏览器实例中的多个用户会话以及通过 headers 提供会话 ID 以使用 RESTful API。Spring Session 通过 SessionRepositoryFilter 实现,拦截请求并转换 request 和 response 对象,从而实现 Session 的创建与管理。
分布式session-SpringSession的应用
|
2月前
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决 Session 共享问题。其主要特性包括:提供 API 和实现来管理用户会话,以中立方式替换应用程序容器中的 HttpSession,简化集群会话支持,并在单个浏览器实例中管理多个用户会话。此外,Spring Session 允许通过 headers 提供会话 ID 以使用 RESTful API。结合 Spring Boot 使用时,可通过配置 Redis 依赖和支持缓存的依赖实现 Session 共享。
分布式session-SpringSession的应用
|
1月前
|
缓存 网络协议 API
分布式系统应用之服务发现!
分布式系统应用之服务发现!
|
2月前
|
存储 运维 应用服务中间件
阿里云分布式存储应用示例
通过阿里云EDAS,您可以轻松部署与管理微服务应用。创建应用时,使用`CreateApplication`接口基于模板生成新应用,并获得包含应用ID在内的成功响应。随后,利用`DeployApplication`接口将应用部署至云端,返回"Success"确认部署成功。当业务调整需下线应用时,调用`ReleaseApplication`接口释放资源。阿里云EDAS简化了应用全生命周期管理,提升了运维效率与可靠性。[相关链接]提供了详细的操作与返回参数说明。
|
2月前
|
Dubbo Java 应用服务中间件
分布式(基础)-RMI简单的应用
分布式(基础)-RMI简单的应用
|
3月前
|
机器学习/深度学习 分布式计算 PyTorch
大规模数据集管理:DataLoader在分布式环境中的应用
【8月更文第29天】随着大数据时代的到来,如何高效地处理和利用大规模数据集成为了许多领域面临的关键挑战之一。本文将探讨如何在分布式环境中使用`DataLoader`来优化大规模数据集的管理与加载过程,并通过具体的代码示例展示其实现方法。
180 1
|
3月前
|
运维 安全 Cloud Native
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
|
3月前
|
Kubernetes 安全 云计算
分布式应用的终极革命:Distributionless,告别分布式烦恼!
【8月更文挑战第8天】探讨分布式应用的进化形态——Distributionless,一种使开发者聚焦业务逻辑而非系统细节的理念。借助容器化、云计算与自动化工具的进步,分布式应用的开发与管理变得简易。透过示例展现了使用Bazel构建及Kubernetes部署的流程,预示着Distributionless模式下的应用将更加高效、可靠与安全,引领未来分布式应用的发展趋势。
56 7