多线程实践

简介: 一直对多线程不感冒,常见的场景下用不到这种技术,所以一直不愿去了解,但是遇到一些耗时的任务时就要考虑了。下面的例子是项目中遇到的,不想说这个如何的高深,只想说我也不是很了解到底如何工作的,到底带来了多少的效率提升。

     一直对多线程不感冒,常见的场景下用不到这种技术,所以一直不愿去了解,但是遇到一些耗时的任务时就要考虑了。下面的例子是项目中遇到的,不想说这个如何的高深,只想说我也不是很了解到底如何工作的,到底带来了多少的效率提升。书上的理论要多枯燥有多枯燥,枯燥的我都不想去动手写里面的例子,下面的例子是实际用到的,还有些意思,不管怎么说开个头吧。

  1.ManualResetEvent[] doEvents = new ManualResetEvent[threadCount];
  通知一个或多个正在等待的线程已发生事件。 谁通知谁,发生的事情是指?一头雾水

  2.ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] });
  
线程池中创建一个线程池线程来执行指定方法(用委托WaitCallBack表示),并将该线程排入线程池的队列等待执行。

  3.doEvent.Set();
  将事件状态设置为终止状态,允许一个或多个等待线程继续。

  4.WaitHandle.WaitAll(doEvents);
   等待指定数组中的所有元素都收到信号。

来看下面的例子,
SendDealSucessEmailFromQueue()方法:threadCount是一个配置项,指定一次要开多少个线程,曾有人建议为了安全起见,机器是有好个核就开多少个线程这里假设是5,不知道有没有道理有待研究。dataNum也是一个配置项,指定一个线程要处理多少订单数据,这里假设是50。ManualResetEvent[] doEvents = new ManualResetEvent[threadCount],指定每次启动5个线程。然后循环doEvents[i] = new ManualResetEvent(false);使用false作为参数通知当前线程等待此线程完成之后再完成当前线程的工作。ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] }); 在线程池中添加线程来执行DealData方法,给这个方法传递一个参数,这个参数是一个对象数组,数组的第一项是当前第几个线程,就是线程号,第二个是线程的事件(请原谅我这么翻译,貌似没有看到ManualResetEvent这个类的中文名字是什么,看样子像手动重置事件,我这里简称线程事件)。注意for循环外面的一句话WaitHandle.WaitAll(doEvents);,这里先不解释待说完循环中调用的DealData方法之后再说。

DealData(object eventParams)方法:上面说到给他传递的参数是一个对象数组,所以int threadId = (int)(((object[])eventParams)[0]); ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]);这两句很好理解,就是从这个对象数组中把实参值转换过来。int tmpCount = orderList.Count / threadCount;计算每个线程要处理的订单数量,要说的是因为要判断存在有要处理的订单才会开线程来处理,所以使用静态变量orderList。下面的这个for循环需要耐心琢磨,其实在SendDealSucessEmailFromQueue()这个方法执行完之后就已经开了5个线程,这时每个线程在做些什么事情我们是不能控制的,只知道有5个线程像脱缰野马一样地跑,在执行DealData(object eventParams)的时候是不知道当前是的线程是这5匹马中的那一匹,所以在这个for循环中我们就需要给他们分配工作,具体来说就是第0个线程处理0-49个订单,第1个线程处理50-99个,第2个线程处理100-149个,第3个线程处理200-249个,注意我这里都是用数组下标来解释。分配完即执行,执行完之后调用doEvent.Set();将当前线程置为终止。

