taskfactory默认执行慢的问题

简介: Task.Factory.StartNew不是直接创建线程,创建的是任务,它有一个任务队列,然后通过任务调度器把任务分配到线程池中的空闲线程中,如果任务的数量比线程池中的线程多,线程池的线程数量还没有到达上限,就会创建新线程执行任务。如果线程池的线程已到达上限,没有分配到线程的任务需要等待有线程空闲的时候才执行。

学WebSocket,服务端需要监听多个WebSocket客户端发送的消息。

开始的解决方法是每个WebSocket客户端都添加一个线程进行监听,代码如下:



/// <summary>
/// 监听端口 创建WebSocket
/// </summary>
/// <param name="httpListener"></param>
private void CreateWebSocket(HttpListener httpListener)
{
    if (!httpListener.IsListening)
        throw new Exception("HttpListener未启动");
    HttpListenerContext listenerContext =  httpListener.GetContextAsync().Result;
    if (!listenerContext.Request.IsWebSocketRequest)
    {
        CreateWebSocket(httpListener);
        return;
    }
    WebSocketContext webSocket = null;
    try
    {
        webSocket = new WebSocketContext(listenerContext, SubProtocol);
    }
    catch (Exception ex)
    {
        log.Error(ex);
        CreateWebSocket(HttpListener);
        return;
    }
    log.Info($"成功创建WebSocket:{webSocket.ID}");
    int workerThreads = 0, completionPortThreads = 0;
    ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
    if (workerThreads <= ReservedThreadsCount + 1 || completionPortThreads <= ReservedThreadsCount + 1)
    {
        /**
         * 可用线程小于预留线程数量
         * 通知客户端关闭连接
         * */
        webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "可用线程不足,无法连接").Wait();
    }
    else
    {
        if (OnReceiveMessage != null)
            webSocket.OnReceiveMessage += OnReceiveMessage;
        webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket;
        webSocketContexts.Add(webSocket);
        // 在线程中监听客户端发送的消息
        ThreadPool.QueueUserWorkItem(new WaitCallback(p =>
        {
            (p as WebSocketContext).ReceiveMessageAsync().Wait();
        }), webSocket);
    }
    CreateWebSocket(HttpListener);
}


在线程中添加监听代码

 

但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。

接受消息方法如下:


/// <summary>
/// 递归 同步接收消息
/// </summary>
/// <returns></returns>
public void ReceiveMessage()
{
    WebSocket webSocket = HttpListenerWebSocketContext.WebSocket;
    if (webSocket.State != WebSocketState.Open)
        throw new Exception("Http未握手成功,不能接受消息!");
    var byteBuffer = WebSocket.CreateServerBuffer(ReceiveBufferSize);
    WebSocketReceiveResult receiveResult = null;
    try
    {
        receiveResult = webSocket.ReceiveAsync(byteBuffer, cancellationToken).Result;
    }
    catch (WebSocketException ex)
    {
        if (ex.InnerException is HttpListenerException)
        {
            log.Error(ex);
            CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20));
            return;
        }
        else
        {
            log.Error(ex);
            CloseAsync(WebSocketCloseStatus.ProtocolError, "WebSocket 连接异常" + ex.Message).Wait(TimeSpan.FromSeconds(20));
            return;
        }
    }
    catch (Exception ex)
    {
        log.Error(ex);
        CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20));
        return;
    }
    if (receiveResult.CloseStatus.HasValue)
    {
        log.Info("接受到关闭消息!");
        CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription).Wait(TimeSpan.FromSeconds(20));
        return;
    }
    byte[] bytes = new byte[receiveResult.Count];
    Array.Copy(byteBuffer.Array, bytes, bytes.Length);
    string message = Encoding.GetString(bytes);
    log.Info($"{ID}接收到消息:{message}");
    if (OnReceiveMessage != null)
        OnReceiveMessage.Invoke(this, message);
    if (!cancellationToken.IsCancellationRequested)
        ReceiveMessage();
}


接受消息方法

这是不能接受的。

后来在Task中看到,在创建Task时可以设置TaskCreationOptions参数

该枚举有个字段LongRunning

LongRunning 2

