C# 同步 异步 回调 状态机 async await Demo
为什么会研究这个?
我们项目的客户端和服务端通信用的是WCF,我就想,能不能用异步的方式调用WCF服务呢?或者说能不能用async await的方式调用WCF服务呢?
然后我发现WCF是通过BeginXXX和EndXXX这种回调的方式实现异步的,似乎不支持async await语法糖,那只能通过状态机的方式实现了,不然多个请求就会写成回调地狱。
如果用Task.Run包起来,里面再写个Wait,这就是假异步,还不如不用异步,按原来非异步的方式写。
研究了回调和状态机之后,又看了相关的博客,就像一篇博客里说的,调用WCF不难,异步调用WCF不难,异步调用WCF并保证调用顺序也不难,难的是实现了这些之后,代码的可读性和可维护性。
所以我的结论是,调用WCF,如果没有特殊需求,还是不要异步的好,写起来复杂。
C# 同步 异步 回调 状态机 async await Demo
主要演示了不使用async、await语法糖,通过回调函数和状态机实现异步
为什么要写这个Demo?
为了帮助理解异步,async、await经过编译生成的状态机代码,有点复杂,这个Demo里写了一个相对简单的状态机代码,便于理解
代码说明
代码中主要写了三种下载文件的示例
- 同步方式下载文件,为了防止下载较大文件时卡住界面,代码在线程中执行,文件下载完成之前,始终占用一个线程
- 异步方式下载文件,使用了async、await语法糖,下载文件时,可以看到,workerThreads(可用线程数)和completionPortThreads(可用异步线程数)会发生变化,但是不会长时间占用一个线程
- 异步方式下载文件,不使用async、await语法糖,通过回调函数和状态机实现,workerThreads(可用线程数)和completionPortThreads(可用异步线程数)会发生变化,但是不会长时间占用一个线程
结论:异步的本质是回调,异步是回调的语法糖
- 相比同步方式,使用异步方式下载文件时,忽略掉误差,下载速度并没有更快,异步的主要优点是不会长时间占用一个线程
- 在没有async、await语法糖时,使用回调函数和状态机也可以实现异步,但代码写的不够优雅,心智负担重,所以async、await的另一个优点是使代码简单
- 异步的本质就是回调,C#异步底层是通过系统级回调和状态机实现的,async、await会被编译成状态机代码,相关于用代码生成器生成了代码,但这个代码自己写的话,心智负担重
- 不使用async、await语法糖的前提下,使用状态机可以避免回调地狱
- 即使不使用async、await语法糖,依然感受到了异步的侵入性,没办法在底层完全封装起来,代码一直侵入到控件的事件里,使用async、await也是,你从代码的底层,写async一直写到控件的事件层
思考:
经测试,button4_Click、button5_Click、button6_Click在下载大文件时,耗时较长,但是界面并不会卡住,依然可以自由拖动窗体或点击其它按钮
button4_Click、button5_Click、button6_Click方法分别调用了AsyncFunctionWithCallback方法
AsyncFunctionWithCallback方法调用了HttpUtil类的HttpDownloadFileAsyncWithCallback方法
请仔细观察上面的方法,它们没有使用new Thread、Task.Run、Task.Factory.StartNew等线程相关的方法,
也就是说执行长时间操作,没有阻塞界面,但是又没有使用线程,
实际上下载文件过程中,没有使用C#的CLR线程池,测试时,workerThreads数量不会变化,但是你会看到completionPortThreads数量变化,
下载文件的过程使用了异步线程池,但是下载大文件时,并不会长时间占用异步线程池
Demo截图
测试代码Form1.cs
using Models; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using Utils; namespace AsyncAwaitDemo { public partial class Form1 : Form { private DateTime _dt1; private System.Timers.Timer _timer; private string _testFileDownloadUrl = "http://desk-fd.zol-img.com.cn/t_s1920x1080c5/g6/M00/06/00/ChMkKWHXqV-IJFpUAEJatCdq_LUAAXW8AA1PvwAQlrM029.jpg?downfile=1642227460763.jpg"; //文件下载测试URL //private string _testFileDownloadUrl = "https://down21.xiazaidb.com/app/xiaomanghe.apk"; //文件下载测试URL //private string _testFileDownloadUrl = "https://codeload.github.com/0611163/DBHelper/zip/refs/heads/master"; //文件下载测试URL //private string _testFileDownloadUrl = "http://down-ww5.537a.com/soft/3/ce/com.yhong.muchun_51982ada.apk"; //文件下载测试URL //private string _testFileDownloadUrl = "https://dl.360safe.com/netunion/20140425/360se+191727+n3f07b78190.exe"; //文件下载测试URL 大一点的文件 public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { //定时器会使用线程池中的一个线程 _timer = new System.Timers.Timer(); _timer.Interval = 100; _timer.Elapsed += _timer_Elapsed; _timer.Start(); } #region Log private void Log(string log) { if (!this.IsDisposed) { string msg = DateTime.Now.ToString("mm:ss.fff") + " " + log + "\r\n\r\n"; if (this.InvokeRequired) { this.BeginInvoke(new Action(() => { textBox1.AppendText(msg); })); } else { textBox1.AppendText(msg); } } } #endregion #region _timer_Elapsed private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { int workerThreads; int completionPortThreads; int maxWorkerThreads; int maxCompletionPortThreads; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads); this.BeginInvoke(new Action(() => { label1.Text = "程序当前使用线程数:" + (maxWorkerThreads - workerThreads) + " workerThreads:" + workerThreads.ToString() + " completionPortThreads:" + completionPortThreads; })); } #endregion #region button1_Click 测试同步方法 private void button1_Click(object sender, EventArgs e) { Task.Run(() => //下载文件是耗时操作,需要在线程中执行,否则界面卡住 { Log("开始"); DateTime dt = DateTime.Now; //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //方法调用 string result1 = SyncFunction(arg1); string result2 = SyncFunction(arg2); string result3 = SyncFunction(arg3); //输出结果 Log(result1); Log(result2); Log(result3); Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000")); }); } #endregion #region button2_Click 测试异步方法(三个异步方法并行,按顺序输出) private async void button2_Click(object sender, EventArgs e) { Log("开始"); DateTime dt = DateTime.Now; //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //方法调用 var t1 = AsyncFunction(arg1); var t2 = AsyncFunction(arg2); var t3 = AsyncFunction(arg3); string result1 = await t1; string result2 = await t2; string result3 = await t3; //输出结果 Log(result1); Log(result2); Log(result3); Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000")); } #endregion #region button3_Click 测试异步方法(三个异步方法顺序执行,按顺序输出) private async void button3_Click(object sender, EventArgs e) { Log("开始"); DateTime dt = DateTime.Now; //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //方法调用 string result1 = await AsyncFunction(arg1); string result2 = await AsyncFunction(arg2); string result3 = await AsyncFunction(arg3); //输出结果 Log(result1); Log(result2); Log(result3); Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000")); } #endregion #region button4_Click 测试状态机(三个异步方法顺序执行,按顺序输出)(等价于button3_Click) private void button4_Click(object sender, EventArgs e) { List<object> resultList = new List<object>(); //回调、状态机,不使用async await语法糖 Action<int, object> awaitAction = null; awaitAction = (state, result) => { //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //方法调用 if (state == 0) { Log("开始"); _dt1 = DateTime.Now; AsyncFunctionWithCallback(arg1, r => awaitAction(1, r)); return; } if (state == 1) { resultList.Add(result); AsyncFunctionWithCallback(arg2, r => awaitAction(2, r)); return; } if (state == 2) { resultList.Add(result); AsyncFunctionWithCallback(arg3, r => awaitAction(3, r)); return; } //输出结果 if (state == 3) { resultList.Add(result); foreach (object item in resultList) { Log(item != null ? item.ToString() : "null"); } Log("结束,耗时:" + DateTime.Now.Subtract(_dt1).TotalSeconds.ToString("0.000")); return; } }; awaitAction(0, null); } #endregion #region button5_Click 测试状态机(三个异步方法并行,按顺序输出)(等价于button2_Click) private void button5_Click(object sender, EventArgs e) { object[] resultArray = new object[3]; //是否已调用回调函数 bool _completedCalled = false; //锁 object _lock = new object(); //输出结果 Action<object[]> output = arr => { if (arr.Count(a => a != null) == 3) { lock (_lock) { if (!_completedCalled) { _completedCalled = true; foreach (object item in arr) { Log(item != null ? item.ToString() : "null"); } Log("结束,耗时:" + DateTime.Now.Subtract(_dt1).TotalSeconds.ToString("0.000")); } } } }; //回调、状态机,不使用async await语法糖 Action<int, object> awaitAction = null; awaitAction = (state, result) => { //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //方法调用 if (state == 0) { Log("开始"); _dt1 = DateTime.Now; AsyncFunctionWithCallback(arg1, r => awaitAction(1, r)); AsyncFunctionWithCallback(arg2, r => awaitAction(2, r)); AsyncFunctionWithCallback(arg3, r => awaitAction(3, r)); return; } //输出结果 if (state == 1) { resultArray[0] = result; output(resultArray); return; } if (state == 2) { resultArray[1] = result; output(resultArray); return; } if (state == 3) { resultArray[2] = result; output(resultArray); return; } }; awaitAction(0, null); } #endregion #region button6_Click 测试状态机(三个异步方法并行,按顺序输出)(等价于button2_Click)(相对于button5_Click更优雅的实现) private void button6_Click(object sender, EventArgs e) { Log("开始"); DateTime dt = DateTime.Now; //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //操作结果 object result1 = null; object result2 = null; object result3 = null; //方法调用 Await await = new Await(); await.Add(() => AsyncFunctionWithCallback(arg1, r => await.Collect(r, out result1))) .Add(() => AsyncFunctionWithCallback(arg2, r => await.Collect(r, out result2))) .Add(() => AsyncFunctionWithCallback(arg3, r => await.Collect(r, out result3))) .Completed(resultList => //输出结果 { Log(result1 != null ? result1.ToString() : "null"); Log(result2 != null ? result2.ToString() : "null"); Log(result3 != null ? result3.ToString() : "null"); Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000")); }) .Start(); } #endregion #region button7_Click 不使用状态机,执行带回调的异步方法(基本等价于button1_Click) private void button7_Click(object sender, EventArgs e) { Task.Run(() => { Log("开始"); DateTime dt = DateTime.Now; //输入参数 string arg1 = "1"; string arg2 = "2"; string arg3 = "3"; //操作结果 object result1 = null; object result2 = null; object result3 = null; //方法调用 AsyncFunctionWithCallback(arg1, r => result1 = r); AsyncFunctionWithCallback(arg2, r => result2 = r); AsyncFunctionWithCallback(arg3, r => result3 = r); //等待三个操作结果全部返回,三个操作结果返回之前,始终占用一个线程 while (result1 == null || result2 == null || result3 == null) { Thread.Sleep(1); } //输出结果 Log(result1 != null ? result1.ToString() : "null"); Log(result2 != null ? result2.ToString() : "null"); Log(result3 != null ? result3.ToString() : "null"); Log("结束,耗时:" + DateTime.Now.Subtract(dt).TotalSeconds.ToString("0.000")); }); } #endregion #region 同步方法 /// <summary> /// 同步方法 /// </summary> private string SyncFunction(string arg) { MemoryStream ms = HttpUtil.HttpDownloadFile(_testFileDownloadUrl); return "同步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length; } #endregion #region 异步方法 /// <summary> /// 异步方法 /// </summary> private async Task<string> AsyncFunction(string arg) { MemoryStream ms = await HttpUtil.HttpDownloadFileAsync(_testFileDownloadUrl); return "异步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length; } #endregion #region 带回调的异步方法 /// <summary> /// 带回调的异步方法 /// </summary> /// <param name="arg">传入参数</param> /// <param name="callback">回调</param> /// <param name="nextState">下一个状态</param> private void AsyncFunctionWithCallback(string arg, Action<object> callback) { HttpUtil.HttpDownloadFileAsyncWithCallback(_testFileDownloadUrl, new HttpUtilAsyncState() { State = 0 }, obj => { if (obj is Exception) { Exception ex = obj as Exception; Log("错误:" + ex.Message); } else { MemoryStream ms = obj as MemoryStream; string result = "带回调的异步方法 arg=" + arg + " 下载文件的字节数组长度=" + ms.Length; callback(result); } }); } #endregion #region 用Task模拟的异步方法 /* #region 异步方法 /// <summary> /// 异步方法 /// </summary> private Task<string> AsyncFunction(string arg) { return Task.Run<string>(() => { Thread.Sleep(2000); return "异步方法 arg=" + arg; }); } #endregion #region 带回调的异步方法 /// <summary> /// 带回调的异步方法 /// </summary> /// <param name="arg">传入参数</param> /// <param name="callback">回调</param> /// <param name="nextState">下一个状态</param> private void AsyncFunctionWithCallback(string arg, Action<object> callback) { Task.Run(() => { if (arg == "1") { Thread.Sleep(2000); } else { Thread.Sleep(2000); } string result = "带回调的异步方法 arg=" + arg; callback(result); }); } #endregion */ #endregion } }
异步工具类Await.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Utils { /// <summary> /// 异步工具类 /// 代替async await语法糖 /// </summary> public class Await { /// <summary> /// 状态 /// </summary> private int _state { get; set; } /// <summary> /// 异步操作集合 /// </summary> private List<Action> _actionList = new List<Action>(); /// <summary> /// 操作结果集合 /// </summary> private List<object> _resultList = new List<object>(); /// <summary> /// 完成后的回调 /// </summary> private Action<List<object>> _completedCallback; /// <summary> /// 是否已调用回调函数 /// </summary> private bool _completedCalled = false; /// <summary> /// 锁 /// </summary> private object _lock = new object(); /// <summary> /// 异步工具类 构造函数 /// </summary> public Await() { _state = 0; } /// <summary> /// 添加一个异步操作 /// </summary> public Await Add(Action action) { _actionList.Add(action); return this; } /// <summary> /// 开始执行 /// </summary> public Await Start() { foreach (Action action in _actionList) { action(); } return this; } /// <summary> /// 收集结果 /// </summary> public Await Collect(object result, out object outResult) { outResult = result; _resultList.Add(result); lock (_lock) { _state++; if (_state == _actionList.Count && _completedCallback != null && !_completedCalled) { _completedCalled = true; _completedCallback(_resultList); } } return this; } /// <summary> /// 注册完成后的回调函数 /// </summary> public Await Completed(Action<List<object>> completedCallback) { this._completedCallback = completedCallback; return this; } } }
文件下载工具类HttpUtil.cs
下面的代码使用了三种方式实现文件下载
- 同步方式实现文件下载
- 异步方式实现文件下载
- 通过回调和状态机实现的异步下载
using Models; using System; using System.Collections.Generic; using System.Collections.Specialized; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// Http上传下载文件 /// </summary> public class HttpUtil { #region HttpDownloadFile 下载文件 /// <summary> /// 下载文件 /// </summary> /// <param name="url">下载文件url路径</param> /// <param name="cookie">cookie</param> public static MemoryStream HttpDownloadFile(string url, CookieContainer cookie = null, WebHeaderCollection headers = null) { try { // 设置参数 HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest; request.Method = "GET"; request.CookieContainer = cookie; if (headers != null) { foreach (string key in headers.Keys) { request.Headers.Add(key, headers[key]); } } //发送请求并获取相应回应数据 HttpWebResponse response = request.GetResponse() as HttpWebResponse; //直到request.GetResponse()程序才开始向目标网页发送Post请求 Stream responseStream = response.GetResponseStream(); //创建写入流 MemoryStream stream = new MemoryStream(); byte[] bArr = new byte[10240]; int size = responseStream.Read(bArr, 0, (int)bArr.Length); while (size > 0) { stream.Write(bArr, 0, size); size = responseStream.Read(bArr, 0, (int)bArr.Length); } stream.Seek(0, SeekOrigin.Begin); responseStream.Close(); return stream; } catch (Exception ex) { throw ex; } } #endregion #region HttpDownloadFile 下载文件(异步) /// <summary> /// 下载文件 /// </summary> /// <param name="url">下载文件url路径</param> /// <param name="cookie">cookie</param> public static async Task<MemoryStream> HttpDownloadFileAsync(string url, CookieContainer cookie = null, WebHeaderCollection headers = null) { try { // 设置参数 HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest; request.Method = "GET"; request.CookieContainer = cookie; if (headers != null) { foreach (string key in headers.Keys) { request.Headers.Add(key, headers[key]); } } //发送请求并获取相应回应数据 HttpWebResponse response = await request.GetResponseAsync() as HttpWebResponse; //直到request.GetResponse()程序才开始向目标网页发送Post请求 Stream responseStream = response.GetResponseStream(); //创建写入流 MemoryStream stream = new MemoryStream(); byte[] bArr = new byte[10240]; int size = await responseStream.ReadAsync(bArr, 0, (int)bArr.Length); while (size > 0) { stream.Write(bArr, 0, size); size = await responseStream.ReadAsync(bArr, 0, (int)bArr.Length); } stream.Seek(0, SeekOrigin.Begin); responseStream.Close(); return stream; } catch (Exception ex) { throw ex; } } #endregion #region HttpDownloadFile 下载文件(基于回调的异步) /// <summary> /// 下载文件 /// </summary> /// <param name="url">下载文件url路径</param> /// <param name="cookie">cookie</param> public static void HttpDownloadFileAsyncWithCallback(string url, HttpUtilAsyncState state = null, Action<object> callback = null, CookieContainer cookie = null, WebHeaderCollection headers = null) { try { if (state.State == 0) { // 设置参数 HttpWebRequest request = WebRequest.Create(url) as HttpWebRequest; request.Method = "GET"; request.CookieContainer = cookie; if (headers != null) { foreach (string key in headers.Keys) { request.Headers.Add(key, headers[key]); } } //发送请求并获取相应回应数据 request.BeginGetResponse(asyncResult => { HttpUtilAsyncState asyncState = asyncResult.AsyncState as HttpUtilAsyncState; try { HttpWebResponse response = request.EndGetResponse(asyncResult) as HttpWebResponse; asyncState.ResponseStream = response.GetResponseStream(); HttpDownloadFileAsyncWithCallback(null, asyncState, callback); } catch (Exception ex) { asyncState.Exception = ex; asyncState.State = 2; HttpDownloadFileAsyncWithCallback(null, asyncState, callback); } }, new HttpUtilAsyncState { Request = request, State = 1 }); return; } if (state.State == 1) { byte[] bArr = new byte[10240]; state.ResponseStream.BeginRead(bArr, 0, (int)bArr.Length, asyncResult => { HttpUtilAsyncState asyncState = asyncResult.AsyncState as HttpUtilAsyncState; try { int size = state.ResponseStream.EndRead(asyncResult); if (size > 0) { state.Stream.Write(bArr, 0, size); HttpDownloadFileAsyncWithCallback(null, asyncState, callback); } else { asyncState.State = 2; HttpDownloadFileAsyncWithCallback(null, asyncState, callback); } } catch (Exception ex) { asyncState.Exception = ex; asyncState.State = 2; HttpDownloadFileAsyncWithCallback(null, asyncState, callback); } }, state); return; } if (state.State == 2) { if (state.Exception != null) { callback(state.Exception); return; } else { state.Stream.Seek(0, SeekOrigin.Begin); state.ResponseStream.Close(); callback(state.Stream); return; } } } catch (Exception ex) { throw ex; } } #endregion } }