再回到SendDealSucessEmailFromQueue()方法的最后一句WaitHandle.WaitAll(doEvents);等待线程事件中的所有任务执行完成之后继续当前线程,就是继续往下跑了,就是主程序了,可以理解为回到Main()方法中了。

    /// <summary>
    /// 处理订单邮件
    /// </summary>
    public class OrderEmailProcess
    {
        /// <summary>
        /// 日志
        /// </summary>
        private static readonly ILog logger = LogManager.GetLogger(typeof(OrderEmailProcess));
        static List<Corp.CorpFlightADTKJob.Entity.FltOrderInfoEntity.OrderInfoEntity> orderList = new List<Entity.FltOrderInfoEntity.OrderInfoEntity>();
        static int threadCount = 0;
        static int dataNum = 0;

        /// <summary>
        /// 发送成交确认邮件
        /// 因为发邮件接口中占用了emailtype:8,所以使用4096,这里需要处理emailtype
        /// </summary>
        public static void SendDealSucessEmailFromQueue()
        {
            threadCount = int.Parse(ConfigurationManager.AppSettings["ThreadCount"]);
            dataNum = int.Parse(ConfigurationManager.AppSettings["DataNum"]);
            string logMsg = "";
            orderList = CorpProcessDB.GetDealConfirmEmailQueue(ConnStrFactory.CorpProcessDB_Select, threadCount * dataNum);
            if (orderList != null && orderList.Count > 0)
            {
                //记录日志
                for (int i = 0; i < orderList.Count; i++)
                {
                    logMsg += string.Format("OrderID:{0} Uid:{1} EmailType:{2}", orderList[i].OrderID, orderList[i].Uid, orderList[i].EmailType) + "\n";
                }
                logger.Info("自动发邮件", logMsg);
                ManualResetEvent[] doEvents = new ManualResetEvent[threadCount];
                for (int i = 0; i < threadCount; i++)
                {
                    doEvents[i] = new ManualResetEvent(false);
                    ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] });
                }
                WaitHandle.WaitAll(doEvents);
            }
        }

        /// <summary>
        /// 处理线程方法
        /// </summary>
        /// <param name="eventParams"></param>
        private static void DealData(object eventParams)
        {
            int threadId = (int)(((object[])eventParams)[0]);      //当前线程ID
            ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]);
            int tmpCount = orderList.Count / threadCount;       //平均每个线程处理数据

            for (int i = 0; i < orderList.Count; i++)
            {
                if (i < (threadId) * tmpCount)
                    continue;

                if ((i >= (threadId + 1) * tmpCount) && threadId != (threadCount - 1))
                    break;

                doDetailEvent(orderList[i]);
            }
            doEvent.Set();
        }

        /// <summary>
        /// 发邮件
        /// </summary>
        /// <param name="item"></param>
        /// <param name="threadId"></param>
        private static void doDetailEvent(Corp.CorpFlightADTKJob.Entity.FltOrderInfoEntity.OrderInfoEntity order)
        {
            int emailsendtime = order.EmailType;
            if ((emailsendtime & 8) == 8)
            {
                emailsendtime = emailsendtime + 4088;
            }
            CorpEmaiWSlHelper.SendCorpConfirmMail(order.OrderID, order.Uid, "N", emailsendtime, ""); 
        }
    }

