分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

简介:
复制代码
public void TryAsyncActionRecursively<TAsyncResult>(
    string asyncActionName,
    Func<Task<TAsyncResult>> asyncAction,
    Action<int> mainAction,
    Action<TAsyncResult> successAction,
    Func<string> getContextInfoFunc,
    Action<Exception> failedAction,
    int retryTimes) where TAsyncResult : AsyncOperationResult
{
    var retryAction = new Action<int>(currentRetryTimes =>
    {
        if (currentRetryTimes >= _immediatelyRetryTimes)
        {
            Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
        }
        else
        {
            mainAction(currentRetryTimes + 1);
        }
    });
    var executeFailedAction = new Action<Exception>(ex =>
    {
        try
        {
            if (failedAction != null)
            {
                failedAction(ex);
            }
        }
        catch (Exception unknownEx)
        {
            _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
                asyncActionName, getContextInfoFunc()), unknownEx);
        }
    });
    var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
    {
        if (ex is IOException)
        {
            _logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            retryAction(retryTimes);
        }
        else
        {
            _logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            executeFailedAction(ex);
        }
    });
    var completeAction = new Action<Task<TAsyncResult>>(t =>
    {
        if (t.Exception != null)
        {
            processTaskException(t.Exception.InnerException, retryTimes);
            return;
        }
        if (t.IsCanceled)
        {
            _logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes);
            retryAction(retryTimes);
            return;
        }
        var result = t.Result;
        if (result.Status == AsyncOperationResultStatus.IOException)
        {
            _logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}",
                asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
            retryAction(retryTimes);
            return;
        }
        if (successAction != null)
        {
            successAction(result);
        }
    });

    try
    {
        asyncAction().ContinueWith(completeAction);
    }
    catch (IOException ex)
    {
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        retryAction(retryTimes);
    }
    catch (Exception ex)
    {
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        executeFailedAction(ex);
    }
}
复制代码

该函数的功能是:执行一个异步任务(返回Task的方法),如果执行出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法。从而实现当遇到IO异常时,能做到不断重试。另外,重试只立即重试指定的次数,超过指定次数,则不立即重试,而是暂停一定间隔后再次执行。该函数还提供当acyncAction执行成功或失败后的回调函数,以及允许传入当前上下文的一些说明信息,以便记录有意义的错误日志信息。

下面是使用示例:

复制代码
private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
{
    TryAsyncActionRecursively<AsyncOperationResult>("PublishEventAsync",
    () => _eventPublisher.PublishAsync(eventStream),
    currentRetryTimes => PublishEventAsync(processingCommand, eventStream, currentRetryTimes),
    result =>
    {
        _logger.DebugFormat("Publish events success, {0}", eventStream);
        processingCommand.Complete(new CommandResult(CommandStatus.Success, processingCommand.Command.Id));
    },
    () => string.Format("[eventStream:{0}]", eventStream),
    ex => processingCommand.Complete(new CommandResult(CommandStatus.Failed, processingCommand.Command.Id)),
    retryTimes);
}
复制代码
PublishEventAsync(processingCommand, eventStream, 0);

