分享一个异步任务在遇到IO异常时支持递归回调的辅助方法

简介:
复制代码
public void TryAsyncActionRecursively<TAsyncResult>(
    string asyncActionName,
    Func<Task<TAsyncResult>> asyncAction,
    Action<int> mainAction,
    Action<TAsyncResult> successAction,
    Func<string> getContextInfoFunc,
    Action<Exception> failedAction,
    int retryTimes) where TAsyncResult : AsyncOperationResult
{
    var retryAction = new Action<int>(currentRetryTimes =>
    {
        if (currentRetryTimes >= _immediatelyRetryTimes)
        {
            Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
        }
        else
        {
            mainAction(currentRetryTimes + 1);
        }
    });
    var executeFailedAction = new Action<Exception>(ex =>
    {
        try
        {
            if (failedAction != null)
            {
                failedAction(ex);
            }
        }
        catch (Exception unknownEx)
        {
            _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}",
                asyncActionName, getContextInfoFunc()), unknownEx);
        }
    });
    var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
    {
        if (ex is IOException)
        {
            _logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            retryAction(retryTimes);
        }
        else
        {
            _logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            executeFailedAction(ex);
        }
    });
    var completeAction = new Action<Task<TAsyncResult>>(t =>
    {
        if (t.Exception != null)
        {
            processTaskException(t.Exception.InnerException, retryTimes);
            return;
        }
        if (t.IsCanceled)
        {
            _logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}",
                asyncActionName, getContextInfoFunc(), retryTimes);
            retryAction(retryTimes);
            return;
        }
        var result = t.Result;
        if (result.Status == AsyncOperationResultStatus.IOException)
        {
            _logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}",
                asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
            retryAction(retryTimes);
            return;
        }
        if (successAction != null)
        {
            successAction(result);
        }
    });

    try
    {
        asyncAction().ContinueWith(completeAction);
    }
    catch (IOException ex)
    {
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        retryAction(retryTimes);
    }
    catch (Exception ex)
    {
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}",
            asyncActionName, getContextInfoFunc(), retryTimes), ex);
        executeFailedAction(ex);
    }
}
复制代码

该函数的功能是:执行一个异步任务(返回Task的方法),如果执行出现IO异常,则重试当前主函数(mainAction);用户的mainAction中会再次调用TryAsyncActionRecursively方法。从而实现当遇到IO异常时,能做到不断重试。另外,重试只立即重试指定的次数,超过指定次数,则不立即重试,而是暂停一定间隔后再次执行。该函数还提供当acyncAction执行成功或失败后的回调函数,以及允许传入当前上下文的一些说明信息,以便记录有意义的错误日志信息。

下面是使用示例:

复制代码
private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
{
    TryAsyncActionRecursively<AsyncOperationResult>("PublishEventAsync",
    () => _eventPublisher.PublishAsync(eventStream),
    currentRetryTimes => PublishEventAsync(processingCommand, eventStream, currentRetryTimes),
    result =>
    {
        _logger.DebugFormat("Publish events success, {0}", eventStream);
        processingCommand.Complete(new CommandResult(CommandStatus.Success, processingCommand.Command.Id));
    },
    () => string.Format("[eventStream:{0}]", eventStream),
    ex => processingCommand.Complete(new CommandResult(CommandStatus.Failed, processingCommand.Command.Id)),
    retryTimes);
}
复制代码
PublishEventAsync(processingCommand, eventStream, 0);

目录
相关文章
|
1月前
|
算法 数据处理 Python
Python并发编程:解密异步IO与多线程
本文将深入探讨Python中的并发编程技术,重点介绍异步IO和多线程两种常见的并发模型。通过对比它们的特点、适用场景和实现方式,帮助读者更好地理解并发编程的核心概念,并掌握在不同场景下选择合适的并发模型的方法。
|
1月前
|
数据采集 Java Python
python并发编程:Python异步IO实现并发爬虫
python并发编程:Python异步IO实现并发爬虫
37 1
|
1月前
|
调度 数据库 Python
【专栏】异步IO在处理IO密集型任务中的高效性
【4月更文挑战第27天】本文介绍了Python并发编程和异步IO,包括并发的基本概念(多线程、多进程、协程),线程与进程的实现(threading和multiprocessing模块),协程的使用(asyncio模块),以及异步IO的原理和优势。强调了异步IO在处理IO密集型任务中的高效性,指出应根据任务类型选择合适的并发技术。
|
1天前
|
数据挖掘 程序员 调度
Python并发编程之协程与异步IO
传统的多线程和多进程模型在处理大规模并发时存在一些性能瓶颈和资源消耗问题。本文将重点介绍Python中基于协程和异步IO的并发编程方法,探讨其工作原理和实际应用,帮助开发者更好地理解并利用Python的并发编程能力。
|
5天前
|
Linux
Linux异步io机制 io_uring
Linux异步io机制 io_uring
13 1
|
12天前
|
Python
并发编程,Python让你轻松驾驭多线程与异步IO!
【6月更文挑战第12天】本文探讨了Python中的并发编程,包括多线程和异步IO。通过`threading`模块展示了多线程编程,创建并运行多个线程以并发执行任务。同时,使用`asyncio`库演示了异步IO编程,允许在单线程中高效处理多个IO操作。两个示例代码详细解释了如何在Python中实现并发,展现了其在提升程序性能和响应速度方面的潜力。
|
1月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版操作报错之在执行任务时遇到了一个IO错误,具体表现为无法从本地主机(localhost)下载文件,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
14天前
|
调度 数据库 开发者
在Python编程中,并发编程和异步IO是两个重要的概念,它们对于提高程序性能和响应速度具有至关重要的作用
【6月更文挑战第10天】本文介绍了Python并发编程和异步IO,包括并发编程的基本概念如多线程、多进程和协程。线程和进程可通过threading及multiprocessing模块管理,但多线程受限于GIL。协程利用asyncio模块实现非阻塞IO,适合处理IO密集型任务。异步IO基于事件循环,能提高服务器并发处理能力,适用于网络编程和文件操作等场景。异步IO与多线程、多进程在不同任务中有各自优势,开发者应根据需求选择合适的技术。
19 0
|
1月前
|
人工智能 算法 调度
uvloop,一个强大的 Python 异步IO编程库!
uvloop,一个强大的 Python 异步IO编程库!
47 2