如果你觉得上面的例子跑不起来,那么下面的例子可以让你在自己的机器上跑起来。这是一个web页面,OrderInfoEntity这个类写在另外一个文件中,这里和在一起展示。

    public partial class About : System.Web.UI.Page
    {
        int threadCount = 5;
        int dataNum = 50;
        static string msg = "";
        protected void Page_Load(object sender, EventArgs e)
        {
            msg = "";
            SendDealSucessEmailFromQueue();
            Response.Write(msg);
        }

        public List<OrderInfoEntity> orderList;
        public void SendDealSucessEmailFromQueue()
        {
            threadCount = 5;
            dataNum = 50;
            orderList = GetDealConfirmEmailQueue(threadCount * dataNum);
            if (orderList != null && orderList.Count > 0)
            {
                ManualResetEvent[] doEvents = new ManualResetEvent[threadCount];
                for (int i = 0; i < threadCount; i++)
                {
                    doEvents[i] = new ManualResetEvent(false);
                    ThreadPool.QueueUserWorkItem(new WaitCallback(DealData), new object[] { i, doEvents[i] });
                }
                WaitHandle.WaitAll(doEvents);
            }
        }

        public void DealData(object eventParams)
        {
            int threadId = (int)(((object[])eventParams)[0]);      //当前线程ID
            ManualResetEvent doEvent = (ManualResetEvent)(((object[])eventParams)[1]);
            int tmpCount = orderList.Count / threadCount;       //平均每个线程处理数据

            for (int i = 0; i < orderList.Count; i++)
            {
                if (i < (threadId) * tmpCount)
                    continue;

                if ((i >= (threadId + 1) * tmpCount) && threadId != (threadCount - 1))
                    break;

                doDetailEvent(orderList[i]);
            }
            doEvent.Set();
        }
        
        static void doDetailEvent(OrderInfoEntity order)
        {
            msg += order.OrderID.ToString() + "</br>";
        }

        /// <summary>
        /// 添加订单列表
        /// </summary>
        /// <param name="p"></param>
        /// <returns></returns>
        private static List<OrderInfoEntity> GetDealConfirmEmailQueue(int p)
        {
            List<OrderInfoEntity> result = new List<OrderInfoEntity>();
            for (int i = 0; i < p; i++)
            {
                OrderInfoEntity entity = new OrderInfoEntity(i + 1);
                result.Add(entity);
            }
            return result;
        }
    }

    public class OrderInfoEntity
    {
        private int _orderID;
        public int OrderID
        { get { return _orderID; } set { _orderID = value; } }
        public OrderInfoEntity(int orderid)
        {
            this._orderID = orderid;
        }
    }

 

 这个方法也是异步读取的方法,不过简单一些。

                #region 异步读取数据
                long logID1 = RequestManager.newSubRequest(), logID2 = RequestManager.newSubRequest(), logID3 = RequestManager.newSubRequest();
                Task<List<CorpFlightSearchFlightsEntity>> taskNormalFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchNormalFlights(searchRequest, logID1));
                Task<List<CorpFlightSearchFlightsEntity>> taskCorpFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchCorpFlightsOnly(searchRequest, logID2));
                Task<List<CorpFlightSearchFlightsEntity>> taskMultiSpecialFlts = null;
                if (searchRequest.FlightWay == "D")
                {
                    taskMultiSpecialFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchMultiSpecialFlights(searchRequest, logID3));
                    Task.WaitAll(taskNormalFlts, taskCorpFlts, taskMultiSpecialFlts);
                }
                else
                {
                    Task.WaitAll(taskNormalFlts, taskCorpFlts);
                }

                normalFlts = taskNormalFlts.Result;
                corpFlts = taskCorpFlts.Result;
                multiSpecialFlts = taskMultiSpecialFlts != null ? taskMultiSpecialFlts.Result : null;
                #endregion

 据新来的高手说下面这样写才能保证速度,上面的方法只能提高吞吐量,并不能防止阻塞。

                Task<List<CorpFlightSearchFlightsEntity>> taskNormalFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchNormalFlights(searchRequest, logID1));
                Task<List<CorpFlightSearchFlightsEntity>> taskCorpFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchCorpFlightsOnly(searchRequest, logID2));
                Task<List<CorpFlightSearchFlightsEntity>> taskMultiSpecialFlts = null;
                if (searchRequest.FlightWay == "D")
                {
                    taskMultiSpecialFlts = Task.Factory.StartNew<List<CorpFlightSearchFlightsEntity>>(() => SearchMultiSpecialFlights(searchRequest, logID3));
                    //Task.WaitAll(taskNormalFlts, taskCorpFlts, taskMultiSpecialFlts);
                    normalFlts = taskNormalFlts.ContinueWith(t => t.Result).Result;
                    corpFlts = taskCorpFlts.ContinueWith(t => t.Result).Result;
                    multiSpecialFlts = taskMultiSpecialFlts.ContinueWith(t => t.Result).Result;
                }
                else
                {
                    //Task.WaitAll(taskNormalFlts, taskCorpFlts);
                    normalFlts = taskNormalFlts.ContinueWith(t => t.Result).Result;
                    corpFlts = taskCorpFlts.ContinueWith(t => t.Result).Result;
                }

                normalFlts = taskNormalFlts.Result;
                corpFlts = taskCorpFlts.Result;
                multiSpecialFlts = taskMultiSpecialFlts != null ? taskMultiSpecialFlts.Result : null;

 还有一种简写方法,在没有前后依赖关系,没有返回值的情况下还可以使用Parallel.Invoke方法,如下:

                if (this._appFlightsList.Count > 0)
                {
                    Parallel.Invoke(() =>
                    {
                        
                        Parallel.ForEach(_appFlightsList, ShowFlightsUICommon.SetAppFlight);
                        
                        this._appFlightsList.Sort(AppFlightSorter.CorpSort);
                    },
                    () =>
                    {
                        
                        if (this._passengerTypeString.Trim().ToUpper() == "ADU")
                        {
                            
                            if (this._lowestPrices != null && this._lowestPrices.Count > 0)
                            {
                                
                                this._lowestPrices.Sort(LowestPriceSoter.CompareLowestPriceByDate);
                            }
                            
                            ShowFlightsUICommon.SetCorpSingleTripAppSpecialSummary4OnlineCNOnly(
                                this._specialSummary, corpFlightsResponse.SingleTripSummaryList);
                            
                            if (this._corpSearchMultipleSpecialResponse != null &&
                                this._corpSearchMultipleSpecialResponse.Groups.Count > 0)
                            {
                                ShowFlightsUICommon.SetCorpMultipleAppSpecialSummary(
                                    this._specialSummary,
                                    this._corpSearchMultipleSpecialResponse.Groups);
                            }
                            
                            if (this._corpSearchTransitSpecialResponse != null &&
                                this._corpSearchTransitSpecialResponse.LowGroups.Count > 0)
                            {
                                ShowFlightsUICommon.SetCorpTransitAppSpecialSummary(
                                    this._specialSummary,
                                    this._corpSearchTransitSpecialResponse.LowGroups);
                            }

                            
                            ShowFlightsUICommon.SetCorpAppSpecialPrice(this._specialSummary,
                                                                        this._IsCalcNetPrice,
                                                                        this._SaleRate);
                            
                            FlightProcess.FlightsFilterWithAccount(this._specialSummary,
                                                                    this._AccountID);
                        }
                    });
                }

 

 

 

 

