版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77449938
注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。书中附录C的内容都在本文呈现。
Jetty简介
Jetty是一个开源的,以Java作为开发语言的servlet容器。它的API以一组JAR包的形式发布。Jetty容器可以实例化成一个对象,因而迅速为一些独立运行的Java应用提供网络和web服务。要为Jetty创建servlet,就涉及ServletContextHandler的API使用。示例代码如下:
class HelloServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private String msg = "Hello World!";
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.setContentType("text/html");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println("<h1>" + msg + "</h1>");
response.getWriter().println("session=" + request.getSession(true).getId());
}
}
/**
 * Starts an embedded Jetty server on port 8080 that serves {@code HelloServlet}
 * at http://localhost:8080/hello, then blocks until the server thread ends.
 *
 * @param args unused
 * @throws Exception if the server fails to start or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Server server = new Server(8080);

    // HelloServlet calls request.getSession(true). A context built with the no-arg
    // constructor has no session handler, so that call would fail with an
    // IllegalStateException; request session support explicitly.
    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
    context.setContextPath("/");
    server.setHandler(context);

    // http://localhost:8080/hello
    context.addServlet(new ServletHolder(new HelloServlet()), "/hello");

    server.start();
    server.join();
}
如果想更深入了解Jetty,请访问官网
http://www.eclipse.org/jetty/
JettyUtils
JettyUtils是Spark对于Jetty相关API的又一层封装,这里对其中一些主要类型和方法进行介绍。
ServerInfo
功能描述:提供给Jetty服务器添加或移除ContextHandler,以及停止Jetty服务器的实现。
/**
 * Handle to a Jetty server that lets callers attach or detach context handlers and shut
 * the server down.
 *
 * @param server      the underlying Jetty server
 * @param boundPort   the port recorded by the creator of this instance as the bound port
 * @param securePort  the secure port, if one was recorded
 * @param rootHandler the collection that handlers are added to and removed from
 */
private[spark] case class ServerInfo(
    server: Server,
    boundPort: Int,
    securePort: Option[Int],
    private val rootHandler: ContextHandlerCollection) {

  /**
   * Assigns the virtual host "@" + `JettyUtils.SPARK_CONNECTOR_NAME` to `handler`, adds it
   * to the root collection and starts it if it is not already started.
   */
  def addHandler(handler: ContextHandler): Unit = {
    handler.setVirtualHosts(Array("@" + JettyUtils.SPARK_CONNECTOR_NAME))
    rootHandler.addHandler(handler)
    if (!handler.isStarted()) {
      handler.start()
    }
  }

  /** Removes `handler` from the root collection and stops it if it is started. */
  def removeHandler(handler: ContextHandler): Unit = {
    rootHandler.removeHandler(handler)
    if (handler.isStarted) {
      handler.stop()
    }
  }

  /**
   * Stops the server and then its thread pool.
   *
   * The pool is stopped in a `finally` block so that its threads are released even when
   * `server.stop()` throws; the exception still propagates to the caller.
   */
  def stop(): Unit = {
    try {
      server.stop()
    } finally {
      // Stop the ThreadPool if it supports stop() method (through LifeCycle).
      // It is needed because stopping the Server won't stop the ThreadPool it uses.
      server.getThreadPool match {
        case lifeCycle: LifeCycle => lifeCycle.stop()
        case _ => // null, or a pool that is not a LifeCycle: nothing to stop
      }
    }
  }
}
createServlet
功能描述:创建javax.servlet.http.HttpServlet的匿名内部类实例。此实例处理请求实际是使用servletParams的responder:Responder,此Responder类型发生隐式转换,会转换为用户传入的函数参数。
/**
 * Builds an anonymous `HttpServlet` whose GET handler renders the result of
 * `servletParams.responder`.
 *
 * A GET is answered only when `securityMgr.checkUIViewPermissions` accepts the remote user;
 * otherwise a 401 error is sent. An `IllegalArgumentException` raised while handling the
 * request is reported as a 400 error; any other exception is logged and rethrown.
 * TRACE requests are always rejected with 405.
 *
 * @param servletParams supplies the content type, the responder and the function that
 *                      turns the responder's result into the value written to the response
 * @param securityMgr   used to check whether the remote user may view the page
 * @param conf          read for "spark.ui.allowFramingFrom" to build the X-Frame-Options value
 * @return the servlet
 */
