「布道师系列文章」众安保险王凯解析 Kafka 网络通信

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文由众安保险基础平台 Java 开发专家王凯解析 Kafka 网络通信流程,重点关注请求处理和网络通信模型。文中介绍了生产者与消费者与消息队列的交互,以及服务器端的处理步骤,包括 Acceptor、Processor 和 RequestHandler 的工作原理。此外,还讨论了 Kafka 的线程模型,特别是 KafkaApis 在请求处理中的核心作用。最后,文章提到了 AutoMQ 如何通过优化线程模型和 RequestChannel 实现更高效、有序的处理。参考链接包括 Kafka 3.7、Java NIO 教程和 AutoMQ 相关资料。

作者|众安保险基础平台 Java 开发专家王凯

引言

今天给大家带来的是 Kafka 网路通信主要流程的解析(基于 Apache Kafka 3.7[2])。同时引申分析了业界当前较火的AutoMQ基于Kafka在网络通信层面的优化和提升。

01

如何构建一个基本的请求和处理响应

一个消息队列涉及的网络通信主要有两块:

  • 消息生产者与消息队列服务器之间(Kafka 中是生产者向队列「推」消息)

  • 消息消费者与消息队列服务器之间(Kafka 中是消费者向队列「拉」消息)

图上就是一个从发送消息到收到响应主要经过的流程。Client:1.KafkaProducer 初始化 Sender 线程2.Sender 线程从 RecordAccumulator 中获取攒批好的数据(这里详细的客户端发送可以看 https://mp.weixin.qq.com/s/J2\_O1l81duknfdFvHuBWxw)3.Sender 线程调用 NetworkClient 检查连接(未 ready 需要 initiateConnect)4.Sender 线程调用 NetworkClient 的 doSend 方法将数据写入 KafkaChannel5.Sender 线程调用 NetworkClient 的 poll 进行实际的发送Server:1.KafkaServer 初始化 SocketServer、dataPlaneRequestProcessor(KafkaApis)、dataPlaneRequestHandlerPool2.SocketServer 初始化 RequestChannel、dataPlaneAcceptor3.dataPlaneAcceptor 负责获取连接并分配处理任务给对应的 Processor4.Processor 线程从 newConnections 的 queue 中取出任务进行处理5.Processor 线程处理准备好的 IO 事件

  • configureNewConnections() :创建新连接

  • processNewResponses():发送 Response,并将 Response 放入到 inflightResponses 临时队列

  • poll():执行 NIO poll,获取对应 SocketChannel 上准备就绪的 I/O 操作

  • processCompletedReceives():将接收到的 Request 放入 RequestChannel 队列

  • processCompletedSends():为临时 Response 队列中的 Response 执行回调逻辑

  • processDisconnected():处理因发送失败而导致的连接断开

  • closeExcessConnections():关闭超过配额限制部分的连接

6.KafkaRequestHandler 从 RequestChannel 中获取准备好的事件根据 apiKey 分配给对应的 KafkaApi 进行处理7.KafkaApi 处理完成后,把 response 放入 RequestChannel8.Processor 线程将 response 响应给 client以上就是一个完整的 kafka 发送消息,客户端和服务端的处理流程。

02

Kafka 中的网络通信
1. 服务端通信线程模型

不同于 rocketmq 中通过 netty 实现了高效的网络通信,Kafka 中的相当于通过 java NIO 实现了一个主从 Reactor 模式的网路通信(不熟悉的可以关注 https://jenkov.com/tutorials/java-nio/overview.html)。

DataPlanAcceptor 和 ControlPlanAcceptor 都是 Acceptor 的一个子类,Acceptor 又是一个实现了 Runnable 接口的线程类,Acceptor 的主要目的是监听并且接收 Client 和 Broker 之间的请求,同时建立传输通道(SocketChannel),通过轮询的方式交给一个 Processor 处理。这里还有一个 RequestChannel(ArrayBlockingQueue),用于建立 Processor 和 Handler 的连接,MainReactor(Acceptor)只负责监听 OP_ACCEPT 事件, 监听到之后把 SocketChannel 传递给 SubReactor(Processor), 每个 Processor 都有自己的 Selector,SubReactor 会监听并处理其他的事件,并最终把具体的请求传递给 KafkaRequestHandlerPool。

2. 线程模型中主要组件的初始化

从图上可以看出在 broker 启动时会调用 KafkaServer 的 startup 方法(这里我们默认还是基于 zookeeper 的模式)。startup 方法中主要创建:1.KafkaApis 的处理类:dataPlaneRequestProcessor 和 controlPlaneRequestProcessor 的创建2.KafkaRequestHandlePool:dataPlaneRequestHandlerPool 和 controlPlaneRequestHandlerPool 创建3.socketServer 的初始化4.controlPlaneAcceptorAndProcessor 和 dataPlaneAcceptorAndProcessor 的创建其实这里还有一步图上没有但是也在 startup 方法中非常重要的方法,线程启动:enableRequestProcessing 是通过初始化完成的 socketServer 进行。

3. Processor 的添加和销毁

1.添加

  • broker 启动时添加

  • 主动调整 num.network.threads 处理线程数量

2.启动

  • broker 启动 accetor 时启动 processor

  • 主动调整时启动未启动的新的处理线程

3.移除队列并销毁

  • broker 关闭

  • 主动调整 num.network.threads 处理线程数量时移除多余的线程并关闭

4.KafkaRequestHandlePool 和 KafkaRequestHandler

1.KafkaRequestHandlerPool

真正处理 Kafka 请求的地方,它是一个请求处理线程池,主要负责创建、维护、管理和销毁下辖的请求处理线程。

2.KafkaRequestHandler

真正的业务请求处理线程类,每个请求处理线程实例,负责从 SocketServer 的 RequestChannel 的请求队列中获取请求对象,并进行处理。如下是 KafkaRequestHandler 的线程处理的方法体:

 def run(): Unit = {
  threadRequestChannel.set(requestChannel)
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // 从请求队列中获取下一个待处理的请求
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        completeShutdown()
        return

      case callback: RequestChannel.CallbackRequest =>
        val originalRequest = callback.originalRequest
        try {

          // If we've already executed a callback for this request, reset the times and subtract the callback time from the 
          // new dequeue time. This will allow calculation of multiple callback times.
          // Otherwise, set dequeue time to now.
          if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
            val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
            originalRequest.callbackRequestCompleteTimeNanos = None
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos)
          } else {
            originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds())
          }

          threadCurrentRequest.set(originalRequest)
          callback.fun(requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // When handling requests, we try to complete actions after, so we should try to do so here as well.
          apis.tryCompleteActions()
          if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
            originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds())
          threadCurrentRequest.remove()
        }
     // 普通情况由KafkaApis.handle方法执行相应处理逻辑
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          threadCurrentRequest.set(request)
          apis.handle(request, requestLocal)
        } catch {
          case e: FatalExitError =>
            completeShutdown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          threadCurrentRequest.remove()
          request.releaseBuffer()
        }

      case RequestChannel.WakeupRequest => 
        // We should handle this in receiveRequest by polling callbackQueue.
        warn("Received a wakeup request outside of typical usage.")

      case null => // continue
    }
  }
  completeShutdown()
}