指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。 它会向 TaskScheduler 提示,过度订阅可能是合理的。 可以通过过度订阅创建比可用硬件线程数更多的线程。 它还将提示任务计划程序:该任务需要附加线程,以使任务不阻塞本地线程池队列中其他线程或工作项的向前推动。

经过测试,可同时运行的任务数量的确可以超出可用线程数量。

测试如下:

没有设置 TaskCreationOptions.LongRunning  代码如下:


/// <summary>
        /// 测试任务
        /// 只运行了9个任务
        /// </summary>
        [TestMethod]
        public void TestTask1()
        {
            var cts = new CancellationTokenSource();
            int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0;
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            MaxWorkerThreads = 10;
            MaxCompletionPortThreads = 10;
            Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads);
            ThreadPool.SetMaxThreads(10, 10);
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            int count = 0;
            while (count++ < 30)
            {
                Task.Factory.StartNew(p =>
                {
                    int index = (int)p;
                    int runCount = 0;
                    LongRunningTask($"线程{index}", runCount, cts.Token);
                }, count, cts.Token, TaskCreationOptions.None, TaskScheduler.Default);
            }
            Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待超时,等待任务没有执行
            cts.Cancel();
        }
        /// <summary>
        /// 长时运行任务
        /// 递归运行
        /// </summary>
        /// <param name="taskName">任务名称</param>
        /// <param name="runCount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        private void LongRunningTask(string taskName, int runCount, CancellationToken token)
        {
            PrintTask($"任务【{taskName}】线程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次运行").Wait();
            if (!token.IsCancellationRequested)
                LongRunningTask(taskName, runCount, token);
        }
        /// <summary>
        /// 异步打印任务 等待1秒后打印消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns></returns>
        private Task PrintTask(string message)
        {
            return Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(message);
            });
        }


测试代码

测试结果


测试用了20秒才完成


主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务没有执行,而是等待超时了。

 

设置了 TaskCreationOptions.LongRunning  代码如下:


/// <summary>
        /// 测试长时运行任务
        /// 30个任务全部都运行了
        /// </summary>
        [TestMethod]
        public void TestTaskLongRunning()
        {
            var cts = new CancellationTokenSource();
            int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0;
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            MaxWorkerThreads = 10;
            MaxCompletionPortThreads = 10;
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads);
            ThreadPool.SetMaxThreads(10, 10);
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            int count = 0;
            while (count++ < 30)
            {
                Task.Factory.StartNew(p =>
                {
                    int index = (int)p;
                    int runCount = 0;
                    LongRunningTask($"线程{index}", runCount, cts.Token);
                }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
            Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20));    // 等待没有超时,等待任务有执行
            cts.Cancel();
        }


测试代码

测试结果:


测试用了10秒完成


主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

使用TaskCreationOptions.LongRunning  需要注意的是Action必须是同步方法同时运行任务书才能超出可以用线程数量,否则不能。

例如:


/// <summary>
        /// 测试长时运行任务
        /// 只运行了前9个任务
        /// </summary>
        [TestMethod]
        public void TestTaskLongRunning2()
        {
            var cts = new CancellationTokenSource();
            int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0;
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            MaxWorkerThreads = 10;
            MaxCompletionPortThreads = 10;
            Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个异步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads);
            ThreadPool.SetMaxThreads(10, 10);
            ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads);
            Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}");
            int count = 0;
            while (count++ < 30)
            {
                Task.Factory.StartNew(async p =>
                {
                    int index = (int)p;
                    int runCount = 0;
                    await LongRunningTaskAsync($"线程{index}", runCount, cts.Token);
                }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
            Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20));    // 等待没有超时,等待任务有执行
            cts.Cancel();
        }
        /// <summary>
        /// 异步长时运行任务
        /// </summary>
        /// <param name="taskName">任务名称</param>
        /// <param name="runCount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        /// <returns></returns>
        private async Task LongRunningTaskAsync(string taskName, int runCount, CancellationToken token)
        {
            await PrintTask($"任务【{taskName}】线程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次运行");
            if (!token.IsCancellationRequested)
                await LongRunningTaskAsync(taskName, runCount, token);
        }


测试代码

 

测试结果


测试用了10秒完成


主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

WebSocket修改后的监听方法:



