.net core 源码解析-web app是如何启动并接收处理请求(二) kestrel的启动

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 上篇讲到.net core web app是如何启动并接受请求的,下面接着探索kestrel server是如何完成此任务的。1.kestrel server的入口KestrelServer.Start(Microsoft.AspNetCore.Hosting.Server.IHttpApplication)FrameFactory创建的frame实例最终会交给libuv的loop回调接收请求。

上篇讲到.net core web app是如何启动并接受请求的,下面接着探索kestrel server是如何完成此任务的。

1.kestrel server的入口KestrelServer.Start(Microsoft.AspNetCore.Hosting.Server.IHttpApplication)

FrameFactory创建的frame实例最终会交给libuv的loop回调接收请求。但是在这过程中还是有很多的初始化工作需要做的。后面我们就管中窥豹来看一看。

public void Start<TContext>(IHttpApplication<TContext> application)
{
    var engine = new KestrelEngine(new ServiceContext
    {
        FrameFactory = context =>
        {
            return new Frame<TContext>(application, context);
        },
        AppLifetime = _applicationLifetime,
        Log = trace,
        ThreadPool = new LoggingThreadPool(trace),
        DateHeaderValueManager = dateHeaderValueManager,
        ServerOptions = Options
    });
    //启动引擎。完成libuv的配置和启动
    engine.Start(threadCount);
    //针对绑定的多个地址创建server来接收请求。也就是针对ip:port来启动tcp监听
    foreach (var address in _serverAddresses.Addresses.ToArray())
    {
        engine.CreateServer(ipv4Address);
    }
}

2.启动kestrel engine。engine.Start(threadCount);

启动绑定的端口*最大处理线程的thread。并初始化libuv组件。
每一个线程初始化libuv,注册loop回调等,并启动libuv。

public void Start(int count)
{
    for (var index = 0; index < count; index++)
    {
        Threads.Add(new KestrelThread(this));
    }
    foreach (var thread in Threads)
    {
        thread.StartAsync().Wait();
    }
}
private void ThreadStart(object parameter)
{
    lock (_startSync)
    {
        var tcs = (TaskCompletionSource<int>) parameter;
        try
        {
            //初始化loop
            _loop.Init(_engine.Libuv);
            //注册loop回调
            //EnqueueCloseHandle:持有的资源释放后的回调方法,回调往queue内增加一个item,事件循环该queue完成资源的最终释放
            _post.Init(_loop, OnPost, EnqueueCloseHandle);
            //注册心跳定时器
            _heartbeatTimer.Init(_loop, EnqueueCloseHandle);
            //启动心跳定时器
            _heartbeatTimer.Start(OnHeartbeat, timeout: HeartbeatMilliseconds, repeat: HeartbeatMilliseconds);
            _initCompleted = true;
            tcs.SetResult(0);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
            return;
        }
    }
    try
    {
        //当前线程执行到Run()这里会挂起
        _loop.Run();
        //应用程序stop,shutdown之类的情况,libuv唤醒当前线程,完成资源清理
        if (_stopImmediate)
        {
            // thread-abort form of exit, resources will be leaked
            //线程中止形式的退出,资源会被泄露。
            return;
        }

        // run the loop one more time to delete the open handles
        //再次运行循环以删除打开的句柄
        _post.Reference();
        _post.Dispose();
        _heartbeatTimer.Dispose();

        // Ensure the Dispose operations complete in the event loop.
        //确保事件循环中的Dispose操作完成。
        _loop.Run();

        _loop.Dispose();
    }
    catch (Exception ex)
    {
        _closeError = ExceptionDispatchInfo.Capture(ex);
        // Request shutdown so we can rethrow this exception
        // in Stop which should be observable.
        //请求关闭,以便我们可以重新抛出此异常在停止应该是可观察的。
        _appLifetime.StopApplication();
    }
    finally
    {
        _threadTcs.SetResult(null);
    }
}

3.libuv启动完成之后,接着就是处理订阅注册tcp了。

回到1的kestrel的start中。接着执行engine.CreateServer(ipv4Address);,这里和.net 里面的tcplistener不太一样。.net里面就是listener bind,start,accept就好了。而libuv涉及到一个多路io复用的概念,这也是为什么使用他能高并发的原因。

