C#并發實戰Parallel.ForEach使用

     前言:最近給客戶開發一個伙食費計算系統,大概需要計算2000個人的伙食。需求是按照員工的預定報餐計劃對消費記錄進行檢查,如有未報餐有刷卡或者有報餐沒刷卡的要進行一定的金額扣減等一系列規則。一開始我的想法比較簡單,直接用一個for循環搞定,統計結果倒是沒問題,但是計算出來太慢了需要7,8分鐘。這樣系統服務是報超時錯誤的,讓人覺得有點不太爽。由于時間也不多就就先提交給用戶使用了,后面邏輯又增加了,計算時間變長,整個計算一遍居然要將近10分鐘了。這個對用戶來說是能接收的(原來自己手算需要好幾天呢),但是我自己接受不了,于是就開始優化了,怎么優化呢,用多線程唄。

     一提到多線程,最先想到的是Task了,畢竟.net4.0以上Task封裝了很多好用的方法。但是Task畢竟是多開一些線程去執行任務,最后整合結果,這樣可以快一些,但我想更加快速一些,于是想到了另外一個對象:Parallel。之前在維護代碼是確實有遇到過別人寫的Parallel.Invoke,只是指定這個函數的作用是并發執行多項任務,如果遇到多個耗時的操作,他們之間又不貢獻變量這個方法不錯。我的情況是要并發執行一個集合,于是就用了List.ForAll 這個方法其實是拓展方法,完整的調用為:List.AsParallel().ForAll,需要先轉換成支持并發的集合,等同于Parallel.ForEach,目的是對集合里面的元素并發執行一系列操作。

     于是乎,把原來的foreach換成了List.AsParallel().ForAll,運行起來,果然速度驚人,不到兩分鐘就插入結果了,但最后卻是報主鍵重復的錯誤,這個錯誤的原因是,由于使用了并發,這個時候變量自增,其實是在強著自增,當多個線程同時獲取到了id值,都去自增然后就重復了,舉個例子如下:

            int num = 1;
            List<int> list = new List<int>();
            for (int i = 1; i <= 2000; i++)
            {
                list.Add(i);
            }
            Console.WriteLine($"num初始值為:" + num.ToString());
            list.AsParallel().ForAll(n =>
            {
                num++;
            });
            Console.WriteLine($"不加鎖,并發{list.Count}次后為:" + num.ToString());
            Console.ReadKey();

這段代碼是讓一個變量執行2000次自增,正常結果應該是2001,但實際結果如下:

有經驗的同學,立馬能想到需要加鎖了,C#內置了很多鎖對象,如lock 互斥鎖,Interlocked 內部鎖,Monitor 這幾個比較常見,lock內部實現其實就是使用了Monitor對象。對變量自增,Interlocked對象提供了,變量自增,自減、或者相加等方法,我們使用自增方法Interlocked.Increment,函數定義為:int Increment(ref int num),該對象提供原子性的變量自增操作,傳入目標數值,返回或者ref num都是自增后的結果。 在之前的基礎上我們增加一些代碼:

           num = 1;
            Console.WriteLine($"num初始值為:" + num.ToString());
            list.AsParallel().ForAll(n =>
            {
                Interlocked.Increment(ref num);
            });
            Console.WriteLine($"使用內部鎖,并發{list.Count}次后為:" + num.ToString());
            Console.ReadKey();

我們來看運行結果:

加了鎖之后ID重復算是解決了,其實別高興太早,由于正常的環境有了ID我們還有用這些ID來構建對象呢,于是又寫了寫代碼,用集合來添加這些ID,為了更真實的模擬生產環境,我在forAll里面又加了一層循環代碼如下:

            num = 1;
            Random random = new Random();
            var total = 0;
            var m = new ConcurrentBag<int>();
            list.AsParallel().ForAll(n =>
            {
                var c = random.Next(1, 50);
                Interlocked.Add(ref total, c);
                for (int i = 0; i < c; i++)
                {
                    Interlocked.Increment(ref num);
                    m.Add(num);
                }
            });
            Console.WriteLine($"使用內部鎖,并發+內部循環{list.Count}次后為:" + num.ToString());
            Console.WriteLine($"實際值為:{total + 1}");
            var l = m.GroupBy(n => n).Where(o => o.Count() > 1);
            Console.WriteLine($"并發里面使用安全集合ConcurrentBag添加num,集合重復值:{l.Count()}個");
            Console.ReadKey();

上面的代碼里面我用到了線程安全集合ConcurrentBag<T>它的命名空間是:using System.Collections.Concurrent,盡管使用了線程安全集合,但是在并發面前仍然是不安全的,到了這里其實比較郁悶了,自增加鎖,安全集合內部應該也使用了鎖,但還是重復了。有點說不過去了,想想多線程執行時有個上下文對象,即當多個線程同時執行任務,共享了變量他們一開始傳進去的對象數值應該是相同的,由于變量自增時加了鎖,所以ID是不會重復了。我猜測問題應該出在Add方法了,就是說當num值自增后還沒有來得及傳出去就已經執行了Add方法,故添加了重復變量。于是乎,我重新寫了段代碼,讓ID自增和集合添加都放到鎖里面:

            num = 1;
            total = 0;
            using (var q = new BlockingCollection<int>())
            {
                list.AsParallel().ForAll(n =>
                {
                    var c = random.Next(1, 50);
                    Interlocked.Add(ref total, c);
                    for (int i = 0; i < c; i++)
                    {
                        
                       // Task.Delay(100);
                        q.Add(Interlocked.Increment(ref num));
                        
                        //可控
                        //lock (objLock)
                        //{
                        //    num++;
                        //    q.Add(num);
                        //}
                    }

                });
                q.CompleteAdding();
                Console.WriteLine($"num累計值為:{total},并發之后值為:{num}");
                var x = q.GroupBy(n => n).Where(o => o.Count() > 1);
                Console.WriteLine($"并發使用安全集合BlockingCollection+Interlocked添加num,集合重復值:{x.Count()}個");
                Console.ReadKey();
            }

這里我測試了另外一個線程安全的集合BlockingCollection,關于這個集合的使用請自行查找MSDN文檔,上面的關鍵代碼直接添加安全集合的返回值,可以保證集合不會重復,但其實下面的lock更適用與正式環境,因為我們添加的一般都是對象不會是基礎類型數值,運行結果如下:

至此,我們的問題解決了,計算時間由原來的9分多降至110秒左右,可見Parallel的處理還是很給力的,唯一不足的是,很占CPU,執行計算后CPU達到了88%。附上計算結果:

優化前后對比

 

      總結:C#安全集合在并發的情況下其實不一定是安全的,還是需要結合實際應用場景和驗證結果為準。Parallel.ForEach在對循環數量可觀的情況下是可以去使用的,如果有共享變量,一定要配合鎖做同步處理?;故塹蒙饔謎飧齜椒?,如果方法內部有操作數據庫的記得增加事務處理,否則就呵呵了。

posted @ 2019-08-10 08:15 <漁人> 閱讀(...) 評論(...) 編輯 收藏