一.Scala REPL
scala repl("Read-Evaluate-Print-Loop") 是一个交互式命令行解释器,它提供了一个测试scala代码的环境。ILoop和IMain是其核心实现。
REPL 的常用功能包括:REPL 的 IMain 实例绑定到 $intp;REPL 中最近一次的异常绑定到 lastException;使用 Tab 键进行补全;使用 //print<tab> 显示已输入代码经类型检查、去语法糖后的形式;使用 :help 查看命令列表;使用 :load 加载 REPL 输入文件;使用 :paste 将类和对象作为伴生对象一起输入;使用 :paste -raw 禁用代码包装,以便定义包;使用 :javap 检查类文件;使用 -Yrepl-outdir 配合外部工具检查类文件;使用 :power 进入 power 模式并导入编译器组件;使用 :settings 修改编译器设置(部分设置需要 :replay 才能生效);使用 :replay 以修改后的设置重放会话。
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._
import javax.script._

/** A simple example showing programmatic usage of the REPL.
  *
  * It walks through four ways of driving the REPL from code:
  * the `javax.script` engine, a bare `IMain` interpreter, an interactive
  * `ILoop`, and an `ILoop` fed with canned (pre-recorded) input.
  */
object Main extends App {

  // the REPL has some support for javax.script
  val scripter = new ScriptEngineManager().getEngineByName("scala")
  scripter.eval("""println("hello, world")""")

  // compiler settings
  val settings = new Settings
  settings.processArgumentString("-deprecation -feature -Xfatal-warnings -Xlint")

  // the interpreter is used by the javax.script engine
  val intp = new IMain(settings)

  /** Interprets one snippet and prints "OK!" on success, or a retry message
    * for any other result.
    */
  def interpret(code: String): Unit = {
    import Results._
    val res = intp.interpret(code) match {
      case Success => "OK!"
      case _       => "Sorry, try again."
    }
    println(res)
  }

  interpret("""println("hello, world")""")
  // an unterminated call: presumably reported as something other than Success
  interpret("""println(""")
  // "$who" inside a plain (non-interpolated) string: presumably flagged by
  // -Xlint and made fatal by -Xfatal-warnings -- TODO confirm
  interpret("""val who = "world" ; println("hello, $who")""")

  // the REPL uses a line reader and an interpreter interactively
  val interactive = new ILoop()
  interactive.process(settings)

