封装多线程处理大量数据操作

简介: 们需要解决WaitAny和取得异步执行的返回值的问题。地球人都知道Thread和ThreadPool接受的委托都是没有返回值的。要想取的返回值,我们就得自己动手了,我们需要构造一个AsyncContext类,由这个类来保存异步执行的状态以并存储返回值。

们需要解决WaitAny和取得异步执行的返回值的问题。地球人都知道Thread和ThreadPool接受的委托都是没有返回值的。要想取的返回值,我们就得自己动手了,我们需要构造一个AsyncContext类,由这个类来保存异步执行的状态以并存储返回值。

using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Collections; 
using System.Threading; 
using System.Diagnostics; 
  
namespace AppUtility 
{ 
 public delegate object DoGetObjTask(object state); 
    public static class AsyncHelper 
    { 
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod) 
        { 
            DoAsync(dataCollection, threadCn, processItemMethod, true); 
        } 
  
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, bool needWaitAll, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, needWaitAll, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, true, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod, bool needWaitAll) 
        { 
            Hashtable hash; 
            DoAsyncPrivate(dataCollection, threadCn, processItemMethod, null, needWaitAll, false, out hash); 
        } 
  
        private static void DoAsyncPrivate(IList dataCollection, int threadCn, WaitCallback processItemMethod, DoGetObjTask getObjMethod, bool needWaitAll, bool hasReturnValue, out Hashtable processResult) 
        { 
            if (dataCollection == null) throw new ArgumentNullException("dataCollection"); 
  
            if (threadCn >= 64 || threadCn < 2) 
            { 
                throw new ArgumentOutOfRangeException("threadCn", "threadCn 参数必须在2和64之间"); 
            } 
  
            if (threadCn > dataCollection.Count) threadCn = dataCollection.Count; 
  
            IList[] colls = new ArrayList[threadCn]; 
  
            DataWithStateList dataWithStates = new DataWithStateList(); 
            AutoResetEvent[] evts = new AutoResetEvent[threadCn]; 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                colls[i] = new ArrayList(); 
                evts[i] = new AutoResetEvent(false); 
            } 
  
            for (int i = 0; i < dataCollection.Count; i++) 
            { 
                object obj = dataCollection[i]; 
                int threadIndex = i % threadCn; 
                colls[threadIndex].Add(obj); 
                dataWithStates.Add(new DataWithState(obj, ProcessState.WaitForProcess)); 
            } 
  
            AsyncContext context = AsyncContext.GetContext(threadCn, dataWithStates, needWaitAll, hasReturnValue, processItemMethod, getObjMethod); 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                ThreadPool.QueueUserWorkItem(DoPrivate, new object[] {  
                    colls[i],context,evts[i] 
                }); 
            } 
  
            if (needWaitAll) 
            { 
                WaitHandle.WaitAll(evts); 
            } 
            else
            { 
                WaitHandle.WaitAny(evts); 
                context.SetBreakSignal(); 
            } 
            processResult = context.ProcessResult; 
        } 
  
        private class AsyncContext 
        { 
            static public AsyncContext GetContext( 
                int threadCn, 
                DataWithStateList dataWithStates, 
                bool needWaitAll, 
                bool hasReturnValue, 
                WaitCallback processItemMethod, 
                DoGetObjTask hasReturnValueMethod 
                ) 
            { 
                AsyncContext context = new AsyncContext(); 
                context.ThreadCount = threadCn; 
                context.DataWithStates = dataWithStates; 
                context.NeedWaitAll = needWaitAll; 
                if (hasReturnValue) 
                { 
                    Hashtable processResult = Hashtable.Synchronized(new Hashtable()); 
                    context.ProcessResult = processResult; 
                    context.HasReturnValueMethod = hasReturnValueMethod; 
                } 
                else
                { 
                    context.VoidMethod = processItemMethod; 
                } 
                context.HasReturnValue = hasReturnValue; 
                return context; 
            } 
  
            internal int ThreadCount; 
  
            internal DataWithStateList DataWithStates; 
  
            internal bool NeedWaitAll; 
  
            internal bool HasReturnValue; 
  
            internal WaitCallback VoidMethod; 
  
            internal DoGetObjTask HasReturnValueMethod; 
  
            private bool _breakSignal; 
  
            private Hashtable _processResult; 
  
            internal Hashtable ProcessResult 
            { 
                get { return _processResult; } 
                set { _processResult = value; } 
            } 
  
            internal void SetReturnValue(object obj, object result) 
            { 
                lock (_processResult.SyncRoot) 
                { 
                    _processResult[obj] = result; 
                } 
            } 
  
            internal void SetBreakSignal() 
            { 
                if (NeedWaitAll) throw new NotSupportedException("设定为NeedWaitAll时不可设置BreakSignal"); 
  
                _breakSignal = true; 
            } 
  
            internal bool NeedBreak 
            { 
                get
                { 
                    return !NeedWaitAll && _breakSignal; 
                } 
            } 
  
            internal void Exec(object obj) 
            { 
                if (HasReturnValue) 
                { 
                    SetReturnValue(obj, HasReturnValueMethod(obj)); 
                } 
                else
                { 
                    VoidMethod(obj); 
                } 
                DataWithStates.SetState(obj, ProcessState.Processed); 
            } 
        } 
  
        private enum ProcessState : byte
        { 
            WaitForProcess = 0, 
            Processing = 1, 
            Processed = 2 
        } 
  
        private class DataWithStateList : List<DataWithState> 
        { 
            public void SetState(object obj, ProcessState state) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
  
                    if (dws != null) 
                    { 
                        dws.State = state; 
                    } 
                } 
            } 
  
            public ProcessState GetState(object obj) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
                    return dws.State; 
                } 
            } 
  
            private int GetCount(ProcessState state) 
            { 
                List<DataWithState> datas = this.FindAll(delegate(DataWithState i) { return i.State == state; }); 
                if (datas == null) return 0; 
                return datas.Count; 
            } 
  
            public int WaitForDataCount 
            { 
                get
                { 
                    return GetCount(ProcessState.WaitForProcess); 
                } 
            } 
  
            internal object GetWaitForObject() 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
                    if (dws == null) return null; 
                    dws.State = ProcessState.Processing; 
                    return dws.Data; 
                } 
            } 
  
            internal bool IsWaitForData(object obj, bool setState) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
  
                    if (setState && dws != null) dws.State = ProcessState.Processing; 
  
                    return dws != null; 
                } 
            } 
        } 
  
        private class DataWithState 
        { 
            public readonly object Data; 
            public ProcessState State; 
  
            public DataWithState(object data, ProcessState state) 
            { 
                Data = data; 
                State = state; 
            } 
        } 
  
        private static int _threadNo = 0; 
  
        private static void DoPrivate(object state) 
        { 
            object[] objs = state as object[]; 
  
            IList datas = objs[0] as IList; 
            AsyncContext context = objs[1] as AsyncContext; 
            AutoResetEvent evt = objs[2] as AutoResetEvent; 
  
            DataWithStateList objStates = context.DataWithStates; 
  
#if DEBUG 
            Thread.CurrentThread.Name = "Thread " + _threadNo; 
  
            Interlocked.Increment(ref _threadNo); 
            string threadName = Thread.CurrentThread.Name + "[" + Thread.CurrentThread.ManagedThreadId + "]"; 
            Trace.WriteLine("线程ID:" + threadName); 
#endif 
            if (datas != null) 
            { 
                for (int i = 0; i < datas.Count; i++) 
                { 
                    if (context.NeedBreak) 
                    { 
#if DEBUG 
                        Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                        break; 
                    } 
                    object obj = datas[i]; 
                    if (objStates.IsWaitForData(obj, true)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}处理{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            if (context.NeedWaitAll) 
            { 
                //如果执行完当前进程的数据,还要查看剩下多少没有做,如果还剩下超过ThreadCount个没有做 
                while (objStates.WaitForDataCount > context.ThreadCount) 
                { 
                    if (context.NeedBreak) break; 
  
                    object obj = objStates.GetWaitForObject(); 
                    if (obj != null && objStates.IsWaitForData(obj, false)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}执行另一个进程的数据{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            evt.Set(); 
        } 
  
  
    } 
} 

使用AsyncHelper类,请看下面的测试代码:

using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Diagnostics; 
using AppUtility; 
using System.IO; 
using System.Collections; 
using System.Threading; 
  
namespace ConsoleApplication2 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            Stopwatch sw = new Stopwatch(); 
            sw.Start(); 
            /* 
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 100; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
            AsyncHelper.DoAsync(testFiles, 10, WriteFile); 
  
            Console.WriteLine("异步写耗时"+sw.ElapsedMilliseconds + "ms"); 
            */
  
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 200; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
              
            Hashtable result; 
  
            AsyncHelper.DoAsync(testFiles, 20, WriteFileAndReturnRowCount,false,out result); 
  
            Console.WriteLine("异步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Thread.Sleep(10); 
  
            if (result != null) 
            { 
                foreach (object key in result.Keys) 
                { 
                    Console.WriteLine("{0}={1}",  key,result[key]); 
                } 
  
            } 
  
            sw.Reset(); 
            sw.Start(); 
            for (int i = 0; i < 200; i++) 
            { 
                WriteFile("D:\\test\\sync\\file_" + i.ToString() + ".log"); 
            } 
  
            Console.WriteLine("同步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Console.Read(); 
        } 
  
        static void WriteFile(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
        } 
  
        static object WriteFileAndReturnRowCount(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn ; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
            return DateTime.Now.ToLongTimeString(); 
        } 
    } 
}

转自:http://www.cnblogs.com/yukaizhao/archive/2009/12/18/system_threading_threadpool_AsyncHelper_2.html

需求是从一个文件夹中读取N个文件,将N个文件平均分配到10个线程,然后分别读取每个文件里面的内容。经测试,发现在DoPrivate方法里面,每个datas.Count分配是对的,但到objStates.IsWaitForData(obj, true)方法总有不定数量返回false,因此最后没有完全执行,最后数量总少于N,而且不定,降至2个线程也一样,使用这种方法会造成漏数据,因此只能改进DoPrivate方法,去掉objStates.IsWaitForData方法判断。

目录
相关文章
|
9天前
|
小程序 数据管理 数据处理
小程序数据绑定机制的优点
【10月更文挑战第23天】小程序数据绑定机制具有众多优点,它极大地提升了小程序的开发效率、交互性、可维护性和用户体验,是小程序开发中不可或缺的重要机制。你还可以根据实际情况进一步扩展和细化相关内容,使其更具针对性和实用性。
|
2月前
|
IDE Java 开发工具
流操作代码开发后端逻辑
该文档指导开发者在魔笔平台上下载代码模板并进行自定义逻辑流操作开发。首先登录魔笔并下载`bundle.zip`模板,解压缩后获得一个包含`custom-action-core`模块的Java工程。核心模块已预置接口与实现类,开发者需在`execute`方法中编写业务逻辑。工程要求JDK11+及Maven3.5+环境,并提供IDEA调试建议。每个自定义操作对应一个实现类,`MobiContext`参数简化了实体与结构体操作。注意不要修改工程特定目录以外的内容以确保兼容性。
34 2
|
3月前
|
存储 Java
线程池的底层工作原理是什么?
【8月更文挑战第8天】线程池的底层工作原理是什么?
109 8
|
4月前
|
缓存 NoSQL API
分享大厂对于缓存操作的封装
作者shigen分享了关于Redis缓存的封装,以避免常见问题如穿透、击穿、雪崩。封装包括四个文件:CacheEnum、CacheLoader、CacheService和CacheServiceImpl。CacheEnum用于统一管理缓存名和过期时间,CacheService定义缓存操作接口,CacheServiceImpl是实现类,使用Semaphore解决缓存击穿问题。
44 1
分享大厂对于缓存操作的封装
|
6月前
|
Linux 程序员 C++
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
963 2
|
设计模式 Java 数据库连接
Java责任链模式:优雅解耦系统处理流程,实现高效灵活的请求处理与分发(下)
Java责任链模式:优雅解耦系统处理流程,实现高效灵活的请求处理与分发
275 0
|
存储 设计模式 Java
Java责任链模式:优雅解耦系统处理流程,实现高效灵活的请求处理与分发(上)
Java责任链模式:优雅解耦系统处理流程,实现高效灵活的请求处理与分发
204 0
|
监控 NoSQL Java
简单高效的代码优化-事务后异步处理
基于skywalking监控分析,简单高效的优化代码,提升90%+效率,设计事务后异步处理,EventListener,Mysql隔离级别等知识点
223 0
简单高效的代码优化-事务后异步处理
|
SQL 安全 PHP
封装数据库操作类(读操作)|学习笔记
快速学习封装数据库操作类(读操作)
封装数据库操作类(读操作)|学习笔记
数据结构107-获取操作封装代码
数据结构107-获取操作封装代码
66 0