.net core 源码解析-web app是如何启动并接收处理请求(二) kestrel的启动

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 上篇讲到.net core web app是如何启动并接受请求的,下面接着探索kestrel server是如何完成此任务的。1.kestrel server的入口KestrelServer.Start(Microsoft.AspNetCore.Hosting.Server.IHttpApplication)FrameFactory创建的frame实例最终会交给libuv的loop回调接收请求。

上篇讲到.net core web app是如何启动并接受请求的,下面接着探索kestrel server是如何完成此任务的。

1.kestrel server的入口KestrelServer.Start(Microsoft.AspNetCore.Hosting.Server.IHttpApplication)

FrameFactory创建的frame实例最终会交给libuv的loop回调接收请求。但是在这过程中还是有很多的初始化工作需要做的。后面我们就管中窥豹来看一看。

public void Start<TContext>(IHttpApplication<TContext> application)
{
    var engine = new KestrelEngine(new ServiceContext
    {
        FrameFactory = context =>
        {
            return new Frame<TContext>(application, context);
        },
        AppLifetime = _applicationLifetime,
        Log = trace,
        ThreadPool = new LoggingThreadPool(trace),
        DateHeaderValueManager = dateHeaderValueManager,
        ServerOptions = Options
    });
    //启动引擎。完成libuv的配置和启动
    engine.Start(threadCount);
    //针对绑定的多个地址创建server来接收请求。也就是针对ip:port来启动tcp监听
    foreach (var address in _serverAddresses.Addresses.ToArray())
    {
        engine.CreateServer(ipv4Address);
    }
}

2.启动kestrel engine。engine.Start(threadCount);

启动绑定的端口*最大处理线程的thread。并初始化libuv组件。
每一个线程初始化libuv,注册loop回调等,并启动libuv。

public void Start(int count)
{
    for (var index = 0; index < count; index++)
    {
        Threads.Add(new KestrelThread(this));
    }
    foreach (var thread in Threads)
    {
        thread.StartAsync().Wait();
    }
}
private void ThreadStart(object parameter)
{
    lock (_startSync)
    {
        var tcs = (TaskCompletionSource<int>) parameter;
        try
        {
            //初始化loop
            _loop.Init(_engine.Libuv);
            //注册loop回调
            //EnqueueCloseHandle:持有的资源释放后的回调方法,回调往queue内增加一个item,事件循环该queue完成资源的最终释放
            _post.Init(_loop, OnPost, EnqueueCloseHandle);
            //注册心跳定时器
            _heartbeatTimer.Init(_loop, EnqueueCloseHandle);
            //启动心跳定时器
            _heartbeatTimer.Start(OnHeartbeat, timeout: HeartbeatMilliseconds, repeat: HeartbeatMilliseconds);
            _initCompleted = true;
            tcs.SetResult(0);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
            return;
        }
    }
    try
    {
        //当前线程执行到Run()这里会挂起
        _loop.Run();
        //应用程序stop,shutdown之类的情况,libuv唤醒当前线程,完成资源清理
        if (_stopImmediate)
        {
            // thread-abort form of exit, resources will be leaked
            //线程中止形式的退出,资源会被泄露。
            return;
        }

        // run the loop one more time to delete the open handles
        //再次运行循环以删除打开的句柄
        _post.Reference();
        _post.Dispose();
        _heartbeatTimer.Dispose();

        // Ensure the Dispose operations complete in the event loop.
        //确保事件循环中的Dispose操作完成。
        _loop.Run();

        _loop.Dispose();
    }
    catch (Exception ex)
    {
        _closeError = ExceptionDispatchInfo.Capture(ex);
        // Request shutdown so we can rethrow this exception
        // in Stop which should be observable.
        //请求关闭,以便我们可以重新抛出此异常在停止应该是可观察的。
        _appLifetime.StopApplication();
    }
    finally
    {
        _threadTcs.SetResult(null);
    }
}

3.libuv启动完成之后,接着就是处理订阅注册tcp了。

回到1的kestrel的start中。接着执行engine.CreateServer(ipv4Address);,这里和.net 里面的tcplistener不太一样。.net里面就是listener bind,start,accept就好了。而libuv涉及到一个多路io复用的概念,这也是为什么使用他能高并发的原因。