  // input to the REPL can be provided programmatically
  import java.io.{BufferedReader, StringReader, PrintWriter}
  val reader = new BufferedReader(new StringReader(""""hello, world""""))
  val canned = new ILoop(reader, new PrintWriter(Console.out, true))
  canned.process(settings)

  // more canning
  val code = """println("hello, world") ; 42"""
  val out = ILoop.run(code)
  println(s"Output is $out")
}
/** The Scala interactive shell. It provides a read-eval-print loop
* around the Interpreter class.
* scala交互shell,它为Interpreter类提供了一个read-eval-print的循环环境
* After instantiation, clients should call the main() method.
* 实例化之后,调用方应调用 main() 方法来启动 scala shell
* If no in0 is specified, then input will come from the console, and
* the class will attempt to provide input editing feature such as
* input history.
* 如果没有指定 in0,则输入来自控制台,并且 scala shell 会尝试提供输入编辑功能,比如历史输入等
class ILoop(in0: Option[BufferedReader], protected val out: JPrintWriter)
extends AnyRef
with LoopCommands
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
@deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
@deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: Interpreter): Unit = intp = i
// 默认输入是 console
var in: InteractiveReader = _ // the input stream from which commands come
// 配置
var settings: Settings = _
// 翻译器
var intp: IMain = _
/** Standard commands **/
// scala shell和spark shell 中可以使用的命令
lazy val standardCommands = List(
cmd("edit", "<id>|<line>", "edit history", editCommand),
cmd("help", "[command]", "print this summary or command-specific help", helpCommand),
cmd("h?", "<string>", "search the history", searchHistory),
cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand),
cmd("implicits", "[-v]", "show the implicits in scope", intp.implicitsCommand),
cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand),
cmd("line", "<id>|<line>", "place line(s) at the end of history", lineCommand),
cmd("load", "<path>", "interpret lines in a file", loadCommand),
cmd("paste", "[-raw] [path]", "enter paste mode or paste a file", pasteCommand),
nullary("power", "enable power user mode", powerCmd),
nullary("quit", "exit the interpreter", () => Result(keepRunning = false, None)),
cmd("replay", "[options]", "reset the repl and replay all previous commands", replayCommand),
cmd("require", "<path>", "add a jar to the classpath", require),
cmd("reset", "[options]", "reset the repl to its initial state, forgetting all session entries", resetCommand),
cmd("save", "<path>", "save replayable session to a file", saveCommand),
cmd("settings", "<options>", "update compiler options, if possible; see reset", changeSettings),
nullary("silent", "disable/enable automatic printing of results", verbosity),
cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
cmd("kind", "[-v] <expr>", "display the kind of expression's type", kindCommand),
nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
/** Create a new interpreter. */
def createInterpreter() {
if (addedClasspath != "")
settings.classpath append addedClasspath
// 官方实现的IMain
intp = new ILoopInterpreter
// start an interpreter with the given settings
// org.apache.spark.repl.Main中最后调用的 SparkILoop.process()的具体实现
def process(settings: Settings): Boolean = savingContextLoader {
this.settings = settings
// 创建Interpreter , spark 重写了这部分,加入了初始化spark环境的操作
// sets in to some kind of reader depending on environmental cues
in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
globalFuture = future {
// 使用load 命令加载文件,主要是外部jar包
// 打印我们熟悉的欢迎界面
// 进入 read-eval-print循环
try loop() match {
case LineResults.EOF => out print Properties.shellInterruptedString
case _ =>
catch AbstractOrMissingHandler()
finally closeInterpreter()
/** The main read-eval-print loop for the repl. It calls
* command() for each line of input, and stops when
* command() returns false.
@tailrec final def loop(): LineResult = {
import LineResults._
// 读取1行输入
readOneLine() match {
case null => EOF
// 运行 processLine() , 没错 继续调用loop循环
case line => if (try processLine(line) catch crashRecovery) loop() else ERR
二.Spark REPL
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
object Main extends Logging {
val conf = new SparkConf()
var sparkContext: SparkContext = _
var sparkSession: SparkSession = _
// this is a public var because tests reset it.
var interp: SparkILoop = _
def main(args: Array[String]) {
doMain(args, new SparkILoop)
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
// 核心
interp = _interp
// 设置classpath
val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
// 拼接启动参数
val interpArguments = List(
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", jars
) ++ args.toList
// scala-shell通用参数 拼接用户自定义参数
val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)
if (!hasErrors) {
// 启动 scala-repl
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
// 创建sparkSession sparkContext
def createSparkSession(): SparkSession = {
* A Spark-specific interactive shell.
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
extends ILoop(in0, out) {
// SparkILoop 继承自 scala 的 ILoop,需要 1 个输入流和 1 个输出流;无参构造时输出使用控制台,输入未指定(in0 为 None)时来自控制台
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
// 初始化spark时运行的代码
val initializationCommands: Seq[String] = Seq(
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
} else {
@transient val sc = {
val _sc = spark.sparkContext
if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
} else {
println(s"Spark Context Web UI is available at Spark Master Public URL")
} else {
_sc.uiWebUrl.foreach {
webUrl => println(s"Spark context Web UI available at ${webUrl}")
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
"import org.apache.spark.SparkContext._",
"import spark.implicits._",
"import spark.sql",
"import org.apache.spark.sql.functions._"
// 使用command函数逐条运行 initializationCommands中的代码
def initializeSpark() {
intp.beQuietDuring {
savingReplayStack { // remove the commands from session history.
/** Print a welcome message */
// spark 启动时打印的欢迎语
override def printWelcome() {
import org.apache.spark.SPARK_VERSION
echo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
versionString, javaVmName, javaVersion)
echo("Type in expressions to have them evaluated.")
echo("Type :help for more information.")
/** Available commands */
// shell中可以使用的命令,spark直接借用了 scala中默认的command,比如help、edit、imports等
override def commands: List[LoopCommand] = standardCommands
* We override `createInterpreter` because we need to initialize Spark *before* the REPL
* sees any files, so that the Spark context is visible in those files. This is a bit of a
* hack, but there isn't another hook available to us at this point.
// 覆盖ILoop的 createInterpreter, 在调用ILoop.createInterpreter后立马调用initializeSpark()初始化spark环境
override def createInterpreter(): Unit = {
override def resetCommand(line: String): Unit = {
echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
override def replay(): Unit = {
// 定义1个static方法 run(code,sets) 应该用于使用sparkILoop运行1次输入的代码
object SparkILoop {
* Creates an interpreter loop with default settings and feeds
* the given code to it as input.
def run(code: String, sets: Settings = new Settings): String = {
import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
stringFromStream { ostream =>
Console.withOut(ostream) {
val input = new BufferedReader(new StringReader(code))
val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
val repl = new SparkILoop(input, output)
if (sets.classpath.isDefault) {
sets.classpath.value = sys.props("java.class.path")
//等价于 repl.process(sets)
repl process sets
def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
同时代码还引入了spark中常用的隐式转换,以及spark sql常用的方法。由于scala在2.12版本仅提供了默认的Interpreter,所以在源码中createInterpreter()直接调用父类的对应方法;而在之前的版本,createInterpreter()创建的是spark自己实现的SparkILoopInterpreter。
- spark-shell实际是使用spark-submit运行org.apache.spark.repl.Main。
- 该Main主要是就是初始化SparkILoop并调用其process()方法。
- SparkILoop主要是继承scala的ILoop,并指定了初始化时运行的代码,这些代码主要用于创建sparkContext。
- scala ILoop为Interpreter类提供了一个read-eval-print的循环环境。
- 后续准备将SparkILoop的输入输出流与HTTP对接,实现一个可以与spark直接交互的web界面。
import java.io.{BufferedOutputStream, File, FileOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.repl.SparkILoop
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{SparkUtils, Utils}
import org.slf4j.LoggerFactory

import scala.concurrent.Future
import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.{ILoop, Results}
import scala.util.control.NonFatal

/**
 * Experimental driver that starts a Scala `ILoop`, creates a `SparkSession`,
 * binds it into the interpreter as `spark` / `sc`, and interprets one test
 * snippet programmatically.
 *
 * Classname: SparkILoopStream
 * Date: 2020/11/17 17:11
 * Created by limeng
 */
object SparkILoopStream {

  protected lazy implicit val logger: org.slf4j.Logger = LoggerFactory.getLogger(getClass)

  /** The interpreter loop; assigned by [[initSparkILoop]]. */
  var sparkILoop: ILoop = _

  /**
   * Entry point: builds the `SparkConf`, starts the interpreter loop, creates the
   * Spark session, binds it into the interpreter and runs a test snippet.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val execUri = Option(System.getenv("SPARK_EXECUTOR_URI"))
    val conf = new SparkConf()
    val outputDir = createOutputDir(conf)

    // Default to yarn when no master is configured.
    val master = conf.getOption("spark.master").getOrElse("yarn")
    conf.setMaster(master)

    val jars = resolveJars(conf)
    if (outputDir != null) {
      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
    }
    logger.info("main jars :" + jars.mkString(";"))
    if (jars.nonEmpty) conf.setJars(jars)
    execUri.foreach(uri => conf.set("spark.executor.uri", uri))
    Option(System.getenv("SPARK_HOME")).foreach(home => conf.setSparkHome(home))
    conf.set("spark.scheduler.mode", "FAIR")

    // NOTE(review): initSparkILoop calls ILoop.process(), which presumably blocks
    // until the REPL loop ends -- confirm `intp` is still usable afterwards.
    initSparkILoop(conf, outputDir)
    Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)

    val sparkSession = SparkSession.builder().config(conf).appName("SparkILoopStream").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    sparkContext.setJobGroup("test-lm-1", "print(\"test\")", true)

    logger.info("sparkILoop init start -")
    if (sparkILoop.intp != null && sparkILoop.isInitializeComplete) {
      logger.info("sparkILoop init run -")
      // Bind the session and context into the interpreter without echoing output.
      sparkILoop.beSilentDuring {
        sparkILoop.processLine(":silent")
        sparkILoop.processLine("import org.apache.spark.SparkContext")
        sparkILoop.processLine("import org.apache.spark.sql.SparkSession")
        sparkILoop.bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
        sparkILoop.bind("spark", "org.apache.spark.sql.SparkSession", sparkSession, List("""@transient"""))
        sparkILoop.processLine("import spark.implicits._")
        sparkILoop.processLine("implicit val sparkSession = spark")
      }
      try {
        sparkILoop.interpret("print(\"test lm hello\")") match {
          case Results.Success    => logger.info("success code.")
          case Results.Incomplete => logger.info("incomplete code.")
          case Results.Error      => logger.info("error code.")
        }
      } catch {
        // Only recover from non-fatal errors; let OOM, interrupts etc. propagate.
        case NonFatal(e) => e.printStackTrace()
      }
    }

    sparkContext.clearJobGroup()
    if (!sparkContext.isStopped) {
      sparkContext.cancelAllJobs
    }
  }

  /**
   * Builds the compiler settings (class-based REPL, output dir, classpath) and
   * starts a new `ILoop` with them, storing it in [[sparkILoop]].
   *
   * @param conf      Spark configuration; `spark.master` must already be set
   * @param outputDir directory passed to the REPL as `-Yrepl-outdir`
   */
  def initSparkILoop(conf: SparkConf, outputDir: File): Unit = {
    val settings = new GenericRunnerSettings(error(_))
    val jars = resolveJars(conf)
    logger.info("jars : " + jars.mkString(";"))

    // Split with the platform separator, matching the separator used to join below.
    val classpathJars =
      System.getProperty("java.class.path").split(File.pathSeparator).filter(_.endsWith(".jar"))
    val classpath =
      jars.mkString(File.pathSeparator) + File.pathSeparator + classpathJars.mkString(File.pathSeparator)
    logger.info("classPath : " + classpath)

    settings.processArguments(
      List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", classpath),
      true)
    // Also load classes from java.class.path.
    settings.usejavacp.value = true
    // Take embedded defaults from the current context class loader.
    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)

    sparkILoop = new ILoop()
    val bool = sparkILoop.process(settings)
    logger.info("sparkILoop process :" + bool)
    // println("spark repl has been finished, now stop it.")
  }

  /**
   * Creates a temporary directory under `/tmp` with the prefix `repl`.
   *
   * @param conf Spark configuration (currently not read)
   * @return the directory returned by `SparkUtils.createTempDir`
   */
  def createOutputDir(conf: SparkConf): File = {
    val rootDir = "/tmp"
    SparkUtils.createTempDir(root = rootDir, namePrefix = "repl")
  }

  /** Error callback handed to the compiler settings: logs the message at error level. */
  def error(message: => String): Unit = {
    logger.error(message)
  }

  /**
   * Resolves the user jars from the configuration: on a yarn master the union of
   * `spark.jars` and `spark.yarn.dist.jars`, otherwise the non-empty
   * comma-separated entries of `spark.jars`.
   */
  private def resolveJars(conf: SparkConf): Seq[String] = {
    val sparkJars = conf.getOption("spark.jars")
    if (conf.get("spark.master").contains("yarn")) {
      val yarnJars = conf.getOption("spark.yarn.dist.jars")
      SparkUtils.unionFileLists(sparkJars, yarnJars).toSeq
    } else {
      sparkJars.toSeq.flatMap(_.split(",")).filter(_.nonEmpty)
    }
  }
}