public IDisposable CreateServer(ServerAddress address)
{
    var usingPipes = address.IsUnixPipe;
    var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n");
    var single = Threads.Count == 1;
    var first = true;

    foreach (var thread in Threads)
    {
        if(single){}//single就不考虑,这种情况真是环境是不会这样玩的
        else if (first)
        {
            //根据当前平台创建tcp listener
            var listener = usingPipes
                ? (ListenerPrimary)new PipeListenerPrimary(ServiceContext)
                : new TcpListenerPrimary(ServiceContext);
            listener.StartAsync(pipeName, address, thread).Wait();
        }
        else
        {
            //如果是多次对同一个ip:port做监听
            var listener = usingPipes
                ? (ListenerSecondary)new PipeListenerSecondary(ServiceContext)
                : new TcpListenerSecondary(ServiceContext);
            listener.StartAsync(pipeName, address, thread).Wait();
        }
        first = false;
    }
}

tcplistener启动细节,这里就只看TcpListenerPrimary了。

首先说明一下TcpListenerPrimary这个类的继承关系:TcpListenerPrimary -->ListenerPrimary -->Listener。这样才有助于后续代码的理解。
后续代码到处都能看到thread.post/postaysnc的代码。这玩意的意思是把传入的action放到libuv loop中,并激活异步完成回调。libuv另一个重要的概念各种回调。
1.接着上面的代码,我们进入TcpListenerPrimary.StartAsync()方法。方法在ListenerPrimary中。

public async Task StartAsync(string pipeName, ServerAddress address, KestrelThread thread)
{
    _pipeName = pipeName;
    await StartAsync(address, thread).ConfigureAwait(false);
    await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(), this).ConfigureAwait(false);
}

2.接着上面的代码进入StartAsync(address, thread)。他是父类Listener的方法。

public Task StartAsync(ServerAddress address, KestrelThread thread)
{
    ServerAddress = address; Thread = thread;
    var tcs = new TaskCompletionSource<int>(this);
    Thread.Post(state =>
    {
        var tcs2 = (TaskCompletionSource<int>)state;
        var listener = ((Listener)tcs2.Task.AsyncState);
        //创建socket
        listener.ListenSocket = listener.CreateListenSocket();
        ////socket监听,libu注册监听并设置回调函数,最大队列。
        ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this);
        tcs2.SetResult(0);
    }, tcs);
    return tcs.Task;
}
protected override UvStreamHandle CreateListenSocket()
{
    //初始化socket并bind到address
    var socket = new UvTcpHandle(Log);
    socket.Init(Thread.Loop, Thread.QueueCloseHandle);
    //是否使用Nagle's algorithm算法。
    socket.NoDelay(ServerOptions.NoDelay);
    socket.Bind(ServerAddress);
    // If requested port was "0", replace with assigned dynamic port.
    ServerAddress.Port = socket.GetSockIPEndPoint().Port;
    return socket;
}

在接着上面的代码ListenSocket.Listen成功之后,libuv回调ConnectionCallback函数。

进入ConnectionCallback函数,完成重要的listen Accept.

step1:listen成功libuv回调ConnectionCallback方法。
step2:初始化接收请求socket,并将之关联到监听socket
step3:适配接收请求socket,如果是第一次适配的话则创建connection
step4:创建connection并启动
step5:new connection 关联 Frame对象。
step6:启动frame
step7:由Connection类调用一次以开始RequestProcessingAsync循环。
step8:循环接收请求,接收请求到之后交给上层程序处理

private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state)
{
    var listener = (Listener)state;
    listener.OnConnection(stream, status);//step 1
}
protected override void OnConnection(UvStreamHandle listenSocket, int status)//step 2
{
    var acceptSocket = new UvTcpHandle(Log);
    acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle);
    acceptSocket.NoDelay(ServerOptions.NoDelay);
    listenSocket.Accept(acceptSocket);
    DispatchConnection(acceptSocket);
}
protected override void DispatchConnection(UvStreamHandle socket)// step 3
{
    var index = _dispatchIndex++ % (_dispatchPipes.Count + 1);
    if (index == _dispatchPipes.Count)
    {
        base.DispatchConnection(socket);
    }
    else
    {
        DetachFromIOCP(socket);
        var dispatchPipe = _dispatchPipes[index];
        var write = new UvWriteReq(Log);
        write.Init(Thread.Loop);
        write.Write2(dispatchPipe, _dummyMessage, socket,
            (write2, status, error, state) =>
            {
                write2.Dispose();
                ((UvStreamHandle)state).Dispose();
            },
            socket);
    }
}
protected virtual void DispatchConnection(UvStreamHandle socket)//step 4
{
    var connection = new Connection(this, socket);
    connection.Start();
}