/// <summary>
        /// 监听端口 创建WebSocket
        /// </summary>
        /// <param name="httpListener"></param>
        private void CreateWebSocket(HttpListener httpListener)
        {
            if (!httpListener.IsListening)
                throw new Exception("HttpListener未启动");
            HttpListenerContext listenerContext = httpListener.GetContext();
            if (!listenerContext.Request.IsWebSocketRequest)
            {
                CreateWebSocket(httpListener);
                return;
            }
            WebSocketContext webSocket = null;
            try
            {
                webSocket = new WebSocketContext(listenerContext, SubProtocol);
            }
            catch (Exception ex)
            {
                log.Error(ex);
                CreateWebSocket(HttpListener);
                return;
            }
            log.Info($"成功创建WebSocket:{webSocket.ID}");
            int workerThreads = 0, completionPortThreads = 0;
            ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
            if (OnReceiveMessage != null)
                    webSocket.OnReceiveMessage += OnReceiveMessage;
                webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket;
            Task.Factory.StartNew(() =>
            {
                webSocket.ReceiveMessage();
            }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            CreateWebSocket(HttpListener);
        }

 

修改后的WebSocket服务可以监听超过可用线程数量的客户端

目录
相关文章
|
3月前
|
SQL 存储 关系型数据库
原本可以执行得很快的 SQL 语句,执行速度却比预期的慢很多,原因是什么?如何解决?
原本可以执行得很快的 SQL 语句,执行速度却比预期的慢很多,原因是什么?如何解决?
|
2月前
|
存储 关系型数据库 MySQL
从 MySQL 执行原理告诉你:为什么分页场景下,请求速度非常慢
从 MySQL 执行原理告诉你:为什么分页场景下,请求速度非常慢
26 0
|
9月前
|
SQL 缓存 NoSQL
执行SQL响应比较慢,你有哪些排查思路?
如果面试问你,执行SQL响应慢,你有哪些排查思路和解决方案?这是一位去某里面试的小伙伴跟我分享的面试真题,那今天我给大家来分享一下我的思路。
69 1
|
SQL 关系型数据库 MySQL
如何解决mysql警告:“ InnoDB:page_cleaner:1000毫秒的预期循环用了XXX毫秒设置可能不是最佳的”?
如何解决mysql警告:“ InnoDB:page_cleaner:1000毫秒的预期循环用了XXX毫秒设置可能不是最佳的”?
1010 0
|
11月前
|
数据库
19c初始化数据库提示端口1521占用,但查不到占用的程序[DBT-06103]
19c初始化数据库的时候,提示端口占用,但查不到占用的程序。这个问题很诡异,如果按照提示的思路去查,找占用端口的进程,就走入了错误的方向。
|
JSON 前端开发 关系型数据库
解决mysql 库中间时间查询出来是时间戳方法 【数据库查询出时间,传给前端变为时间戳】【可用】
解决mysql 库中间时间查询出来是时间戳方法 【数据库查询出时间,传给前端变为时间戳】【可用】
276 0
|
SQL 关系型数据库 MySQL
|
云安全 关系型数据库 MySQL
如何解决mysql服务经常会自动终止的问题
最近在帮客户网站搬家的时候遇到一个比较奇怪的问题,网站搬家后服务器的mysql服务偶尔会莫名其妙的自动终止,而重启服务会报“ERROR! The server quit without updating PID file”这个错误,开始认为对方的服务器被攻击了,经过一番排查发现并没有;
2152 0
如何解决mysql服务经常会自动终止的问题
|
SQL 前端开发 关系型数据库
为什么就查了一行数据,执行那么慢?
今天主要介绍一下查了一行数据,为什么慢到人发慌。剖析一下MySQL的底层运行流程!
为什么就查了一行数据,执行那么慢?
|
Java 调度
taskfactory默认执行慢的问题
Task.Factory.StartNew不是直接创建线程,创建的是任务,它有一个任务队列,然后通过任务调度器把任务分配到线程池中的空闲线程中,如果任务的数量比线程池中的线程多,线程池的线程数量还没有到达上限,就会创建新线程执行任务。如果线程池的线程已到达上限,没有分配到线程的任务需要等待有线程空闲的时候才执行。
90 0