def createServlet[T <% AnyRef](
    servletParams: ServletParams[T],
    securityMgr: SecurityManager,
    conf: SparkConf): HttpServlet = {
  // Framing is limited to the same origin unless an explicit origin is configured.
  val frameOptions = conf.getOption("spark.ui.allowFramingFrom") match {
    case Some(uri) => s"ALLOW-FROM $uri"
    case None => "SAMEORIGIN"
  }

  new HttpServlet {
    override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
      try {
        val authorized = securityMgr.checkUIViewPermissions(request.getRemoteUser)
        if (!authorized) {
          response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
            "User is not authorized to access this page.")
        } else {
          response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
          response.setStatus(HttpServletResponse.SC_OK)
          // The responder runs before the headers below are set and before anything is
          // written to the response.
          val payload = servletParams.responder(request)
          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
          response.setHeader("X-Frame-Options", frameOptions)
          response.getWriter.print(servletParams.extractFn(payload))
        }
      } catch {
        case badRequest: IllegalArgumentException =>
          response.sendError(HttpServletResponse.SC_BAD_REQUEST, badRequest.getMessage)
        case failure: Exception =>
          logWarning(s"GET ${request.getRequestURI} failed: $failure", failure)
          throw failure
      }
    }

    // SPARK-5983 ensure TRACE is not supported
    protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
      res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
    }
  }
}
createServletHandler
功能描述:创建以给定路径为前缀的请求的ServletContextHandler。处理步骤如下:
1) 调用createServlet,生成javax.servlet.http.HttpServlet的匿名内部类实例。此实例处理请求实际是使用servletParams的responder:Responder,此Responder类型发生隐式转换,会转换为用户传入的函数参数。
2) 调用重载的createServletHandler方法,生成org.eclipse.jetty.servlet.ServletHolder,并最终生成ServletContextHandler。
createServletHandler的实现如下。
/**
 * Creates a servlet from `servletParams` via `createServlet` and wraps it in a context
 * handler for the given path by delegating to the servlet-based overload.
 *
 * @param path          path the handler is created for
 * @param servletParams parameters passed to `createServlet`
 * @param securityMgr   security manager passed to `createServlet`
 * @param conf          configuration passed to `createServlet`
 * @param basePath      prefix passed on to the servlet-based overload; defaults to ""
 * @return the context handler produced by the servlet-based overload
 */
def createServletHandler[T <% AnyRef](
    path: String,
    servletParams: ServletParams[T],
    securityMgr: SecurityManager,
    conf: SparkConf,
    basePath: String = ""): ServletContextHandler = {
  val servlet = createServlet(servletParams, securityMgr, conf)
  createServletHandler(path, servlet, basePath)
}
/**
 * Create a context handler that responds to a request with the given path prefix.
 *
 * The context path is `basePath + path` with one trailing "/" removed, except when
 * `basePath` is empty and `path` is exactly "/", in which case "/" is used as is.
 *
 * @param path     path of the servlet below `basePath`
 * @param servlet  servlet mounted at "/" inside the new context
 * @param basePath prefix placed in front of `path`
 * @return a new context handler serving `servlet`
 */
def createServletHandler(
    path: String,
    servlet: HttpServlet,
    basePath: String): ServletContextHandler = {
  val isBareRoot = basePath == "" && path == "/"
  val contextPath = if (isBareRoot) path else (basePath + path).stripSuffix("/")

  val handler = new ServletContextHandler
  handler.setContextPath(contextPath)
  handler.addServlet(new ServletHolder(servlet), "/")
  handler
}
createStaticHandler
功能描述:创建对静态目录提供文件服务的ServletContextHandler。
/**
 * Creates a context handler that serves files through a `DefaultServlet` whose
 * "resourceBase" is the location of `resourceBase` as resolved by the Spark class loader.
 *
 * @param resourceBase resource name looked up with `Utils.getSparkClassLoader.getResource`
 * @param path         context path of the returned handler
 * @return a context handler with the default servlet mounted at "/"
 * @throws Exception if the class loader cannot find `resourceBase`
 */
