Spark REPL

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Spark REPL

一.Scala REPL

scala repl("Read-Evaluate-Print-Loop") 是一个交互式命令行解释器,它提供了一个测试scala代码的环境。ILoop和IMain是其核心实现。

属性

有用的REPL功能包括:
REPL的IMain绑定到$intp。
REPL的最后一个异常绑定到lastException。
使用标签完成。
用于//print<tab>显示键入的重复标记。
使用:help的命令列表。
用于:load加载REPL输入文件。
用于:paste输入类和对象作为同伴。
用于:paste -raw禁用代码包装,定义程序包。
用于:javap检查类工件。
用于使用-Yrepl-outdir外部工具检查类工件。
用于:power进入超级模式并导入编译器组件。
用于:settings修改编译器设置;一些设置要求:replay。
用于:replay以修改后的设置重播会话。

示例

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._
import javax.script._
/** A simple example showing programmatic usage of the REPL. */
object Main extends App {
  // the REPL has some support for javax.script
  val scripter = new ScriptEngineManager().getEngineByName("scala")
  scripter.eval("""println("hello, world")""")
  // compiler settings
  val settings = new Settings
  settings.processArgumentString("-deprecation -feature -Xfatal-warnings -Xlint")
  // the interpreter is used by the javax.script engine
  val intp = new IMain(settings)
  def interpret(code: String): Unit = {
    import Results._
    val res = intp.interpret(code) match {
      case Success => "OK!"
      case _       => "Sorry, try again."
    }
    println(res)
  }
  interpret("""println("hello, world")""")
  interpret("""println(""")
  interpret("""val who = "world" ; println("hello, $who")""")
  // the REPL uses a line reader and an interpreter interactively
  val interactive = new ILoop()
  interactive.process(settings)
  // input to the REPL can be provided programmatically
  import java.io.{BufferedReader, StringReader, PrintWriter}
  val reader = new BufferedReader(new StringReader(""""hello, world""""))
  val canned = new ILoop(reader, new PrintWriter(Console.out, true))
  canned.process(settings)
  // more canning
  val code = """println("hello, world") ; 42"""
  val out = ILoop.run(code)
  println(s"Output is $out")
}

源码

ILoop为Interpreter类(用于翻译scala代码)提供了read(读代码)-eval(运行代码)-print(打印代码)的循环环境。IMain是用于将遵守scala代码规范的字符串翻译成可运行的字节码,它的子类就是前文所说的Interpreter类,scala在2.12版本中给出了IMain的默认实现ILoopInterpreter。本文重点介绍ILoop,不介绍IMain及其子类的实现。

/** The Scala interactive shell.  It provides a read-eval-print loop
 *  around the Interpreter class.
 *  scala交互shell,它为Interpreter类提供了一个read-eval-print的循环环境
 *  
 *  After instantiation, clients should call the main() method.
 *  在scala shell通过调用main()方法开始提供服务
 *  
 *  If no in0 is specified, then input will come from the console, and
 *  the class will attempt to provide input editing feature such as
 *  input history.
 *  默认情况下input是console, scala shell 并提供了许多方便的功能,比如 历史输入等
 *
 */
class ILoop(in0: Option[BufferedReader], protected val out: JPrintWriter)
                extends AnyRef
                   with LoopCommands
{
  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
  def this() = this(None, new JPrintWriter(Console.out, true))
  @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
  @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: Interpreter): Unit = intp = i
  // 默认输入是 console
  var in: InteractiveReader = _   // the input stream from which commands come
  // 配置
  var settings: Settings = _
  // 翻译器
  var intp: IMain = _

  /** Standard commands **/
  // scala shell和spark shell 中可以使用的命令
  lazy val standardCommands = List(
    cmd("edit", "<id>|<line>", "edit history", editCommand),
    cmd("help", "[command]", "print this summary or command-specific help", helpCommand),
    historyCommand,
    cmd("h?", "<string>", "search the history", searchHistory),
    cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand),
    cmd("implicits", "[-v]", "show the implicits in scope", intp.implicitsCommand),
    cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand),
    cmd("line", "<id>|<line>", "place line(s) at the end of history", lineCommand),
    cmd("load", "<path>", "interpret lines in a file", loadCommand),
    cmd("paste", "[-raw] [path]", "enter paste mode or paste a file", pasteCommand),
    nullary("power", "enable power user mode", powerCmd),
    nullary("quit", "exit the interpreter", () => Result(keepRunning = false, None)),
    cmd("replay", "[options]", "reset the repl and replay all previous commands", replayCommand),
    cmd("require", "<path>", "add a jar to the classpath", require),
    cmd("reset", "[options]", "reset the repl to its initial state, forgetting all session entries", resetCommand),
    cmd("save", "<path>", "save replayable session to a file", saveCommand),
    shCommand,
    cmd("settings", "<options>", "update compiler options, if possible; see reset", changeSettings),
    nullary("silent", "disable/enable automatic printing of results", verbosity),
    cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
    cmd("kind", "[-v] <expr>", "display the kind of expression's type", kindCommand),
    nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
  )

  /** Create a new interpreter. */
  def createInterpreter() {
    if (addedClasspath != "")
      settings.classpath append addedClasspath
    // 官方实现的IMain
    intp = new ILoopInterpreter
  }

   // start an interpreter with the given settings
   // org.apache.spark.repl.Main中最后调用的 SparkILoop.process()的具体实现
  def process(settings: Settings): Boolean = savingContextLoader {
    this.settings = settings
    // 创建Interpreter , spark 重写了这部分,加入了初始化spark环境的操作
    createInterpreter()
    // sets in to some kind of reader depending on environmental cues
    in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
    globalFuture = future {
      intp.initializeSynchronous()
      loopPostInit()
      !intp.reporter.hasErrors
    }
    // 使用load 命令加载文件,主要是外部jar包
    loadFiles(settings)
    // 打印我们熟悉的欢迎界面
    printWelcome()
    // 进入 read-eval-print循环
    try loop() match {
      case LineResults.EOF => out print Properties.shellInterruptedString
      case _               =>
    }
    catch AbstractOrMissingHandler()
    finally closeInterpreter()
    true
  }

   /** The main read-eval-print loop for the repl.  It calls
   *  command() for each line of input, and stops when
   *  command() returns false.
   */
  @tailrec final def loop(): LineResult = {
    import LineResults._
    // 读取1行输入
    readOneLine() match {
      case null => EOF
      // 运行 processLine() , 没错 继续调用loop循环
      case line => if (try processLine(line) catch crashRecovery) loop() else ERR
    }
  }
 }

