封装多线程处理大量数据操作

简介:

们需要解决WaitAny和取得异步执行的返回值的问题。地球人都知道Thread和ThreadPool接受的委托都是没有返回值的。要想取的返回值,我们就得自己动手了,我们需要构造一个AsyncContext类,由这个类来保存异步执行的状态以并存储返回值。

复制代码
using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Collections; 
using System.Threading; 
using System.Diagnostics; 
  
namespace AppUtility 
{ 
 public delegate object DoGetObjTask(object state); 
    public static class AsyncHelper 
    { 
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod) 
        { 
            DoAsync(dataCollection, threadCn, processItemMethod, true); 
        } 
  
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, bool needWaitAll, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, needWaitAll, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, true, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod, bool needWaitAll) 
        { 
            Hashtable hash; 
            DoAsyncPrivate(dataCollection, threadCn, processItemMethod, null, needWaitAll, false, out hash); 
        } 
  
        private static void DoAsyncPrivate(IList dataCollection, int threadCn, WaitCallback processItemMethod, DoGetObjTask getObjMethod, bool needWaitAll, bool hasReturnValue, out Hashtable processResult) 
        { 
            if (dataCollection == null) throw new ArgumentNullException("dataCollection"); 
  
            if (threadCn >= 64 || threadCn < 2) 
            { 
                throw new ArgumentOutOfRangeException("threadCn", "threadCn 参数必须在2和64之间"); 
            } 
  
            if (threadCn > dataCollection.Count) threadCn = dataCollection.Count; 
  
            IList[] colls = new ArrayList[threadCn]; 
  
            DataWithStateList dataWithStates = new DataWithStateList(); 
            AutoResetEvent[] evts = new AutoResetEvent[threadCn]; 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                colls[i] = new ArrayList(); 
                evts[i] = new AutoResetEvent(false); 
            } 
  
            for (int i = 0; i < dataCollection.Count; i++) 
            { 
                object obj = dataCollection[i]; 
                int threadIndex = i % threadCn; 
                colls[threadIndex].Add(obj); 
                dataWithStates.Add(new DataWithState(obj, ProcessState.WaitForProcess)); 
            } 
  
            AsyncContext context = AsyncContext.GetContext(threadCn, dataWithStates, needWaitAll, hasReturnValue, processItemMethod, getObjMethod); 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                ThreadPool.QueueUserWorkItem(DoPrivate, new object[] {  
                    colls[i],context,evts[i] 
                }); 
            } 
  
            if (needWaitAll) 
            { 
                WaitHandle.WaitAll(evts); 
            } 
            else
            { 
                WaitHandle.WaitAny(evts); 
                context.SetBreakSignal(); 
            } 
            processResult = context.ProcessResult; 
        } 
  
        private class AsyncContext 
        { 
            static public AsyncContext GetContext( 
                int threadCn, 
                DataWithStateList dataWithStates, 
                bool needWaitAll, 
                bool hasReturnValue, 
                WaitCallback processItemMethod, 
                DoGetObjTask hasReturnValueMethod 
                ) 
            { 
                AsyncContext context = new AsyncContext(); 
                context.ThreadCount = threadCn; 
                context.DataWithStates = dataWithStates; 
                context.NeedWaitAll = needWaitAll; 
                if (hasReturnValue) 
                { 
                    Hashtable processResult = Hashtable.Synchronized(new Hashtable()); 
                    context.ProcessResult = processResult; 
                    context.HasReturnValueMethod = hasReturnValueMethod; 
                } 
                else
                { 
                    context.VoidMethod = processItemMethod; 
                } 
                context.HasReturnValue = hasReturnValue; 
                return context; 
            } 
  
            internal int ThreadCount; 
  
            internal DataWithStateList DataWithStates; 
  
            internal bool NeedWaitAll; 
  
            internal bool HasReturnValue; 
  
            internal WaitCallback VoidMethod; 
  
            internal DoGetObjTask HasReturnValueMethod; 
  
            private bool _breakSignal; 
  
            private Hashtable _processResult; 
  
            internal Hashtable ProcessResult 
            { 
                get { return _processResult; } 
                set { _processResult = value; } 
            } 
  
            internal void SetReturnValue(object obj, object result) 
            { 
                lock (_processResult.SyncRoot) 
                { 
                    _processResult[obj] = result; 
                } 
            } 
  
            internal void SetBreakSignal() 
            { 
                if (NeedWaitAll) throw new NotSupportedException("设定为NeedWaitAll时不可设置BreakSignal"); 
  
                _breakSignal = true; 
            } 
  
            internal bool NeedBreak 
            { 
                get
                { 
                    return !NeedWaitAll && _breakSignal; 
                } 
            } 
  
            internal void Exec(object obj) 
            { 
                if (HasReturnValue) 
                { 
                    SetReturnValue(obj, HasReturnValueMethod(obj)); 
                } 
                else
                { 
                    VoidMethod(obj); 
                } 
                DataWithStates.SetState(obj, ProcessState.Processed); 
            } 
        } 
  
        private enum ProcessState : byte
        { 
            WaitForProcess = 0, 
            Processing = 1, 
            Processed = 2 
        } 
  
        private class DataWithStateList : List<DataWithState> 
        { 
            public void SetState(object obj, ProcessState state) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
  
                    if (dws != null) 
                    { 
                        dws.State = state; 
                    } 
                } 
            } 
  
            public ProcessState GetState(object obj) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
                    return dws.State; 
                } 
            } 
  
            private int GetCount(ProcessState state) 
            { 
                List<DataWithState> datas = this.FindAll(delegate(DataWithState i) { return i.State == state; }); 
                if (datas == null) return 0; 
                return datas.Count; 
            } 
  
            public int WaitForDataCount 
            { 
                get
                { 
                    return GetCount(ProcessState.WaitForProcess); 
                } 
            } 
  
            internal object GetWaitForObject() 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
                    if (dws == null) return null; 
                    dws.State = ProcessState.Processing; 
                    return dws.Data; 
                } 
            } 
  
            internal bool IsWaitForData(object obj, bool setState) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
  
                    if (setState && dws != null) dws.State = ProcessState.Processing; 
  
                    return dws != null; 
                } 
            } 
        } 
  
        private class DataWithState 
        { 
            public readonly object Data; 
            public ProcessState State; 
  
            public DataWithState(object data, ProcessState state) 
            { 
                Data = data; 
                State = state; 
            } 
        } 
  
        private static int _threadNo = 0; 
  
        private static void DoPrivate(object state) 
        { 
            object[] objs = state as object[]; 
  
            IList datas = objs[0] as IList; 
            AsyncContext context = objs[1] as AsyncContext; 
            AutoResetEvent evt = objs[2] as AutoResetEvent; 
  
            DataWithStateList objStates = context.DataWithStates; 
  
#if DEBUG 
            Thread.CurrentThread.Name = "Thread " + _threadNo; 
  
            Interlocked.Increment(ref _threadNo); 
            string threadName = Thread.CurrentThread.Name + "[" + Thread.CurrentThread.ManagedThreadId + "]"; 
            Trace.WriteLine("线程ID:" + threadName); 
#endif 
            if (datas != null) 
            { 
                for (int i = 0; i < datas.Count; i++) 
                { 
                    if (context.NeedBreak) 
                    { 
#if DEBUG 
                        Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                        break; 
                    } 
                    object obj = datas[i]; 
                    if (objStates.IsWaitForData(obj, true)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}处理{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            if (context.NeedWaitAll) 
            { 
                //如果执行完当前进程的数据,还要查看剩下多少没有做,如果还剩下超过ThreadCount个没有做 
                while (objStates.WaitForDataCount > context.ThreadCount) 
                { 
                    if (context.NeedBreak) break; 
  
                    object obj = objStates.GetWaitForObject(); 
                    if (obj != null && objStates.IsWaitForData(obj, false)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}执行另一个进程的数据{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            evt.Set(); 
        } 
  
  
    } 
} 
复制代码

使用AsyncHelper类,请看下面的测试代码:

复制代码
using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Diagnostics; 
using AppUtility; 
using System.IO; 
using System.Collections; 
using System.Threading; 
  
namespace ConsoleApplication2 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            Stopwatch sw = new Stopwatch(); 
            sw.Start(); 
            /* 
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 100; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
            AsyncHelper.DoAsync(testFiles, 10, WriteFile); 
  
            Console.WriteLine("异步写耗时"+sw.ElapsedMilliseconds + "ms"); 
            */
  
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 200; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
              
            Hashtable result; 
  
            AsyncHelper.DoAsync(testFiles, 20, WriteFileAndReturnRowCount,false,out result); 
  
            Console.WriteLine("异步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Thread.Sleep(10); 
  
            if (result != null) 
            { 
                foreach (object key in result.Keys) 
                { 
                    Console.WriteLine("{0}={1}",  key,result[key]); 
                } 
  
            } 
  
            sw.Reset(); 
            sw.Start(); 
            for (int i = 0; i < 200; i++) 
            { 
                WriteFile("D:\\test\\sync\\file_" + i.ToString() + ".log"); 
            } 
  
            Console.WriteLine("同步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Console.Read(); 
        } 
  
        static void WriteFile(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
        } 
  
        static object WriteFileAndReturnRowCount(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn ; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
            return DateTime.Now.ToLongTimeString(); 
        } 
    } 
}
复制代码

转自:http://www.cnblogs.com/yukaizhao/archive/2009/12/18/system_threading_threadpool_AsyncHelper_2.html

需求是从一个文件夹中读取N个文件,将N个文件平均分配到10个线程,然后分别读取每个文件里面的内容。经测试,发现在DoPrivate方法里面,每个datas.Count分配是对的,但到objStates.IsWaitForData(obj, true)方法总有不定数量返回false,因此最后没有完全执行,最后数量总少于N,而且不定,降至2个线程也一样,使用这种方法会造成漏数据,因此只能改进DoPrivate方法,去掉objStates.IsWaitForData方法判断。

本文转自欢醉博客园博客,原文链接http://www.cnblogs.com/zhangs1986/p/3625680.html如需转载请自行联系原作者


欢醉

相关文章
|
6月前
|
存储 前端开发 Java
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
192 1
|
6月前
|
存储 Java 数据安全/隐私保护
【JUC】ThreadLocal 如何实现数据的线程隔离?
【1月更文挑战第15天】【JUC】ThreadLocal 如何实现数据的线程隔离?ThreadLocal 导致内存泄漏问题?
|
2月前
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
62 2
|
25天前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
34 2
|
6月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
消息中间件 存储 Java
服务重启了,如何保证线程池中的数据不丢失?
【8月更文挑战第30天】为确保服务重启时线程池数据不丢失,可采用数据持久化(如数据库或文件存储)、使用可靠的任务队列(如消息队列或分布式任务队列系统)、状态监测与恢复机制,以及分布式锁等方式。这些方法能有效提高系统稳定性和可靠性,需根据具体需求选择合适方案并进行测试优化。
178 5
|
3月前
处理串口线程数据的函数
【8月更文挑战第4天】处理串口线程数据的函数。
27 4
|
3月前
|
数据处理 Python
解锁Python多线程编程魔法,告别漫长等待!让数据下载如飞,感受科技带来的速度与激情!
【8月更文挑战第22天】Python以简洁的语法和强大的库支持在多个领域大放异彩。尽管存在全局解释器锁(GIL),Python仍提供多线程支持,尤其适用于I/O密集型任务。通过一个多线程下载数据的例子,展示了如何使用`threading`模块创建多线程程序,并与单线程版本进行了性能对比。实验表明,多线程能显著减少总等待时间,但在CPU密集型任务上GIL可能会限制其性能提升。此案例帮助理解Python多线程的优势及其适用场景。
38 0
|
3月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
5月前
|
存储 测试技术
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试
54 0
【工作实践(多线程)】十个线程任务生成720w测试数据对系统进行性能测试