def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
  val handler = new ServletContextHandler
  handler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
  val servletHolder = new ServletHolder(new DefaultServlet)

  // getResource returns null for an unknown resource; turn that into an explicit failure.
  val resourceUrl = Option(Utils.getSparkClassLoader.getResource(resourceBase)).getOrElse {
    throw new Exception("Could not find resource path for Web UI: " + resourceBase)
  }
  servletHolder.setInitParameter("resourceBase", resourceUrl.toString)

  handler.setContextPath(path)
  handler.addServlet(servletHolder, "/")
  handler
}
createRedirectHandler
功能描述:创建将用户对源路径的请求总是重定向到目标路径的ServletContextHandler。
/**
 * Creates a context handler for `srcPath` that redirects GET and/or POST requests to
 * `basePath + destPath`, resolved against the URL of the incoming request.
 *
 * @param srcPath        path the redirecting handler is created for
 * @param destPath       redirect target, appended to `basePath`
 * @param beforeRedirect callback invoked with the request just before the redirect is sent
 * @param basePath       prefix used for both the handler's path and the redirect target
 * @param httpMethods    methods (out of "GET" and "POST") that are redirected; a GET or POST
 *                       not in this set is answered with 405
 * @return the context handler produced by `createServletHandler`
 */
def createRedirectHandler(
    srcPath: String,
    destPath: String,
    beforeRedirect: HttpServletRequest => Unit = x => (),
    basePath: String = "",
    httpMethods: Set[String] = Set("GET")): ServletContextHandler = {
  val target = basePath + destPath

  val redirectServlet = new HttpServlet {
    override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit =
      redirectIfAllowed("GET", request, response)

    override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit =
      redirectIfAllowed("POST", request, response)

    /** Redirects when `method` is enabled in `httpMethods`, otherwise answers 405. */
    private def redirectIfAllowed(
        method: String,
        request: HttpServletRequest,
        response: HttpServletResponse): Unit = {
      if (!httpMethods.contains(method)) {
        response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
      } else {
        beforeRedirect(request)
        // Make sure we don't end up with "//" in the middle
        val requestUrl = new URL(request.getRequestURL.toString)
        response.sendRedirect(new URL(requestUrl, target).toString)
      }
    }

    // SPARK-5983 ensure TRACE is not supported
    protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
      res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
    }
  }

  createServletHandler(srcPath, redirectServlet, basePath)
}
startJettyServer
功能描述:创建Jetty服务器,绑定到给定的主机和端口并启动,由给定的handlers处理请求,最后返回ServerInfo。处理步骤如下:
1) 将SparkUI中的全部handler加入ContextHandlerCollection。
2) 如果使用配置spark.ui.filters指定了filter,则给所有handler增加filter。
3) 调用Utils的方法startServiceOnPort,最终回调函数connect。
startJettyServer的实现如下。
/**
 * Builds a Jetty server for the given handlers and starts it through
 * `Utils.startServiceOnPort`, which receives the requested port and the local `connect`
 * function.
 *
 * Every handler is first passed to `addFilters`, then given the virtual host
 * "@" + `SPARK_CONNECTOR_NAME` and wrapped in a `GzipHandler`; the wrappers are what end up
 * in the server's `ContextHandlerCollection`.
 *
 * @param hostName   host set on every connector
 * @param port       port handed to `Utils.startServiceOnPort`
 * @param sslOptions asked for a Jetty SSL context factory; when it yields one, an
 *                   additional HTTPS connector and an HTTP-to-HTTPS redirect handler are set up
 * @param handlers   servlet context handlers to serve
 * @param conf       configuration passed to `addFilters` and `Utils.startServiceOnPort`
 * @param serverName used as the thread pool name when non-empty, as the prefix of the
 *                   scheduler name, and passed to `Utils.startServiceOnPort`
 * @return a `ServerInfo` built from the values returned by `Utils.startServiceOnPort` and
 *         the server's root handler collection
 */