作者:Tyler Ning
出处:http://www.cnblogs.com/tylerdonet/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,如有问题,可以通过以下邮箱地址williamningdong@gmail.com  联系我,非常感谢。

目录
相关文章
|
2月前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
208 0
|
15天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
16天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
54 6
|
16天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
52 1
|
25天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
1月前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
29天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
1月前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
1月前
|
缓存 Java 调度
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文旨在为读者提供一个关于Java多线程编程的全面指南。我们将从多线程的基本概念开始,逐步深入到Java中实现多线程的方法,包括继承Thread类、实现Runnable接口以及使用Executor框架。此外,我们还将探讨多线程编程中的常见问题和最佳实践,帮助读者在实际项目中更好地应用多线程技术。
26 3
|
1月前
|
监控 安全 Java
Java多线程编程的艺术与实践
【10月更文挑战第22天】 在现代软件开发中,多线程编程是一项不可或缺的技能。本文将深入探讨Java多线程编程的核心概念、常见问题以及最佳实践,帮助开发者掌握这一强大的工具。我们将从基础概念入手,逐步深入到高级主题,包括线程的创建与管理、同步机制、线程池的使用等。通过实际案例分析,本文旨在提供一种系统化的学习方法,使读者能够在实际项目中灵活运用多线程技术。