从核心代码可以明确感受到read-eval-print这个循环过程,使用readOneLine()从输入中读取指令,processLine(line)运行指令并打印输出然后在循环。

二.Spark REPL

spark-shell提供了一种交互式使用spark的方式。用户只要运行./bin/spark-shell就可以进入spark-shell中,在该环境中系统默认为用户创建了一个SparkContext,用户可以直接使用scala语言控制这个sc。下面我们先看下spark-shell。

....
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
....

可以看出spark-shell其实是用spark-submit提交了一个任务,这个任务的主函数是org.apache.spark.repl.Main,它在名为spark-repl的module中。

object Main extends Logging {
  ………………
  val conf = new SparkConf()
  var sparkContext: SparkContext = _
  var sparkSession: SparkSession = _
  // this is a public var because tests reset it.
  var interp: SparkILoop = _
  def main(args: Array[String]) {
    doMain(args, new SparkILoop)
  }
  // Visible for testing
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
  // 核心  
  interp = _interp
  // 设置classpath
    val jars = Utils.getLocalUserJarsForShell(conf)
      // Remove file:///, file:// or file:/ scheme if exists for each jar
      .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
      .mkString(File.pathSeparator)
  // 拼接启动参数
    val interpArguments = List(
      "-Yrepl-class-based",
      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
      "-classpath", jars
    ) ++ args.toList
  // scala-shell通用参数 拼接用户自定义参数
    val settings = new GenericRunnerSettings(scalaOptionError)
    settings.processArguments(interpArguments, true)
    if (!hasErrors) {
      // 启动 scala-repl
      interp.process(settings) // Repl starts and goes in loop of R.E.P.L
      Option(sparkContext).foreach(_.stop)
    }
  }
  // 创建sparkSession sparkContext
  def createSparkSession(): SparkSession = {
    ………………
  }
  …………
}

从源码可以看出启动spark-shell时主要就是创建并启动了一个SparkILoop,启动时设置了spark所需要的classpath。下面我们来看下SparkILoop。

