1. Scala REPL
The Scala REPL ("Read-Evaluate-Print-Loop") is an interactive command-line interpreter that provides an environment for trying out Scala code. ILoop and IMain are its core implementation classes.
Features
Useful REPL features include:
- The REPL's IMain is bound to $intp.
- The last exception is bound to lastException.
- Tab completion is available.
- //print<tab> shows the typed desugaring of what you entered.
- :help lists the available commands.
- :load loads a file of REPL input.
- :paste lets you enter a class and its companion object together.
- :paste -raw disables code wrapping, so you can define a package.
- :javap inspects class artifacts.
- -Yrepl-outdir lets external tools inspect class artifacts.
- :power enters power mode and imports compiler components.
- :settings modifies compiler settings; some settings require :replay.
- :replay replays the session with the modified settings.
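A short illustrative session showing a few of these features (output abbreviated; exact wording varies slightly across Scala versions):

scala> val xs = List(1, 2, 3)
xs: List[Int] = List(1, 2, 3)

scala> :type xs.map(_ * 2)
List[Int]

scala> :paste
// Entering paste mode (ctrl-D to finish)

class Foo(val n: Int)
object Foo { def apply(n: Int) = new Foo(n) }

// Exiting paste mode, now interpreting.

defined class Foo
defined object Foo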
Example
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._
import javax.script._

/** A simple example showing programmatic usage of the REPL. */
object Main extends App {

  // the REPL has some support for javax.script
  val scripter = new ScriptEngineManager().getEngineByName("scala")
  scripter.eval("""println("hello, world")""")

  // compiler settings
  val settings = new Settings
  settings.processArgumentString("-deprecation -feature -Xfatal-warnings -Xlint")

  // the interpreter is used by the javax.script engine
  val intp = new IMain(settings)
  def interpret(code: String): Unit = {
    import Results._
    val res = intp.interpret(code) match {
      case Success => "OK!"
      case _       => "Sorry, try again."
    }
    println(res)
  }
  interpret("""println("hello, world")""")
  interpret("""println(""")
  interpret("""val who = "world" ; println(s"hello, $who")""")

  // the REPL uses a line reader and an interpreter interactively
  val interactive = new ILoop()
  interactive.process(settings)

  // input to the REPL can be provided programmatically
  import java.io.{BufferedReader, StringReader, PrintWriter}
  val reader = new BufferedReader(new StringReader(""""hello, world""""))
  val canned = new ILoop(reader, new PrintWriter(Console.out, true))
  canned.process(settings)

  // more canning
  val code = """println("hello, world") ; 42"""
  val out = ILoop.run(code)
  println(s"Output is $out")
}
Source code
ILoop provides the read (read code) - eval (run code) - print (print results) loop around the Interpreter class, which compiles Scala code. IMain is the component that translates a string of valid Scala code into runnable bytecode; the Interpreter mentioned above is one of its subclasses, and Scala 2.12 ships ILoopInterpreter as the default IMain implementation. This article focuses on ILoop and does not go into IMain and its subclasses.
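To make the division of labor concrete, here is a minimal sketch (not from the article's source) of driving IMain directly; the usejavacp flag and the bound name "answer" are just illustrative choices:

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

object IMainSketch extends App {
  val settings = new Settings
  settings.usejavacp.value = true              // compile against the launching JVM's classpath

  val intp = new IMain(settings)
  intp.bind("answer", "Int", 21)               // expose a host variable to the session
  intp.interpret("val doubled = answer * 2")   // compiles and runs the snippet, printing the result
  println(intp.valueOfTerm("doubled"))         // Some(42)
  intp.close()
}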
/** The Scala interactive shell. It provides a read-eval-print loop
 * around the Interpreter class.
 *
 * After instantiation, clients should call the main() method.
 *
 * If no in0 is specified, then input will come from the console, and
 * the class will attempt to provide input editing feature such as
 * input history.
 */
class ILoop(in0: Option[BufferedReader], protected val out: JPrintWriter)
extends AnyRef
with LoopCommands
{
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
@deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
@deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: Interpreter): Unit = intp = i
// input defaults to the console
var in: InteractiveReader = _ // the input stream from which commands come
// compiler settings
var settings: Settings = _
// the interpreter
var intp: IMain = _
/** Standard commands **/
// commands available in both the Scala shell and the Spark shell
lazy val standardCommands = List(
cmd("edit", "<id>|<line>", "edit history", editCommand),
cmd("help", "[command]", "print this summary or command-specific help", helpCommand),
historyCommand,
cmd("h?", "<string>", "search the history", searchHistory),
cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand),
cmd("implicits", "[-v]", "show the implicits in scope", intp.implicitsCommand),
cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand),
cmd("line", "<id>|<line>", "place line(s) at the end of history", lineCommand),
cmd("load", "<path>", "interpret lines in a file", loadCommand),
cmd("paste", "[-raw] [path]", "enter paste mode or paste a file", pasteCommand),
nullary("power", "enable power user mode", powerCmd),
nullary("quit", "exit the interpreter", () => Result(keepRunning = false, None)),
cmd("replay", "[options]", "reset the repl and replay all previous commands", replayCommand),
cmd("require", "<path>", "add a jar to the classpath", require),
cmd("reset", "[options]", "reset the repl to its initial state, forgetting all session entries", resetCommand),
cmd("save", "<path>", "save replayable session to a file", saveCommand),
shCommand,
cmd("settings", "<options>", "update compiler options, if possible; see reset", changeSettings),
nullary("silent", "disable/enable automatic printing of results", verbosity),
cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
cmd("kind", "[-v] <expr>", "display the kind of expression's type", kindCommand),
nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
)
/** Create a new interpreter. */
def createInterpreter() {
if (addedClasspath != "")
settings.classpath append addedClasspath
// the default IMain implementation shipped with Scala
intp = new ILoopInterpreter
}
// start an interpreter with the given settings
// this is the concrete implementation behind SparkILoop.process(), which org.apache.spark.repl.Main ultimately calls
def process(settings: Settings): Boolean = savingContextLoader {
this.settings = settings
// create the Interpreter; Spark overrides this step to also initialize the Spark environment
createInterpreter()
// sets in to some kind of reader depending on environmental cues
in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
globalFuture = future {
intp.initializeSynchronous()
loopPostInit()
!intp.reporter.hasErrors
}
// load input files (such as external jars) specified in the settings, as the :load command would
loadFiles(settings)
// print the familiar welcome banner
printWelcome()
// enter the read-eval-print loop
try loop() match {
case LineResults.EOF => out print Properties.shellInterruptedString
case _ =>
}
catch AbstractOrMissingHandler()
finally closeInterpreter()
true
}
/** The main read-eval-print loop for the repl. It calls
* command() for each line of input, and stops when
* command() returns false.
*/
@tailrec final def loop(): LineResult = {
import LineResults._
// read one line of input
readOneLine() match {
case null => EOF
// run processLine(); on success, recurse back into loop()
case line => if (try processLine(line) catch crashRecovery) loop() else ERR
}
}
}
The read-eval-print cycle is plain to see in this core code: readOneLine() reads a command from the input, processLine(line) runs it and prints the output, and then the loop repeats.
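Stripped of the nsc machinery, the same shape can be sketched in a few lines of plain Scala; here eval is only a stand-in for the real interpreter:

import scala.annotation.tailrec
import scala.io.StdIn

object MiniRepl extends App {
  def eval(line: String): String = s"echo: $line"   // stand-in for IMain.interpret

  @tailrec
  def loop(): Unit = StdIn.readLine("repl> ") match {
    case null | ":quit" => ()                       // EOF or :quit ends the loop
    case line           => println(eval(line)); loop()
  }

  loop()
}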
2. Spark REPL
spark-shell provides an interactive way to use Spark. Running ./bin/spark-shell drops the user into a shell where a SparkContext has already been created, and the user can drive this sc directly with Scala code. Let's first look at the spark-shell launcher script.
....
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
....
So spark-shell is really just spark-submit submitting a job whose main class is org.apache.spark.repl.Main, which lives in the spark-repl module.
object Main extends Logging {
………………
val conf = new SparkConf()
var sparkContext: SparkContext = _
var sparkSession: SparkSession = _
// this is a public var because tests reset it.
var interp: SparkILoop = _
def main(args: Array[String]) {
doMain(args, new SparkILoop)
}
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
// the core piece: the SparkILoop
interp = _interp
// build the classpath
val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)
// assemble the interpreter's startup arguments
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", jars
) ++ args.toList
// generic Scala runner settings, plus any user-supplied arguments
val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)
if (!hasErrors) {
// start the Scala REPL
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
Option(sparkContext).foreach(_.stop)
}
}
// create the SparkSession and SparkContext
def createSparkSession(): SparkSession = {
………………
}
…………
}
The source shows that starting spark-shell boils down to creating a SparkILoop, giving it the classpath Spark needs, and calling its process() method. Stripped of the jar handling, the essence is sketched below; after that we turn to SparkILoop itself.
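A minimal, illustrative sketch of what doMain effectively does (flag choices simplified: the real Main also passes -Yrepl-outdir and -classpath; -usejavacp is a simpler stand-in here):

import scala.tools.nsc.GenericRunnerSettings
import org.apache.spark.repl.SparkILoop

object EmbedSparkRepl extends App {
  val settings = new GenericRunnerSettings(msg => sys.error(msg))
  settings.processArguments(List("-Yrepl-class-based", "-usejavacp"), processAll = true)

  new SparkILoop().process(settings)   // blocks in the read-eval-print loop until :quit
}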
/**
* A Spark-specific interactive shell.
*/
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
extends ILoop(in0, out) {
// SparkILoop mainly extends Scala's ILoop; it takes an input reader and an output writer, defaulting to the console for both
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
// code executed to initialize Spark
val initializationCommands: Seq[String] = Seq(
"""
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
org.apache.spark.repl.Main.sparkSession
} else {
org.apache.spark.repl.Main.createSparkSession()
}
@transient val sc = {
val _sc = spark.sparkContext
if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
println(
s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
} else {
println(s"Spark Context Web UI is available at Spark Master Public URL")
}
} else {
_sc.uiWebUrl.foreach {
webUrl => println(s"Spark context Web UI available at ${webUrl}")
}
}
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
_sc
}
""",
"import org.apache.spark.SparkContext._",
"import spark.implicits._",
"import spark.sql",
"import org.apache.spark.sql.functions._"
)
// run the snippets in initializationCommands one by one via command()
def initializeSpark() {
intp.beQuietDuring {
savingReplayStack { // remove the commands from session history.
initializationCommands.foreach(command)
}
}
}
/** Print a welcome message */
// the banner printed when Spark starts
override def printWelcome() {
import org.apache.spark.SPARK_VERSION
echo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
versionString, javaVmName, javaVersion)
echo(welcomeMsg)
echo("Type in expressions to have them evaluated.")
echo("Type :help for more information.")
}
/** Available commands */
// commands available in the shell; Spark reuses Scala's standard commands such as :help, :edit and :imports
override def commands: List[LoopCommand] = standardCommands
/**
* We override `createInterpreter` because we need to initialize Spark *before* the REPL
* sees any files, so that the Spark context is visible in those files. This is a bit of a
* hack, but there isn't another hook available to us at this point.
*/
// override ILoop.createInterpreter so that initializeSpark() runs right after the interpreter is created
override def createInterpreter(): Unit = {
super.createInterpreter()
initializeSpark()
}
override def resetCommand(line: String): Unit = {
super.resetCommand(line)
initializeSpark()
echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
}
override def replay(): Unit = {
initializeSpark()
super.replay()
}
}
// companion object defining a static-style run(code, sets) helper that feeds one piece of code through a SparkILoop
object SparkILoop {
/**
* Creates an interpreter loop with default settings and feeds
* the given code to it as input.
*/
def run(code: String, sets: Settings = new Settings): String = {
import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
stringFromStream { ostream =>
Console.withOut(ostream) {
val input = new BufferedReader(new StringReader(code))
val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
val repl = new SparkILoop(input, output)
if (sets.classpath.isDefault) {
sets.classpath.value = sys.props("java.class.path")
}
// equivalent to repl.process(sets)
repl process sets
}
}
}
def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
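Assuming a master is available to the JVM (for instance via -Dspark.master=local[*], which is how one would typically run it outside spark-submit), the helper could be used roughly like this illustrative sketch:

// Illustrative only: feed one snippet to a fresh SparkILoop and capture everything it printed.
val transcript = org.apache.spark.repl.SparkILoop.run(
  """
    |val n = sc.parallelize(1 to 10).count()
    |println(s"counted $n elements")
  """.stripMargin)
println(transcript)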
From the SparkILoop source we can see that its main job is to extend ILoop and define the code that runs when the loop is initialized. This code lives in initializationCommands and is mostly concerned with setting up the Spark runtime.
It calls org.apache.spark.repl.Main.createSparkSession() to create the SparkSession and SparkContext, and binds the SparkContext to the variable sc, so both can be used directly inside spark-shell. Because Scala 2.12 provides a default Interpreter implementation (ILoopInterpreter), createInterpreter() in this source simply delegates to the parent class; in earlier versions it created Spark's own SparkILoopInterpreter instead.
The initialization code also imports the implicit conversions and Spark SQL helpers that are commonly used in the shell, which is why snippets like the ones below work out of the box.
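For example, all of the following work in spark-shell with no extra imports, purely because of those injected bindings (the data is made up):

// sc and spark are the bindings created by initializationCommands
println(sc.parallelize(1 to 100).filter(_ % 2 == 0).count())   // 50

val df = Seq((1, "spark"), (2, "repl")).toDF("id", "name")     // toDF comes from spark.implicits._
df.select(upper($"name")).show()                               // $"..." and upper come from the implicits and functions._
sql("SELECT 1 AS one").show()                                  // sql is imported from the spark session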
3. Summary
- spark-shell actually uses spark-submit to run org.apache.spark.repl.Main.
- That Main essentially just builds a SparkILoop and calls its process() method.
- SparkILoop mainly extends Scala's ILoop and specifies the code to run at initialization, most of which creates the SparkContext.
- Scala's ILoop provides a read-eval-print loop around the Interpreter class.
- The next step is to wire SparkILoop's input and output streams up to HTTP, so a web page can talk to Spark directly; a small sketch of the stream wiring follows this list.
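As a starting point for that idea, here is a minimal sketch (names are illustrative, not from the article's source) that drives an ILoop with in-memory streams instead of the console; an HTTP handler could substitute request and response bodies for the StringReader and StringWriter:

import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.ILoop

object ReplOverStreams extends App {
  val input  = new BufferedReader(new StringReader("val x = 21 * 2\n:quit\n"))
  val buffer = new StringWriter()
  val output = new PrintWriter(buffer, true)

  val settings = new Settings
  settings.usejavacp.value = true              // reuse the launching JVM's classpath

  new ILoop(input, output).process(settings)   // returns once the canned input is exhausted

  println(buffer.toString)                     // everything the REPL wrote, ready to hand to an HTTP response
}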
Example
import java.io.{BufferedOutputStream, File, FileOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.repl.SparkILoop
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{SparkUtils, Utils}
import org.slf4j.LoggerFactory
import scala.concurrent.Future
import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.{ILoop, Results}

/**
 * @Classname SparkILoopStream
 * @Description TODO
 * @Date 2020/11/17 17:11
 * @Created by limeng
 */
object SparkILoopStream {
  protected lazy implicit val logger = LoggerFactory.getLogger(getClass)
  var sparkILoop: ILoop = _

  def main(args: Array[String]): Unit = {
    val execUri = System.getenv("SPARK_EXECUTOR_URI")
    val conf = new SparkConf()
    val outputDir = createOutputDir(conf)
    val sparkJars = conf.getOption("spark.jars")
    val master = conf.getOption("spark.master").getOrElse("yarn")
    conf.setMaster(master)
    val jars = if (conf.get("spark.master").contains("yarn")) {
      val yarnJars = conf.getOption("spark.yarn.dist.jars")
      SparkUtils.unionFileLists(sparkJars, yarnJars).toSeq
    } else {
      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    }
    if (outputDir != null) {
      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
    }
    logger.info("main jars :" + jars.mkString(";"))
    if (jars.nonEmpty) conf.setJars(jars)
    if (execUri != null) conf.set("spark.executor.uri", execUri)
    if (System.getenv("SPARK_HOME") != null) conf.setSparkHome(System.getenv("SPARK_HOME"))
    conf.set("spark.scheduler.mode", "FAIR")

    initSparkILoop(conf, outputDir)

    Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)
    val sparkSession = SparkSession.builder().config(conf).appName("SparkILoopStream").getOrCreate()
    val sparkContext = sparkSession.sparkContext
    sparkContext.setJobGroup("test-lm-1", "print(\"test\")", true)

    logger.info("sparkILoop init start -")
    if (sparkILoop.intp != null && sparkILoop.isInitializeComplete) {
      logger.info("sparkILoop init run -")
      sparkILoop.beSilentDuring {
        sparkILoop.processLine(":silent")
        sparkILoop.processLine("import org.apache.spark.SparkContext")
        sparkILoop.processLine("import org.apache.spark.sql.SparkSession")
        sparkILoop.bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
        sparkILoop.bind("spark", "org.apache.spark.sql.SparkSession", sparkSession, List("""@transient"""))
        sparkILoop.processLine("import spark.implicits._")
        sparkILoop.processLine("implicit val sparkSession = spark")
      }
      try {
        sparkILoop.interpret("print(\"test lm hello\")") match {
          case Results.Success => logger.info("success code.")
          case Results.Incomplete => logger.info("incomplete code.")
          case Results.Error => logger.info("error code.")
        }
      } catch {
        case e => e.printStackTrace()
      }
    }
    sparkContext.clearJobGroup()
    if (!sparkContext.isStopped) {
      sparkContext.cancelAllJobs
    }
  }

  def initSparkILoop(conf: SparkConf, outputDir: File) = {
    val settings = new GenericRunnerSettings(error(_))
    val sparkJars = conf.getOption("spark.jars")
    val jars = if (conf.get("spark.master").contains("yarn")) {
      val yarnJars = conf.getOption("spark.yarn.dist.jars")
      SparkUtils.unionFileLists(sparkJars, yarnJars)
    } else {
      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    }
    logger.info("jars : " + jars.mkString(";"))
    val classpathJars = System.getProperty("java.class.path").split(":").filter(_.endsWith(".jar"))
    val classpath = jars.mkString(File.pathSeparator) + File.pathSeparator + classpathJars.mkString(File.pathSeparator)
    logger.info("classPath : " + classpath)
    settings.processArguments(List("-Yrepl-class-based",
      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", classpath), true)
    // resolve classes from java.class.path
    settings.usejavacp.value = true
    // use the current thread's context class loader
    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
    sparkILoop = new ILoop()
    val bool = sparkILoop.process(settings)
    logger.info("sparkILoop process :" + bool)
    // println("spark repl has been finished, now stop it.")
  }

  def createOutputDir(conf: SparkConf): File = {
    val rootDir = "/tmp"
    SparkUtils.createTempDir(root = rootDir, namePrefix = "repl")
  }

  def error(message: => String): Unit = {
    logger.error(message.toString)
  }
}