目录
相关文章
|
11天前
|
并行计算 监控 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
【7月更文挑战第16天】Python并发异步提升性能:使用`asyncio`处理IO密集型任务,如网络请求,借助事件循环实现非阻塞;`multiprocessing`模块用于CPU密集型任务,绕过GIL进行并行计算。通过任务类型识别、任务分割、避免共享状态、利用现代库和性能调优,实现高效编程。示例代码展示异步HTTP请求和多进程数据处理。
26 8
|
10天前
|
并行计算 数据处理 Python
Python并发编程迷雾:IO密集型为何偏爱异步?CPU密集型又该如何应对?
【7月更文挑战第17天】Python并发编程中,异步编程(如`asyncio`)在IO密集型任务中提高效率,利用等待时间执行其他任务。但对CPU密集型任务,由于GIL限制,多线程效率不高,此时应选用`multiprocessing`进行多进程并行计算以突破限制。选择合适的并发策略是关键:异步适合IO,多进程适合CPU。理解这些能帮助构建高效并发程序。
22 6
|
9天前
|
开发框架 并行计算 .NET
从菜鸟到大神:Python并发编程深度剖析,IO与CPU的异步战争!
【7月更文挑战第18天】Python并发涉及多线程、多进程和异步IO(asyncio)。异步IO适合IO密集型任务,如并发HTTP请求,能避免等待提高效率。多进程在CPU密集型任务中更优,因可绕过GIL限制实现并行计算。通过正确选择并发策略,开发者能提升应用性能和响应速度。
|
10天前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
【7月更文挑战第17天】在数据驱动时代,Python凭借其优雅语法和强大库支持成为并发处理大规模数据的首选。并发与异步编程是关键,包括多线程、多进程和异步IO。对于IO密集型任务,如网络请求,可使用`concurrent.futures`和`asyncio`;CPU密集型任务则推荐多进程,如`multiprocessing`;`asyncio`适用于混合任务,实现等待IO时执行CPU任务。通过这些工具,开发者能有效优化资源,提升系统性能。
24 4
|
10天前
|
并行计算 Java 大数据
深度探索:Python异步编程如何优雅征服IO密集型任务,让CPU密集型任务也臣服!
【7月更文挑战第17天】Python的异步编程借助`asyncio`库提升IO密集型任务效率,如并发下载网页,通过`async def`定义协程,`asyncio.gather`并发执行。在CPU密集型任务中,结合`ThreadPoolExecutor`实现并行计算,利用多核优势。`asyncio.run`简化事件循环管理,使Python在高负载场景下表现更佳。
21 4
|
10天前
|
分布式计算 并行计算 Java
Python并发风暴来袭!IO密集型与CPU密集型任务并发策略大比拼,你站哪队?
【7月更文挑战第17天】Python并发处理IO密集型(如网络请求)与CPU密集型(如数学计算)任务。IO密集型适合多线程和异步IO,如`ThreadPoolExecutor`进行网页下载;CPU密集型推荐多进程,如`multiprocessing`模块进行并行计算。选择取决于任务类型,理解任务特性是关键,以实现最佳效率。
|
9天前
|
开发框架 并行计算 .NET
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
【7月更文挑战第18天】在Python中,异步编程(如`asyncio`)适合处理IO密集型任务,通过非阻塞操作提高响应性,例如使用`aiohttp`进行异步HTTP请求。而对于CPU密集型任务,由于GIL的存在,多进程(`multiprocessing`)能实现并行计算,如使用进程池进行大量计算。明智选择并发模型是性能优化的关键,体现了对任务特性和编程哲学的深刻理解。
16 2
|
9天前
|
UED 开发者 Python
Python并发编程新纪元:异步编程如何重塑IO与CPU密集型任务的处理方式?
【7月更文挑战第18天】Python异步编程提升IO任务效率,非阻塞模式减少等待时间,优化用户体验。asyncio库与await关键字助力编写非阻塞代码,示例展示异步HTTP请求。CPU密集型任务中,异步编程结合多进程可提升效率。异步编程挑战包括代码复杂性,解决策略包括使用类型提示、异步框架及最佳实践。异步编程重塑任务处理方式,成为现代Python开发的关键。
16 2
|
9天前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
【7月更文挑战第18天】Python并发编程中,异步IO适合IO密集型任务,如异步HTTP请求,利用`asyncio`和`aiohttp`实现并发抓取,避免等待延迟。而对于CPU密集型任务,如并行计算斐波那契数列,多进程通过`multiprocessing`库能绕过GIL限制实现并行计算。选择正确的并发模型能显著提升性能。
16 2
|
11天前
|
开发框架 数据挖掘 .NET
显微镜下的Python并发:细说IO与CPU密集型任务的异步差异,助你精准施策!
【7月更文挑战第16天】在Python并发编程中,理解和区分IO密集型与CPU密集型任务至关重要。IO密集型任务(如网络请求)适合使用异步编程(如`asyncio`),以利用等待时间执行其他任务,提高效率。CPU密集型任务(如计算)则推荐使用多进程(如`multiprocessing`),绕过GIL限制,利用多核CPU。正确选择并发策略能优化应用性能。