/**
 *  A Spark-specific interactive shell.
 */
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    extends ILoop(in0, out) {
  // sparkILoop主要是继承了scala的ILoop,它需要1个输入流,输出流,使用控制台输出 输入可以
  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
  def this() = this(None, new JPrintWriter(Console.out, true))
  // 初始化spark时运行的代码
  val initializationCommands: Seq[String] = Seq(
    """
    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
        org.apache.spark.repl.Main.sparkSession
      } else {
        org.apache.spark.repl.Main.createSparkSession()
      }
    @transient val sc = {
      val _sc = spark.sparkContext
      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
        if (proxyUrl != null) {
          println(
            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
        } else {
          println(s"Spark Context Web UI is available at Spark Master Public URL")
        }
      } else {
        _sc.uiWebUrl.foreach {
          webUrl => println(s"Spark context Web UI available at ${webUrl}")
        }
      }
      println("Spark context available as 'sc' " +
        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
      println("Spark session available as 'spark'.")
      _sc
    }
    """,
    "import org.apache.spark.SparkContext._",
    "import spark.implicits._",
    "import spark.sql",
    "import org.apache.spark.sql.functions._"
  )
  // 使用command函数逐条运行 initializationCommands中的代码
  def initializeSpark() {
    intp.beQuietDuring {
      savingReplayStack { // remove the commands from session history.
        initializationCommands.foreach(command)
      }
    }
  }
  /** Print a welcome message */
  // spark 启动时打印的欢迎语
  override def printWelcome() {
    import org.apache.spark.SPARK_VERSION
    echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
         """.format(SPARK_VERSION))
    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
      versionString, javaVmName, javaVersion)
    echo(welcomeMsg)
    echo("Type in expressions to have them evaluated.")
    echo("Type :help for more information.")
  }
  /** Available commands */
  //  shell中可以使用的命令,spark直接借用了 scala中默认的command,比如hlep edit imports等
  override def commands: List[LoopCommand] = standardCommands
  /**
   * We override `createInterpreter` because we need to initialize Spark *before* the REPL
   * sees any files, so that the Spark context is visible in those files. This is a bit of a
   * hack, but there isn't another hook available to us at this point.
   */
   // 覆盖ILoop的 createInterpreter, 在调用ILoop.createInterpreter后立马调用initializeSpark()初始化spark环境
  override def createInterpreter(): Unit = {
    super.createInterpreter()
    initializeSpark()
  }

  override def resetCommand(line: String): Unit = {
    super.resetCommand(line)
    initializeSpark()
    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
  }
  override def replay(): Unit = {
    initializeSpark()
    super.replay()
  }
}
// 定义1个static方法 run(code,sets) 应该用于使用sparkILoop运行1次输入的代码
object SparkILoop {
  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   */
  def run(code: String, sets: Settings = new Settings): String = {
    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
    stringFromStream { ostream =>
      Console.withOut(ostream) {
        val input = new BufferedReader(new StringReader(code))
        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
        val repl = new SparkILoop(input, output)
        if (sets.classpath.isDefault) {
          sets.classpath.value = sys.props("java.class.path")
        }
        //等价于 repl.process(sets)
        repl process sets
      }
    }
  }
  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}

根据sparkILoop的原码我们可以看出,它主要工作就是继承ILoop,定义ILoop初始化时需要运行的代码。这些代码保存在initializationCommands中,主要用于创建spark运行环境。

代码调用org.apache.spark.repl.Main.createSparkSession()创建sparkSession和sparkContext,并将sparkContext赋值给变量sc,这样用户就可以spark-shell中直接使用了。

同时代码还引入了spark中常用的隐式转化,spark sql常用的方法。由于scala在2.12版本只有提供了默认的Interpreter,所在原码中createInterpreter()直接调用父类的对应方法,而在之前的版本,createInterpreter()创建的是spark自己实现的SparkILoopInterpreter。

三.总结

  • spark-shell实际是使用spark-submit运行org.apache.spark.repl.Main。
  • 该Main主要是就是初始化SparkILoop并调用其process()方法。
  • SparkILoop主要是继承scala的ILoop,并在指定初始化时运行的代码,这些代码主要是创建sparkContext。
  • scala ILoop为Interpreter类提供了一个read-eval-print的循环环境。
  • 后续准备将SparkILoop的输入输出流与HTTP对接,实现一个可以与spark直接交互的web界面。

    示例

    import java.io.{BufferedOutputStream, File, FileOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.repl.SparkILoop
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.{SparkUtils, Utils}
    import org.slf4j.LoggerFactory
    import scala.concurrent.Future
    import scala.tools.nsc.GenericRunnerSettings
    import scala.tools.nsc.interpreter.{ILoop, Results}
    /**
    * @Classname SparkILoopStream
    * @Description TODO
    * @Date 2020/11/17 17:11
    * @Created by limeng
    */
    object SparkILoopStream {
    protected lazy implicit val logger = LoggerFactory.getLogger(getClass)
    var sparkILoop: ILoop = _
    def main(args: Array[String]): Unit = {
      val execUri = System.getenv("SPARK_EXECUTOR_URI")
      val conf = new SparkConf()
      val outputDir = createOutputDir(conf)
      val sparkJars = conf.getOption("spark.jars")
      val master = conf.getOption("spark.master").getOrElse("yarn")
      conf.setMaster(master)
      val jars = if (conf.get("spark.master").contains("yarn")) {
        val yarnJars = conf.getOption("spark.yarn.dist.jars")
        SparkUtils.unionFileLists(sparkJars, yarnJars).toSeq
      } else {
        sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
      }
      if(outputDir != null) {
        conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
      }
      logger.info("main jars :"+jars.mkString(";"))
      if(jars.nonEmpty) conf.setJars(jars)
      if(execUri != null) conf.set("spark.executor.uri",execUri)
      if(System.getenv("SPARK_HOME") != null) conf.setSparkHome(System.getenv("SPARK_HOME"))
      conf.set("spark.scheduler.mode", "FAIR")
      initSparkILoop(conf,outputDir)
      Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)
      val sparkSession = SparkSession.builder().config(conf).appName("SparkILoopStream").getOrCreate()
      val sparkContext = sparkSession.sparkContext
      sparkContext.setJobGroup("test-lm-1", "print(\"test\")", true)
      logger.info("sparkILoop init start -")
      if(sparkILoop.intp != null && sparkILoop.isInitializeComplete){
        logger.info("sparkILoop init run -")
        sparkILoop.beSilentDuring {
          sparkILoop.processLine(":silent")
          sparkILoop.processLine("import org.apache.spark.SparkContext")
          sparkILoop.processLine("import org.apache.spark.sql.SparkSession")
          sparkILoop.bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
          sparkILoop.bind("spark", "org.apache.spark.sql.SparkSession", sparkSession, List("""@transient"""))
          sparkILoop.processLine("import spark.implicits._")
          sparkILoop.processLine("implicit val sparkSession = spark")
        }
        try {
          sparkILoop.interpret("print(\"test lm hello\")") match {
            case Results.Success => logger.info("success code.")
            case Results.Incomplete => logger.info("incomplete code.")
            case Results.Error =>  logger.info("error code.")
          }
        }catch {
          case e=>e.printStackTrace()
        }
      }
      sparkContext.clearJobGroup()
      if (!sparkContext.isStopped) {
        sparkContext.cancelAllJobs
      }
    }
    def initSparkILoop(conf:SparkConf,outputDir: File) = {
      val settings = new GenericRunnerSettings(error(_))
      val sparkJars = conf.getOption("spark.jars")
      val jars = if(conf.get("spark.master").contains("yarn")){
        val yarnJars = conf.getOption("spark.yarn.dist.jars")
        SparkUtils.unionFileLists(sparkJars,yarnJars)
      }else{
        sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
      }
      logger.info("jars : "+jars.mkString(";"))
      val classpathJars = System.getProperty("java.class.path").split(":").filter(_.endsWith(".jar"))
      val classpath =jars.mkString(File.pathSeparator) + File.pathSeparator + classpathJars.mkString(File.pathSeparator)
      logger.info("classPath : "+classpath)
      settings.processArguments(List("-Yrepl-class-based",
        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}","-classpath", classpath), true)
      //类加载 java.class.path
      settings.usejavacp.value = true
      //加载
      settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
       sparkILoop = new ILoop()
       val bool = sparkILoop.process(settings)
       logger.info("sparkILoop process :"+bool)
       // println("spark repl has been finished, now stop it.")
    }
    def createOutputDir(conf: SparkConf): File = {
      val rootDir = "/tmp"
      SparkUtils.createTempDir(root = rootDir, namePrefix = "repl")
    }
    def error(message: => String): Unit = {
      logger.error(message.toString)
    }
    }
    

参考

https://docs.scala-lang.org/overviews/repl/overview.html

目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
154 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
77 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
51 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
105 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
101 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
129 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
93 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
73 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
68 1
|
3月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
130 0