public IDisposable CreateServer(ServerAddress address)
{
    var usingPipes = address.IsUnixPipe;
    var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
    var single = Threads.Count == 1;
    var first = true;

    foreach (var thread in Threads)
    {
        if(single){}//single就不考虑,这种情况真是环境是不会这样玩的
        else if (first)
        {
            //根据当前平台创建tcp listener
            var listener = usingPipes
                ? (ListenerPrimary)new PipeListenerPrimary(ServiceContext)
                : new TcpListenerPrimary(ServiceContext);
            listener.StartAsync(pipeName, address, thread).Wait();
        }
        else
        {
            //如果是多次对同一个ip:port做监听
            var listener = usingPipes
                ? (ListenerSecondary)new PipeListenerSecondary(ServiceContext)
                : new TcpListenerSecondary(ServiceContext);
            listener.StartAsync(pipeName, address, thread).Wait();
        }
        first = false;
    }
}

tcplistener启动细节,这里就只看TcpListenerPrimary了。

首先说明一下TcpListenerPrimary这个类的继承关系:TcpListenerPrimary -->ListenerPrimary -->Listener。这样才有助于后续代码的理解。
后续代码到处都能看到thread.post/postaysnc的代码。这玩意的意思是把传入的action放到libuv loop中,并激活异步完成回调。libuv另一个重要的概念各种回调。
1.接着上面的代码,我们进入TcpListenerPrimary.StartAsync()方法。方法在ListenerPrimary中。

public async Task StartAsync(string pipeName, ServerAddress address, KestrelThread thread)
{
    _pipeName = pipeName;
    await StartAsync(address, thread).ConfigureAwait(false);
    await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(), this).ConfigureAwait(false);
}

2.接着上面的代码进入StartAsync(address, thread)。他是父类Listener的方法。

public Task StartAsync(ServerAddress address, KestrelThread thread)
{
    ServerAddress = address; Thread = thread;
    var tcs = new TaskCompletionSource<int>(this);
    Thread.Post(state =>
    {
        var tcs2 = (TaskCompletionSource<int>)state;
        var listener = ((Listener)tcs2.Task.AsyncState);
        //创建socket
        listener.ListenSocket = listener.CreateListenSocket();
        ////socket监听,libu注册监听并设置回调函数,最大队列。
        ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this);
        tcs2.SetResult(0);
    }, tcs);
    return tcs.Task;
}
protected override UvStreamHandle CreateListenSocket()
{
    //初始化socket并bind到address
    var socket = new UvTcpHandle(Log);
    socket.Init(Thread.Loop, Thread.QueueCloseHandle);
    //是否使用Nagle's algorithm算法。
    socket.NoDelay(ServerOptions.NoDelay);
    socket.Bind(ServerAddress);
    // If requested port was "0", replace with assigned dynamic port.
    ServerAddress.Port = socket.GetSockIPEndPoint().Port;
    return socket;
}

在接着上面的代码ListenSocket.Listen成功之后,libuv回调ConnectionCallback函数。

进入ConnectionCallback函数,完成重要的listen Accept.

step1:listen成功libuv回调ConnectionCallback方法。
step2:初始化接收请求socket,并将之关联到监听socket
step3:适配接收请求socket,如果是第一次适配的话则创建connection
step4:创建connection并启动
step5:new connection 关联 Frame对象。
step6:启动frame
step7:由Connection类调用一次以开始RequestProcessingAsync循环。
step8:循环接收请求,接收请求到之后交给上层程序处理

private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
    var listener = (Listener)state;
    listener.OnConnection(stream, status);//step 1
}
protected override void OnConnection(UvStreamHandle listenSocket, int status)//step 2
{
    var acceptSocket = new UvTcpHandle(Log);
    acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
    acceptSocket.NoDelay(ServerOptions.NoDelay);
    listenSocket.Accept(acceptSocket);
    DispatchConnection(acceptSocket);
}
protected override void DispatchConnection(UvStreamHandle socket)// step 3
{
    var index = _dispatchIndex++ % (_dispatchPipes.Count + 1);
    if (index == _dispatchPipes.Count)
    {
        base.DispatchConnection(socket);
    }
    else
    {
        DetachFromIOCP(socket);
        var dispatchPipe = _dispatchPipes[index];
        var write = new UvWriteReq(Log);
        write.Init(Thread.Loop);
        write.Write2(dispatchPipe, _dummyMessage, socket,
            (write2, status, error, state) =>
            {
                write2.Dispose();
                ((UvStreamHandle)state).Dispose();
            },
            socket);
    }
}
protected virtual void DispatchConnection(UvStreamHandle socket)//step 4
{
    var connection = new Connection(this, socket);
    connection.Start();
}

