开发者社区> 泰山不老生> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

附录C Jetty与JettyUtils

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77449938 注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。
+关注继续查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/77449938

注:本文是为了配合《Spark内核设计的艺术 架构设计与实现》一书的内容而编写,目的是为了节省成本、方便读者查阅。书中附录C的内容都在本文呈现。

Jetty简介

Jetty是一个开源的,以Java作为开发语言的servlet容器。它的API以一组JAR包的形式发布。Jetty容器可以实例化成一个对象,因而迅速为一些独立运行的Java应用提供网络和web服务。要为Jetty创建servlet,就涉及ServletContextHandler的API使用。示例代码如下:

class HelloServlet extends HttpServlet {  
  private static final long serialVersionUID = 1L;  
  private String msg = "Hello World!";  

  protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {  
    response.setContentType("text/html");  
    response.setStatus(HttpServletResponse.SC_OK);  
    response.getWriter().println("<h1>" + msg + "</h1>");  
    response.getWriter().println("session=" + request.getSession(true).getId());  
  }  
}
public static void main(String[] args) throws Exception {  
  Server server = new Server(8080);  
  ServletContextHandler context = new ServletContextHandler();  
  context.setContextPath("/");  
  server.setHandler(context);  
  // http://localhost:8080/hello  
  context.addServlet(new ServletHolder(new HelloServlet()), "/hello");
  server.start();  
  server.join();
}
如果想更深入了解Jetty,请访问官网http://www.eclipse.org/jetty/

JettyUtils

JettyUtils是Spark对于Jetty相关API的又一层封装,这里对其中一些主要类型和方法进行介绍。

ServerInfo

功能描述:提供给Jetty服务器添加或移除ContextHandler,以及停止Jetty服务器的实现。

private[spark] case class ServerInfo(
    server: Server,
    boundPort: Int,
    securePort: Option[Int],
    private val rootHandler: ContextHandlerCollection) {

  def addHandler(handler: ContextHandler): Unit = {
    handler.setVirtualHosts(Array("@" + JettyUtils.SPARK_CONNECTOR_NAME))
    rootHandler.addHandler(handler)
    if (!handler.isStarted()) {
      handler.start()
    }
  }

  def removeHandler(handler: ContextHandler): Unit = {
    rootHandler.removeHandler(handler)
    if (handler.isStarted) {
      handler.stop()
    }
  }

  def stop(): Unit = {
    server.stop()
    // Stop the ThreadPool if it supports stop() method (through LifeCycle).
    // It is needed because stopping the Server won't stop the ThreadPool it uses.
    val threadPool = server.getThreadPool
    if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
      threadPool.asInstanceOf[LifeCycle].stop
    }
  }
}

createServlet

功能描述:创建javax.servlet.http.HttpServlet的匿名内部类实例。此实例处理请求实际是使用servletParams的responder:Responder,此Responder类型发生隐式转换,会转换为用户传入的函数参数。
  def createServlet[T <% AnyRef](
      servletParams: ServletParams[T],
      securityMgr: SecurityManager,
      conf: SparkConf): HttpServlet = {

    val allowFramingFrom = conf.getOption("spark.ui.allowFramingFrom")
    val xFrameOptionsValue =
      allowFramingFrom.map(uri => s"ALLOW-FROM $uri").getOrElse("SAMEORIGIN")

    new HttpServlet {
      override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
        try {
          if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
            response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
            response.setStatus(HttpServletResponse.SC_OK)
            val result = servletParams.responder(request)
            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
            response.setHeader("X-Frame-Options", xFrameOptionsValue)
            response.getWriter.print(servletParams.extractFn(result))
          } else {
            response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
              "User is not authorized to access this page.")
          }
        } catch {
          case e: IllegalArgumentException =>
            response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
          case e: Exception =>
            logWarning(s"GET ${request.getRequestURI} failed: $e", e)
            throw e
        }
      }
      // SPARK-5983 ensure TRACE is not supported
      protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
        res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
      }
    }
  }

createServletHandler

功能描述:创建以给定路径为前缀的请求的ServletContextHandler。处理步骤如下:

1) 调用createServlet,生成javax.servlet.http.HttpServlet的匿名内部类实例。此实例处理请求实际是使用servletParams的responder:Responder,此Responder类型发生隐式转换,会转换为用户传入的函数参数。

2)调用重载的createServletHandler方法,生成org.eclipse.jetty.servlet.ServletHolder ,并最终生成ServletContextHandler。

createServletHandler的实现如下。
  def createServletHandler[T <% AnyRef](
      path: String,
      servletParams: ServletParams[T],
      securityMgr: SecurityManager,
      conf: SparkConf,
      basePath: String = ""): ServletContextHandler = {
    createServletHandler(path, createServlet(servletParams, securityMgr, conf), basePath)
  }

  /** Create a context handler that responds to a request with the given path prefix */
  def createServletHandler(
      path: String,
      servlet: HttpServlet,
      basePath: String): ServletContextHandler = {
    val prefixedPath = if (basePath == "" && path == "/") {
      path
    } else {
      (basePath + path).stripSuffix("/")
    }
    val contextHandler = new ServletContextHandler
    val holder = new ServletHolder(servlet)
    contextHandler.setContextPath(prefixedPath)
    contextHandler.addServlet(holder, "/")
    contextHandler
  }

createStaticHandler