这里的 56 行会将任务最重分配给 KafkaApis 的 handle 进行处理。

03

统一的请求处理转发

Kafka 中主要的业务处理方法类其实是 KafkaApis,上面的所有的通信,线程处理类,最终都是为了更好的来到 KafkaApis 的 handle。

 override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  def handleError(e: Throwable): Unit = {
    error(s"Unexpected error handling request ${request.requestDesc(true)} " +
      s"with context ${request.context}", e)
    requestHelper.handleError(request, e)
  }

  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")

    if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
      // The socket server will reject APIs which are not exposed in this scope and close the connection
      // before handing them to the request handler, so this path should not be exercised in practice
      throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
    }

    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request).exceptionally(handleError)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request).exceptionally(handleError)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request).exceptionally(handleError)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request).exceptionally(handleError)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
      case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionsToTxnRequest(request, requestLocal)
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
      case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
      case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
      // Create, renew and expire DelegationTokens must first validate that the connection
      // itself is not authenticated with a delegation token before maybeForwardToController.
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
      case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
      case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
      case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
      case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
      case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
      case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
      case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
      case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
      case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
      case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
      case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
      case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
      case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
      case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => handleListClientMetricsResources(request)
      case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(e)
  } finally {
    // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
    // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
    // expiration thread for certain delayed operations (e.g. DelayedJoin)
    // Delayed fetches are also completed by ReplicaFetcherThread.
    replicaManager.tryCompleteActions()
    // The local completion time may be set while processing the request. Only record it if it's unset.
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

从上面代码可以看到负责副本管理的 ReplicaManager、维护消费者组的 GroupCoordinator ,操作 Controller 组件的 KafkaController,还有我们最常用到的 KafkaProducer.send(发送消息)和 KafkaConcumser.consume(消费消息)。

04

AutoMQ 中线程模型

1. 处理线程的优化

AutoMQ 参照 CPU 的流水线将 Kafka 的处理模型优化成流水线模式,兼顾了顺序性和高效两方面。

  • 顺序性:TCP 连接与线程绑定,对于同一个 TCP 连接有且只有一个网络线程在解析请求,并且有且只有一个 RequestHandler 线程在进行业务逻辑处理;

  • 高效:不同阶段流水线化,网络线程解析完 MSG1 后就可以立马解析 MSG2,无需等待 MSG1 持久化完成。同理 RequestHandler 对 MSG1 进行完校验 & 定序后,立马就可以开始处理 MSG2;同时为了进一步提高持久化的效率,AutoMQ 还会将数据攒批进行刷盘持久化。

2. 通道 RequestChannel 的优化

AutoMQ 将 RequestChannel 进行了多队列改造,通过多队列模式,可以做到对于相同连接的请求都被放入相同一个队列,并且只被特定的 KafkaRequestHandler 进行业务逻辑处理,保障了检验 & 定序阶段内部的顺序处理。

  • 队列和 KafkaRequestHandler 一一映射,数量保持一致;

  • Processor 解析完请求后,根据 hash(channelId) % N 来决定路由到特定的队列。

参考资料
[1]AutoMQ:https://github.com/AutoMQ/automq
[2]Kafka3.7:https://github.com/apache/kafka/releases/tag/3.7.0
[3]JAVANIO:https://jenkov.com/tutorials/java-nio/overview.html
[4]AutoMQ 线程优化:https://mp.weixin.qq.com/s/kDZJgUnMoc5K8jTuV08OJw

END

关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。

🌟 GitHub 地址:https://github.com/AutoMQ/automq
💻 官网:https://www.automq.com

目录
相关文章
|
12天前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
161 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
2月前
|
监控 安全 网络安全
深入解析PDCERF:网络安全应急响应的六阶段方法
PDCERF是网络安全应急响应的六阶段方法,涵盖准备、检测、抑制、根除、恢复和跟进。本文详细解析各阶段目标与操作步骤,并附图例,助读者理解与应用,提升组织应对安全事件的能力。
431 89
|
2天前
|
XML JavaScript Android开发
【Android】网络技术知识总结之WebView,HttpURLConnection,OKHttp,XML的pull解析方式
本文总结了Android中几种常用的网络技术,包括WebView、HttpURLConnection、OKHttp和XML的Pull解析方式。每种技术都有其独特的特点和适用场景。理解并熟练运用这些技术,可以帮助开发者构建高效、可靠的网络应用程序。通过示例代码和详细解释,本文为开发者提供了实用的参考和指导。
41 15
|
3天前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
12天前
|
缓存 边缘计算 安全
阿里云CDN:全球加速网络的实践创新与价值解析
在数字化浪潮下,用户体验成为企业竞争力的核心。阿里云CDN凭借技术创新与全球化布局,提供高效稳定的加速解决方案。其三层优化体系(智能调度、缓存策略、安全防护)确保低延迟和高命中率,覆盖2800+全球节点,支持电商、教育、游戏等行业,帮助企业节省带宽成本,提升加载速度和安全性。未来,阿里云CDN将继续引领内容分发的行业标准。
58 7
|
18天前
|
云安全 人工智能 安全
阿里云网络安全体系解析:如何构建数字时代的"安全盾牌"
在数字经济时代,阿里云作为亚太地区最大的云服务提供商,构建了行业领先的网络安全体系。本文解析其网络安全架构的三大核心维度:基础架构安全、核心技术防护和安全管理体系。通过技术创新与体系化防御,阿里云为企业数字化转型提供坚实的安全屏障,确保数据安全与业务连续性。案例显示,某金融客户借助阿里云成功拦截3200万次攻击,降低运维成本40%,响应时间缩短至8分钟。未来,阿里云将继续推进自适应安全架构,助力企业提升核心竞争力。
|
2月前
|
网络协议 Unix Linux
深入解析:Linux网络配置工具ifconfig与ip命令的全面对比
虽然 `ifconfig`作为一个经典的网络配置工具,简单易用,但其功能已经不能满足现代网络配置的需求。相比之下,`ip`命令不仅功能全面,而且提供了一致且简洁的语法,适用于各种网络配置场景。因此,在实际使用中,推荐逐步过渡到 `ip`命令,以更好地适应现代网络管理需求。
63 11
|
3月前
|
机器学习/深度学习 人工智能 算法
深入解析图神经网络:Graph Transformer的算法基础与工程实践
Graph Transformer是一种结合了Transformer自注意力机制与图神经网络(GNNs)特点的神经网络模型,专为处理图结构数据而设计。它通过改进的数据表示方法、自注意力机制、拉普拉斯位置编码、消息传递与聚合机制等核心技术,实现了对图中节点间关系信息的高效处理及长程依赖关系的捕捉,显著提升了图相关任务的性能。本文详细解析了Graph Transformer的技术原理、实现细节及应用场景,并通过图书推荐系统的实例,展示了其在实际问题解决中的强大能力。
353 30
|
3月前
|
网络协议
TCP报文格式全解析:网络小白变高手的必读指南
本文深入解析TCP报文格式,涵盖源端口、目的端口、序号、确认序号、首部长度、标志字段、窗口大小、检验和、紧急指针及选项字段。每个字段的作用和意义详尽说明,帮助理解TCP协议如何确保可靠的数据传输,是互联网通信的基石。通过学习这些内容,读者可以更好地掌握TCP的工作原理及其在网络中的应用。
|
3月前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
218 3

热门文章

最新文章

推荐镜像

更多