private Func<ConnectionContext, Frame> FrameFactory => ListenerContext.ServiceContext.FrameFactory;
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)//step 5
{
    SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
    SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
    //重点代码在这里,FrameFactory是一个委托,是KestrelServer.Start中注册的action
    _frame = FrameFactory(this);
}
public void Start()//step 6
{
    Log.ConnectionStart(ConnectionId);
    // Start socket prior to applying the ConnectionFilter
    _socket.ReadStart(_allocCallback, _readCallback, this);
    _frame.Start();
}
/// <summary>
/// Called once by Connection class to begin the RequestProcessingAsync loop.
/// </summary>
public void Start()//step 7
{
    Reset();
    _requestProcessingTask =
        Task.Factory.StartNew(
            (o) => ((Frame)o).RequestProcessingAsync(),
            this,
            default(CancellationToken),
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default).Unwrap();
}
/// <summary>
/// 主循环消耗套接字输入,将其解析为协议帧,并调用应用程序委托,只要套接字打算保持打开。
/// 从此循环得到的任务将保留在服务器需要时使用的字段中以排除和关闭所有当前活动的连接。
/// </summary>
public override async Task RequestProcessingAsync()
{
    while (!_requestProcessingStopping)
    {
        InitializeHeaders();
        var context = _application.CreateContext(this);
        await _application.ProcessRequestAsync(context).ConfigureAwait(false);
    }
}
目录
相关文章
|
30天前
|
开发框架 前端开发 JavaScript
ASP.NET Web Pages - 教程
ASP.NET Web Pages 是一种用于创建动态网页的开发模式,采用HTML、CSS、JavaScript 和服务器脚本。本教程聚焦于Web Pages,介绍如何使用Razor语法结合服务器端代码与前端技术,以及利用WebMatrix工具进行开发。适合初学者入门ASP.NET。
|
3月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
171 3
|
3天前
|
运维 前端开发 C#
一套以用户体验出发的.NET8 Web开源框架
一套以用户体验出发的.NET8 Web开源框架
一套以用户体验出发的.NET8 Web开源框架
|
8天前
|
JSON 数据格式
.net HTTP请求类封装
`HttpRequestHelper` 是一个用于简化 HTTP 请求的辅助类,支持发送 GET 和 POST 请求。它使用 `HttpClient` 发起请求,并通过 `Newtonsoft.Json` 处理 JSON 数据。示例展示了如何使用该类发送请求并处理响应。注意事项包括:简单的错误处理、需安装 `Newtonsoft.Json` 依赖,以及建议重用 `HttpClient` 实例以优化性能。
49 2
|
30天前
|
开发框架 .NET PHP
ASP.NET Web Pages - 添加 Razor 代码
ASP.NET Web Pages 使用 Razor 标记添加服务器端代码,支持 C# 和 Visual Basic。Razor 语法简洁易学,类似于 ASP 和 PHP。例如,在网页中加入 `@DateTime.Now` 可以实时显示当前时间。
|
3月前
|
测试技术 API 开发者
精通.NET单元测试:MSTest、xUnit、NUnit全面解析
【10月更文挑战第15天】本文介绍了.NET生态系统中最流行的三种单元测试框架:MSTest、xUnit和NUnit。通过示例代码展示了每种框架的基本用法和特点,帮助开发者根据项目需求和个人偏好选择合适的测试工具。
49 3
|
4月前
|
移动开发 Android开发 数据安全/隐私保护
移动应用与系统的技术演进:从开发到操作系统的全景解析随着智能手机和平板电脑的普及,移动应用(App)已成为人们日常生活中不可或缺的一部分。无论是社交、娱乐、购物还是办公,移动应用都扮演着重要的角色。而支撑这些应用运行的,正是功能强大且复杂的移动操作系统。本文将深入探讨移动应用的开发过程及其背后的操作系统机制,揭示这一领域的技术演进。
本文旨在提供关于移动应用与系统技术的全面概述,涵盖移动应用的开发生命周期、主要移动操作系统的特点以及它们之间的竞争关系。我们将探讨如何高效地开发移动应用,并分析iOS和Android两大主流操作系统的技术优势与局限。同时,本文还将讨论跨平台解决方案的兴起及其对移动开发领域的影响。通过这篇技术性文章,读者将获得对移动应用开发及操作系统深层理解的钥匙。
109 12
|
4月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
4月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
115 3
|
3月前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:实现过程与关键细节解析an3.021-6232.com
随着互联网技术的快速发展,ASP.NET作为一种广泛使用的服务器端开发技术,其与数据库的交互操作成为了应用开发中的重要环节。本文将详细介绍在ASP.NET中如何连接SQL数据库,包括连接的基本概念、实现步骤、关键代码示例以及常见问题的解决方案。由于篇幅限制,本文不能保证达到完整的2000字,但会确保