def startJettyServer(
hostName: String,
port: Int,
sslOptions: SSLOptions,
handlers: Seq[ServletContextHandler],
conf: SparkConf,
serverName: String = ""): ServerInfo = {
addFilters(handlers, conf)
// Tag each handler with the "@" + SPARK_CONNECTOR_NAME virtual host and wrap it in a
// GzipHandler. Presumably the "@name" form ties a context to the connector of that
// name (see the "where all contexts bind" comment below) -- confirm against Jetty docs.
val gzipHandlers = handlers.map { h =>
h.setVirtualHosts(Array("@" + SPARK_CONNECTOR_NAME))
val gzipHandler = new GzipHandler
gzipHandler.setHandler(h)
gzipHandler
}
// Bind to the given port, or throw a java.net.BindException if the port is occupied
// Returns ((server, local port of the HTTPS connector if one exists), local HTTP port).
def connect(currentPort: Int): ((Server, Option[Int]), Int) = {
val pool = new QueuedThreadPool
if (serverName.nonEmpty) {
pool.setName(serverName)
}
pool.setDaemon(true)
val server = new Server(pool)
val connectors = new ArrayBuffer[ServerConnector]()
val collection = new ContextHandlerCollection
// Create a connector on port currentPort to listen for HTTP requests
val httpConnector = new ServerConnector(
server,
null,
// Call this full constructor to set this, which forces daemon threads:
new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true),
null,
-1,
-1,
new HttpConnectionFactory())
httpConnector.setPort(currentPort)
connectors += httpConnector
val httpsConnector = sslOptions.createJettySslContextFactory() match {
case Some(factory) =>
// If the new port wraps around, do not try a privileged port.
// The secure port is the HTTP port plus 400, wrapped into the range starting at 1024;
// port 0 is passed through unchanged.
// NOTE(review): for a non-zero currentPort below 624 the dividend is negative, so the
// result lands below 1024 (e.g. 80 -> 480) -- confirm such ports are never used here.
val securePort =
if (currentPort != 0) {
(currentPort + 400 - 1024) % (65536 - 1024) + 1024
} else {
0
}
val scheme = "https"
// Create a connector on port securePort to listen for HTTPS requests
val connector = new ServerConnector(server, factory)
connector.setPort(securePort)
connector.setName(SPARK_CONNECTOR_NAME)
connectors += connector
// redirect the HTTP requests to HTTPS port
httpConnector.setName(REDIRECT_CONNECTOR_NAME)
collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
Some(connector)
case None =>
// No SSL, so the HTTP connector becomes the official one where all contexts bind.
httpConnector.setName(SPARK_CONNECTOR_NAME)
None
}
// As each acceptor and each selector will use one thread, the number of threads should at
// least be the number of acceptors and selectors plus 1. (See SPARK-13776)
var minThreads = 1
connectors.foreach { connector =>
// Currently we only use "SelectChannelConnector"
// Limit the max acceptor number to 8 so that we don't waste a lot of threads
// NOTE(review): the call below sets the accept *queue size*, not the number of
// acceptors, so it does not do what the comment above says -- confirm the intent.
connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
connector.setHost(hostName)
// The number of selectors always equals to the number of acceptors
minThreads += connector.getAcceptors * 2
}
// Only ever raises the pool's maximum; a larger existing maximum is kept.
pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))
val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(server)
server.addBean(errorHandler)
// The redirect handler (if any) was added earlier, so it precedes the gzip-wrapped handlers.
gzipHandlers.foreach(collection.addHandler)
server.setHandler(collection)
server.setConnectors(connectors.toArray)
try {
server.start()
((server, httpsConnector.map(_.getLocalPort())), httpConnector.getLocalPort)
} catch {
// Startup failed: stop the server and its pool, then rethrow the original exception.
case e: Exception =>
server.stop()
pool.stop()
throw e
}
}
val ((server, securePort), boundPort) = Utils.startServiceOnPort(port, connect, conf,
serverName)
// The cast relies on connect() having installed the ContextHandlerCollection as the
// server's handler.
ServerInfo(server, boundPort, securePort,
server.getHandler().asInstanceOf[ContextHandlerCollection])
}
关于《Spark内核设计的艺术 架构设计与实现》
经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
电子版售卖链接如下: