封装多线程处理大量数据操作

简介:

们需要解决WaitAny和取得异步执行的返回值的问题。地球人都知道Thread和ThreadPool接受的委托都是没有返回值的。要想取的返回值,我们就得自己动手了,我们需要构造一个AsyncContext类,由这个类来保存异步执行的状态以并存储返回值。

复制代码
using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Collections; 
using System.Threading; 
using System.Diagnostics; 
  
namespace AppUtility 
{ 
 public delegate object DoGetObjTask(object state); 
    public static class AsyncHelper 
    { 
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod) 
        { 
            DoAsync(dataCollection, threadCn, processItemMethod, true); 
        } 
  
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, bool needWaitAll, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, needWaitAll, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, DoGetObjTask processItemMethod, out Hashtable processResult) 
        { 
            DoAsyncPrivate(dataCollection, threadCn, null, processItemMethod, true, true, out processResult); 
        } 
  
        /// <summary> 
        /// 执行多线程操作任务 
        /// </summary> 
        /// <param name="dataCollection">多线程操作的数据集合</param> 
        /// <param name="threadCn">分多少个线程来做</param> 
        /// <param name="processItemMethod">处理数据集合中单个数据使用的处理方法</param> 
        /// <param name="needWaitAll">是否需要等待所有线程执行完毕才返回,为true时会等待所有线程执行完毕,否则则是在有一个线程执行完毕就返回</param> 
        public static void DoAsync(IList dataCollection, int threadCn, WaitCallback processItemMethod, bool needWaitAll) 
        { 
            Hashtable hash; 
            DoAsyncPrivate(dataCollection, threadCn, processItemMethod, null, needWaitAll, false, out hash); 
        } 
  
        private static void DoAsyncPrivate(IList dataCollection, int threadCn, WaitCallback processItemMethod, DoGetObjTask getObjMethod, bool needWaitAll, bool hasReturnValue, out Hashtable processResult) 
        { 
            if (dataCollection == null) throw new ArgumentNullException("dataCollection"); 
  
            if (threadCn >= 64 || threadCn < 2) 
            { 
                throw new ArgumentOutOfRangeException("threadCn", "threadCn 参数必须在2和64之间"); 
            } 
  
            if (threadCn > dataCollection.Count) threadCn = dataCollection.Count; 
  
            IList[] colls = new ArrayList[threadCn]; 
  
            DataWithStateList dataWithStates = new DataWithStateList(); 
            AutoResetEvent[] evts = new AutoResetEvent[threadCn]; 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                colls[i] = new ArrayList(); 
                evts[i] = new AutoResetEvent(false); 
            } 
  
            for (int i = 0; i < dataCollection.Count; i++) 
            { 
                object obj = dataCollection[i]; 
                int threadIndex = i % threadCn; 
                colls[threadIndex].Add(obj); 
                dataWithStates.Add(new DataWithState(obj, ProcessState.WaitForProcess)); 
            } 
  
            AsyncContext context = AsyncContext.GetContext(threadCn, dataWithStates, needWaitAll, hasReturnValue, processItemMethod, getObjMethod); 
  
            for (int i = 0; i < threadCn; i++) 
            { 
                ThreadPool.QueueUserWorkItem(DoPrivate, new object[] {  
                    colls[i],context,evts[i] 
                }); 
            } 
  
            if (needWaitAll) 
            { 
                WaitHandle.WaitAll(evts); 
            } 
            else
            { 
                WaitHandle.WaitAny(evts); 
                context.SetBreakSignal(); 
            } 
            processResult = context.ProcessResult; 
        } 
  
        private class AsyncContext 
        { 
            static public AsyncContext GetContext( 
                int threadCn, 
                DataWithStateList dataWithStates, 
                bool needWaitAll, 
                bool hasReturnValue, 
                WaitCallback processItemMethod, 
                DoGetObjTask hasReturnValueMethod 
                ) 
            { 
                AsyncContext context = new AsyncContext(); 
                context.ThreadCount = threadCn; 
                context.DataWithStates = dataWithStates; 
                context.NeedWaitAll = needWaitAll; 
                if (hasReturnValue) 
                { 
                    Hashtable processResult = Hashtable.Synchronized(new Hashtable()); 
                    context.ProcessResult = processResult; 
                    context.HasReturnValueMethod = hasReturnValueMethod; 
                } 
                else
                { 
                    context.VoidMethod = processItemMethod; 
                } 
                context.HasReturnValue = hasReturnValue; 
                return context; 
            } 
  
            internal int ThreadCount; 
  
            internal DataWithStateList DataWithStates; 
  
            internal bool NeedWaitAll; 
  
            internal bool HasReturnValue; 
  
            internal WaitCallback VoidMethod; 
  
            internal DoGetObjTask HasReturnValueMethod; 
  
            private bool _breakSignal; 
  
            private Hashtable _processResult; 
  
            internal Hashtable ProcessResult 
            { 
                get { return _processResult; } 
                set { _processResult = value; } 
            } 
  
            internal void SetReturnValue(object obj, object result) 
            { 
                lock (_processResult.SyncRoot) 
                { 
                    _processResult[obj] = result; 
                } 
            } 
  
            internal void SetBreakSignal() 
            { 
                if (NeedWaitAll) throw new NotSupportedException("设定为NeedWaitAll时不可设置BreakSignal"); 
  
                _breakSignal = true; 
            } 
  
            internal bool NeedBreak 
            { 
                get
                { 
                    return !NeedWaitAll && _breakSignal; 
                } 
            } 
  
            internal void Exec(object obj) 
            { 
                if (HasReturnValue) 
                { 
                    SetReturnValue(obj, HasReturnValueMethod(obj)); 
                } 
                else
                { 
                    VoidMethod(obj); 
                } 
                DataWithStates.SetState(obj, ProcessState.Processed); 
            } 
        } 
  
        private enum ProcessState : byte
        { 
            WaitForProcess = 0, 
            Processing = 1, 
            Processed = 2 
        } 
  
        private class DataWithStateList : List<DataWithState> 
        { 
            public void SetState(object obj, ProcessState state) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
  
                    if (dws != null) 
                    { 
                        dws.State = state; 
                    } 
                } 
            } 
  
            public ProcessState GetState(object obj) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return Object.Equals(i.Data, obj); }); 
                    return dws.State; 
                } 
            } 
  
            private int GetCount(ProcessState state) 
            { 
                List<DataWithState> datas = this.FindAll(delegate(DataWithState i) { return i.State == state; }); 
                if (datas == null) return 0; 
                return datas.Count; 
            } 
  
            public int WaitForDataCount 
            { 
                get
                { 
                    return GetCount(ProcessState.WaitForProcess); 
                } 
            } 
  
            internal object GetWaitForObject() 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
                    if (dws == null) return null; 
                    dws.State = ProcessState.Processing; 
                    return dws.Data; 
                } 
            } 
  
            internal bool IsWaitForData(object obj, bool setState) 
            { 
                lock (((ICollection)this).SyncRoot) 
                { 
                    DataWithState dws = this.Find(delegate(DataWithState i) { return i.State == ProcessState.WaitForProcess; }); 
  
                    if (setState && dws != null) dws.State = ProcessState.Processing; 
  
                    return dws != null; 
                } 
            } 
        } 
  
        private class DataWithState 
        { 
            public readonly object Data; 
            public ProcessState State; 
  
            public DataWithState(object data, ProcessState state) 
            { 
                Data = data; 
                State = state; 
            } 
        } 
  
        private static int _threadNo = 0; 
  
        private static void DoPrivate(object state) 
        { 
            object[] objs = state as object[]; 
  
            IList datas = objs[0] as IList; 
            AsyncContext context = objs[1] as AsyncContext; 
            AutoResetEvent evt = objs[2] as AutoResetEvent; 
  
            DataWithStateList objStates = context.DataWithStates; 
  
#if DEBUG 
            Thread.CurrentThread.Name = "Thread " + _threadNo; 
  
            Interlocked.Increment(ref _threadNo); 
            string threadName = Thread.CurrentThread.Name + "[" + Thread.CurrentThread.ManagedThreadId + "]"; 
            Trace.WriteLine("线程ID:" + threadName); 
#endif 
            if (datas != null) 
            { 
                for (int i = 0; i < datas.Count; i++) 
                { 
                    if (context.NeedBreak) 
                    { 
#if DEBUG 
                        Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                        break; 
                    } 
                    object obj = datas[i]; 
                    if (objStates.IsWaitForData(obj, true)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}处理{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            if (context.NeedWaitAll) 
            { 
                //如果执行完当前进程的数据,还要查看剩下多少没有做,如果还剩下超过ThreadCount个没有做 
                while (objStates.WaitForDataCount > context.ThreadCount) 
                { 
                    if (context.NeedBreak) break; 
  
                    object obj = objStates.GetWaitForObject(); 
                    if (obj != null && objStates.IsWaitForData(obj, false)) 
                    { 
                        if (context.NeedBreak) 
                        { 
#if DEBUG 
                            Trace.WriteLine("线程" + threadName + "未执行完跳出"); 
#endif 
                            break; 
                        } 
  
                        context.Exec(obj); 
  
#if DEBUG 
                        Trace.WriteLine(string.Format("线程{0}执行另一个进程的数据{1}", threadName, obj)); 
#endif 
                    } 
                } 
            } 
  
            evt.Set(); 
        } 
  
  
    } 
} 
复制代码

使用AsyncHelper类,请看下面的测试代码:

复制代码
using System; 
using System.Collections.Generic; 
using System.Text; 
using System.Diagnostics; 
using AppUtility; 
using System.IO; 
using System.Collections; 
using System.Threading; 
  
namespace ConsoleApplication2 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            Stopwatch sw = new Stopwatch(); 
            sw.Start(); 
            /* 
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 100; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
            AsyncHelper.DoAsync(testFiles, 10, WriteFile); 
  
            Console.WriteLine("异步写耗时"+sw.ElapsedMilliseconds + "ms"); 
            */
  
            List<string> testFiles = new List<string>(); 
            for (int i = 0; i < 200; i++) 
            { 
                testFiles.Add("D:\\test\\async\\file_" + i.ToString() + ".log"); 
            } 
              
            Hashtable result; 
  
            AsyncHelper.DoAsync(testFiles, 20, WriteFileAndReturnRowCount,false,out result); 
  
            Console.WriteLine("异步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Thread.Sleep(10); 
  
            if (result != null) 
            { 
                foreach (object key in result.Keys) 
                { 
                    Console.WriteLine("{0}={1}",  key,result[key]); 
                } 
  
            } 
  
            sw.Reset(); 
            sw.Start(); 
            for (int i = 0; i < 200; i++) 
            { 
                WriteFile("D:\\test\\sync\\file_" + i.ToString() + ".log"); 
            } 
  
            Console.WriteLine("同步写耗时" + sw.ElapsedMilliseconds + "ms"); 
  
            Console.Read(); 
        } 
  
        static void WriteFile(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
        } 
  
        static object WriteFileAndReturnRowCount(object objFilePath) 
        { 
            string filePath = (string)objFilePath; 
            string dir = Path.GetDirectoryName(filePath); 
            if (!Directory.Exists(dir)) 
            { 
                Directory.CreateDirectory(dir); 
            } 
  
            //Random r = new Random(DateTime.Now.Minute); 
            int rowCn = 10000; 
            using (StreamWriter writer = new StreamWriter(filePath, false, Encoding.Default)) 
            { 
                for (int i = 0; i < rowCn ; i++) writer.WriteLine(Guid.NewGuid()); 
            } 
            return DateTime.Now.ToLongTimeString(); 
        } 
    } 
}
复制代码

转自:http://www.cnblogs.com/yukaizhao/archive/2009/12/18/system_threading_threadpool_AsyncHelper_2.html

需求是从一个文件夹中读取N个文件,将N个文件平均分配到10个线程,然后分别读取每个文件里面的内容。经测试,发现在DoPrivate方法里面,每个datas.Count分配是对的,但到objStates.IsWaitForData(obj, true)方法总有不定数量返回false,因此最后没有完全执行,最后数量总少于N,而且不定,降至2个线程也一样,使用这种方法会造成漏数据,因此只能改进DoPrivate方法,去掉objStates.IsWaitForData方法判断。

本文转自欢醉博客园博客,原文链接http://www.cnblogs.com/zhangs1986/p/3625680.html如需转载请自行联系原作者


欢醉

相关文章
|
8月前
|
Java 索引
多线程向设备发送数据
多线程向设备发送数据
131 0
|
8月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
358 2
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
367 62
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
缓存 安全 Java
面试中的难题:线程异步执行后如何共享数据?
本文通过一个面试故事,详细讲解了Java中线程内部开启异步操作后如何安全地共享数据。介绍了异步操作的基本概念及常见实现方式(如CompletableFuture、ExecutorService),并重点探讨了volatile关键字、CountDownLatch和CompletableFuture等工具在线程间数据共享中的应用,帮助读者理解线程安全和内存可见性问题。通过这些方法,可以有效解决多线程环境下的数据共享挑战,提升编程效率和代码健壮性。
404 6
|
12月前
|
数据采集 存储 安全
Python爬虫实战:利用短效代理IP爬取京东母婴纸尿裤数据,多线程池并行处理方案详解
本文分享了一套结合青果网络短效代理IP和多线程池技术的电商数据爬取方案,针对京东母婴纸尿裤类目商品信息进行高效采集。通过动态代理IP规避访问限制,利用多线程提升抓取效率,同时确保数据采集的安全性和合法性。方案详细介绍了爬虫开发步骤、网页结构分析及代码实现,适用于大规模电商数据采集场景。
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
386 57

热门文章

最新文章