private Func<ConnectionContext, Frame> FrameFactory => ListenerContext.ServiceContext.FrameFactory;
public Connection(ListenerContext context, UvStreamHandle socket) : base(context)//step 5
{
    SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl);
    SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
    //重点代码在这里,FrameFactory是一个委托,是KestrelServer.Start中注册的action
    _frame = FrameFactory(this);
}
public void Start()//step 6
{
    Log.ConnectionStart(ConnectionId);
    // Start socket prior to applying the ConnectionFilter
    _socket.ReadStart(_allocCallback, _readCallback, this);
    _frame.Start();
}
/// <summary>
/// Called once by Connection class to begin the RequestProcessingAsync loop.
/// </summary>
public void Start()//step 7
{
    Reset();
    _requestProcessingTask =
        Task.Factory.StartNew(
            (o) => ((Frame)o).RequestProcessingAsync(),
            this,
            default(CancellationToken),
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default).Unwrap();
}
/// <summary>
/// 主循环消耗套接字输入,将其解析为协议帧,并调用应用程序委托,只要套接字打算保持打开。
/// 从此循环得到的任务将保留在服务器需要时使用的字段中以排除和关闭所有当前活动的连接。
/// </summary>
public override async Task RequestProcessingAsync()
{
    while (!_requestProcessingStopping)
    {
        InitializeHeaders();
        var context = _application.CreateContext(this);
        await _application.ProcessRequestAsync(context).ConfigureAwait(false);
    }
}
目录
相关文章
|
2月前
|
开发框架 .NET 开发者
简化 ASP.NET Core 依赖注入(DI)注册-Scrutor
Scrutor 是一个简化 ASP.NET Core 应用程序中依赖注入(DI)注册过程的开源库,支持自动扫描和注册服务。通过简单的配置,开发者可以轻松地从指定程序集中筛选、注册服务,并设置其生命周期,同时支持服务装饰等高级功能。适用于大型项目,提高代码的可维护性和简洁性。仓库地址:&lt;https://github.com/khellang/Scrutor&gt;
60 5
|
4月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
259 3
|
2月前
|
开发框架 算法 中间件
ASP.NET Core 中的速率限制中间件
在ASP.NET Core中,速率限制中间件用于控制客户端请求速率,防止服务器过载并提高安全性。通过`AddRateLimiter`注册服务,并配置不同策略如固定窗口、滑动窗口、令牌桶和并发限制。这些策略可在全局、控制器或动作级别应用,支持自定义响应处理。使用中间件`UseRateLimiter`启用限流功能,并可通过属性禁用特定控制器或动作的限流。这有助于有效保护API免受滥用和过载。 欢迎关注我的公众号:Net分享 (239字符)
62 1
|
3月前
|
开发框架 .NET C#
在 ASP.NET Core 中创建 gRPC 客户端和服务器
本文介绍了如何使用 gRPC 框架搭建一个简单的“Hello World”示例。首先创建了一个名为 GrpcDemo 的解决方案,其中包含一个 gRPC 服务端项目 GrpcServer 和一个客户端项目 GrpcClient。服务端通过定义 `greeter.proto` 文件中的服务和消息类型,实现了一个简单的问候服务 `GreeterService`。客户端则通过 gRPC 客户端库连接到服务端并调用其 `SayHello` 方法,展示了 gRPC 在 C# 中的基本使用方法。
64 5
在 ASP.NET Core 中创建 gRPC 客户端和服务器
|
2月前
|
开发框架 缓存 .NET
GraphQL 与 ASP.NET Core 集成:从入门到精通
本文详细介绍了如何在ASP.NET Core中集成GraphQL,包括安装必要的NuGet包、创建GraphQL Schema、配置GraphQL服务等步骤。同时,文章还探讨了常见问题及其解决方法,如处理复杂查询、错误处理、性能优化和实现认证授权等,旨在帮助开发者构建灵活且高效的API。
57 3
|
2月前
|
开发框架 .NET PHP
ASP.NET Web Pages - 添加 Razor 代码
ASP.NET Web Pages 使用 Razor 标记添加服务器端代码,支持 C# 和 Visual Basic。Razor 语法简洁易学,类似于 ASP 和 PHP。例如,在网页中加入 `@DateTime.Now` 可以实时显示当前时间。
|
3月前
|
开发框架 监控 .NET
【Azure App Service】部署在App Service上的.NET应用内存消耗不能超过2GB的情况分析
x64 dotnet runtime is not installed on the app service by default. Since we had the app service running in x64, it was proxying the request to a 32 bit dotnet process which was throwing an OutOfMemoryException with requests >100MB. It worked on the IaaS servers because we had the x64 runtime install
|
4月前
|
测试技术 API 开发者
精通.NET单元测试:MSTest、xUnit、NUnit全面解析
【10月更文挑战第15天】本文介绍了.NET生态系统中最流行的三种单元测试框架:MSTest、xUnit和NUnit。通过示例代码展示了每种框架的基本用法和特点,帮助开发者根据项目需求和个人偏好选择合适的测试工具。
116 3
|
4月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架

热门文章

最新文章

推荐镜像

更多