功能描述:创建对静态目录提供文件服务的ServletContextHandler。
  def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
    val contextHandler = new ServletContextHandler
    contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
    val staticHandler = new DefaultServlet
    val holder = new ServletHolder(staticHandler)
    Option(Utils.getSparkClassLoader.getResource(resourceBase)) match {
      case Some(res) =>
        holder.setInitParameter("resourceBase", res.toString)
      case None =>
        throw new Exception("Could not find resource path for Web UI: " + resourceBase)
    }
    contextHandler.setContextPath(path)
    contextHandler.addServlet(holder, "/")
    contextHandler
  }

createRedirectHandler

功能描述:创建将用户对源路径的请求总是重定向到目标路径的ServletContextHandler。
  def createRedirectHandler(
      srcPath: String,
      destPath: String,
      beforeRedirect: HttpServletRequest => Unit = x => (),
      basePath: String = "",
      httpMethods: Set[String] = Set("GET")): ServletContextHandler = {
    val prefixedDestPath = basePath + destPath
    val servlet = new HttpServlet {
      override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = {
        if (httpMethods.contains("GET")) {
          doRequest(request, response)
        } else {
          response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
        }
      }
      override def doPost(request: HttpServletRequest, response: HttpServletResponse): Unit = {
        if (httpMethods.contains("POST")) {
          doRequest(request, response)
        } else {
          response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
        }
      }
      private def doRequest(request: HttpServletRequest, response: HttpServletResponse): Unit = {
        beforeRedirect(request)
        // Make sure we don't end up with "//" in the middle
        val newUrl = new URL(new URL(request.getRequestURL.toString), prefixedDestPath).toString
        response.sendRedirect(newUrl)
      }
      // SPARK-5983 ensure TRACE is not supported
      protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
        res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
      }
    }
    createServletHandler(srcPath, servlet, basePath)
  }

startJettyServer

功能描述:创建以给定路径为前缀的请求的响应处理。处理步骤如下:

1) 将SparkUI中的全部handler加入ContextHandlerCollection。

2) 如果使用配置spark.ui.filters指定了filter,则给所有handler增加filter。

3) 调用Utils的方法startServiceOnPort,最终回调函数connect。

startJettyServer的实现如下。
  def startJettyServer(
      hostName: String,
      port: Int,
      sslOptions: SSLOptions,
      handlers: Seq[ServletContextHandler],
      conf: SparkConf,
      serverName: String = ""): ServerInfo = {

    addFilters(handlers, conf)

    val gzipHandlers = handlers.map { h =>
      h.setVirtualHosts(Array("@" + SPARK_CONNECTOR_NAME))

      val gzipHandler = new GzipHandler
      gzipHandler.setHandler(h)
      gzipHandler
    }

    // Bind to the given port, or throw a java.net.BindException if the port is occupied
    def connect(currentPort: Int): ((Server, Option[Int]), Int) = {
      val pool = new QueuedThreadPool
      if (serverName.nonEmpty) {
        pool.setName(serverName)
      }
      pool.setDaemon(true)

      val server = new Server(pool)
      val connectors = new ArrayBuffer[ServerConnector]()
      val collection = new ContextHandlerCollection

      // Create a connector on port currentPort to listen for HTTP requests
      val httpConnector = new ServerConnector(
        server,
        null,
        // Call this full constructor to set this, which forces daemon threads:
        new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true),
        null,
        -1,
        -1,
        new HttpConnectionFactory())
      httpConnector.setPort(currentPort)
      connectors += httpConnector

      val httpsConnector = sslOptions.createJettySslContextFactory() match {
        case Some(factory) =>
          // If the new port wraps around, do not try a privileged port.
          val securePort =
            if (currentPort != 0) {
              (currentPort + 400 - 1024) % (65536 - 1024) + 1024
            } else {
              0
            }
          val scheme = "https"
          // Create a connector on port securePort to listen for HTTPS requests
          val connector = new ServerConnector(server, factory)
          connector.setPort(securePort)
          connector.setName(SPARK_CONNECTOR_NAME)
          connectors += connector

          // redirect the HTTP requests to HTTPS port
          httpConnector.setName(REDIRECT_CONNECTOR_NAME)
          collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
          Some(connector)

        case None =>
          // No SSL, so the HTTP connector becomes the official one where all contexts bind.
          httpConnector.setName(SPARK_CONNECTOR_NAME)
          None
      }

      // As each acceptor and each selector will use one thread, the number of threads should at
      // least be the number of acceptors and selectors plus 1. (See SPARK-13776)
      var minThreads = 1
      connectors.foreach { connector =>
        // Currently we only use "SelectChannelConnector"
        // Limit the max acceptor number to 8 so that we don't waste a lot of threads
        connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
        connector.setHost(hostName)
        // The number of selectors always equals to the number of acceptors
        minThreads += connector.getAcceptors * 2
      }
      pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))

      val errorHandler = new ErrorHandler()
      errorHandler.setShowStacks(true)
      errorHandler.setServer(server)
      server.addBean(errorHandler)

      gzipHandlers.foreach(collection.addHandler)
      server.setHandler(collection)

      server.setConnectors(connectors.toArray)
      try {
        server.start()
        ((server, httpsConnector.map(_.getLocalPort())), httpConnector.getLocalPort)
      } catch {
        case e: Exception =>
          server.stop()
          pool.stop()
          throw e
      }
    }

    val ((server, securePort), boundPort) = Utils.startServiceOnPort(port, connect, conf,
      serverName)
    ServerInfo(server, boundPort, securePort,
      server.getHandler().asInstanceOf[ContextHandlerCollection])
  }


关于Spark内核设计的艺术 架构设计与实现

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:

电子版售卖链接如下:

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
文章
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载