2014/11/6

[入門] .Net 非同步處理與同步機制全解析 (三)

.Net 4.5 新增的 Async/Await 功能也出來一陣子了, 我相信有許多人都已經對它相當熟悉。不過我在本文中要把重點往回拉一點點, 談一下 .Net 中早已提供的非同步功能。坦白說, 這才是我一開始想寫的東西。

在「[入門] .Net 非同步處理與同步機制全解析 (一)」一文中, 我們已經看過以手動方式叫出執行緒以進行非同步作業的方法。在這裡, 我要介紹以委派方式以進行非同步作業的方法

如果你不熟悉「委派」(Delegate) 的話, 你可以參考「ASP.NET 事件與委派詳論」一文, 順便復習一下「委派」與「事件」之間的密切關聯。基本上, 委派本質上可以說就是所謂的 "Function Pointer"。而且, 其實在 .Net 中, 委派的機制其實還套用了某種設計模式, 讓使用者可以以廣播方式進行訊息傳遞。

不過若以最簡單的方式來講, 你可以把委派當作可以指向其它方法的方法, 只要參數型式相同即可 (無參數也可以)。

前置作業

在繼續說明之前, 請先建立一個 Windows Form 專案, 將它命名為 AsyncExample。

在這個範例中, 我要拿我在「[入門] .Net 非同步處理與同步機制全解析 (一)」一文中用過的 GZip 類別。所以, 請在專案中加入一個類別, 命名為 GZip。這個類別的內容如下 (我已略加修改過, 和原來的程式不同):

// 程式一
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading;
...
public class GZip
{
    public static int seeder = 0;

    public static void CompressToFile(byte[] input)
    {
        string outputFile = string.Format("{0}.zip", Interlocked.Increment(ref seeder));
        using (FileStream fsOutput = new FileStream(outputFile, FileMode.Create, FileAccess.Write))
        using (GZipStream zip = new GZipStream(fsOutput, CompressionMode.Compress))
            zip.Write(input, 0, input.Length);
    }

    public static void CompressToMemory(byte[] input)
    {
        byte[] buffer = new byte[input.Length];
        using (MemoryStream ms = new MemoryStream(buffer))
        using (GZipStream zip = new GZipStream(ms, CompressionMode.Compress))
            zip.Write(buffer, 0, input.Length);
    }

    public static byte[] GetMemoryStream(string inputFile)
    {
        using (FileStream fs = new FileStream(inputFile, FileMode.Open))
        using (MemoryStream ms = new MemoryStream())
        {
            ms.SetLength(fs.Length);
            fs.Read(ms.GetBuffer(), 0, (int)fs.Length);
            fs.Flush();
            return ms.ToArray();
        }
    }
}

和上一篇一樣, 這個類別並不是本文的重點, 它只是用來執行較耗費系統資料的工作, 以模擬實際狀況中耗時較久的作業而已。不過這不表示這個類別真的是一無是處的; 如果你有需要, 你仍然可以把它拿去稍加修改以進行檔案的壓縮。

在這個類別中, GetMemoryStream() 方法是用來把一個檔案讀入並轉換成一個 byte[] 物件以備其它兩個方法使用。CompressToMemory() 方法是把前述轉換好的 byte[] 物件予以壓縮並存放在另一個 byte[] 物件 buffer 裡。而 CompressToFile() 方法則是把壓縮後的 byte[] 內容寫入檔案裡。它會把檔案以 0.zip、1.zip 的方式命名。

在程式中我使用了 Interlock.Increment() 方法以確保在大量發出非同步作業時不會產生重複的值。我在以下的程式中會再使用一次這個做法, 稍後再一併說明。

委派與非同步

我在「[入門] .Net 非同步處理與同步機制全解析 (一)」一文中已介紹過如何建立 Thread 物件以發動一個非同步作業, 在「[入門] .Net 非同步處理與同步機制全解析 (二)」一文中則介紹過如何使用 .Net 4.5 中新加入的 Async/Await 功能。現在, 我要介紹另一種做法, 就是透過委派 (delegate) 來達成幾乎完全相同的功能。

首先, 請先宣告一個 delegate 如下:

public delegate void Compress(byte[] input);

如同我在「ASP.NET 事件與委派詳論」一文介紹過的, 當我們在撰寫 .Net 程式時, 我們其實已經隨時在使用它的委派機制, 只是初學者也許並沒有感覺到而已。當使用者在一個 ASP.NET 網頁中按下按鈕時, 他正透過控制項的 OnClick 方法觸發了一個事件, 而其背後則是透過 .Net 的委派機制進行著非同步作業。想想看, 那時候 ASP.NET 可能正在處理其他使用者的其它要求, 所以這當然是個非同步作業。

事實上, 我們在上面宣告了 Compress 這個 delegate 型別之後, .Net 會等同於在背後建立一個繼承了 MultiCastDelegate 的類別, 而這個類別提供了 BeginInvoke() 和 EndInvoke() 兩個方法:

// 程式二
public sealed class Compress: MulticastDelegate
{
    public IAsyncResult BeginInvoke(byte[] input, AsyncCallback cb, object state);
    public int EndInvoke(IAsyncResult result);
}

當然, 並不是真的有這個類別存在, 但實際上我們的確可以依照上述類別的用法來使用這個 Compress 型別。這種做法屬於「非同步程式設計模型 」(Asynchronous Programming Model, 簡稱 APM) 的一種, 而我在「[入門] .Net 非同步處理與同步機制全解析 (二)」一文中所介紹的 Async/Await 則是屬於「工作架構非同步模式」(Task-based Asynchronous Pattern, 簡稱 TAP)。

現在, 我們拉回來重新看看 Compress 的宣告方式:

public delegate void Compress(byte[] input);

在這裡, Compress 所使用的參數型式 (亦即 "byte[] xxx") 跟程式一裡 "GZip.CompressToMemory(byte[] input)" 和 "GZip.CompressToFile(byte[] input)" 必須一致。這就是我在上面講過的,「你可以把委派當作可以指向其它方法的方法, 只要參數型式相同即可」; 就是這個意思。這裡所謂的「參數型式」, 指的就是方法中參數 (parameter) 的指定方式, 例如, 如果有幾個方法都接受相同的參數型式, 像 (byte[] xxx), 那麼我們就可以使用 delegate 來做委派和切換, 不管這幾個方法是不是屬於相同的類別。不過, 在 MSDN 文件中, 它習慣以「簽章」(signature) 來稱呼我這裡所謂的「參數型式」; 讀者們請留意一下。

換句話說, 如果你在上述 GZip 中把其中一個方法所採用的參數型式改變了 (例如 "GZip.CompressToFile(string fileName)" ), 待會這個 delegate 就會無法在這兩個方法之間切換 (因為 delegate 是認參數型式而不是認方法的)。

使用委派發動非同步作業

宣告 delegate 型別之後, 接著, 我們要來建立這個 delegate 物件:

Compress compress = new Compress(GZip.CompressToMemory);

對於初學者而言, 可能會對上述寫法感覺陌生。的確, delegate 的宣告和實作和一般類別看來有點像、又不太一樣; 你只要記得, delegate 並不是類別, 但是它的確是一種型別, 其位階和 class、struct、enum... 等等是一樣的。縱使它的宣告方式有點奇怪, 如果你把它當作一個型別來使用, 就沒有什麼奇怪之處了。

話雖如此, 但 .Net 為 delegate 自動在背後建立的兩個方法 (見程式二) 和其參數與回傳型別, 你只能在 VS 的 IDE 裡以 Intellisense 方式查到, 你並無法在 MSDN 裡方便地找到它們的說明。不過, 如果需要的話, 你可以隨時回來本文查閱程式二。

接著, 我們可以透過這個 delegate 物件的 BeginInvoke() 方法發動一個非同步作業:

// 程式三
byte[] input = GZip.GetMemoryStream(file);
IAsyncResult iar = compress.BeginInvoke(input,
    new AsyncCallback(compressed),
    "Async job " + i + " has completed.");

在這裡, BeginInvoke() 會回傳一個 IAsyncResult 物件, 而它的倒數第二個參數, 是一個 AsyncCallback 物件, 其中我代入了一個叫做 compressed 的回呼方法 (待會說明) ; 而它的倒數第一個參數, 則是一個自由參數 (型別為 object), 看你想帶什麼值進去都可以。

至於第一個參數, 則使用了我們在宣告這個 delegate 時使用的參數型式。在這裡我們使用了 "byte[] input"。如果你使用的參數型式不是 "byte[] xx", 而是, 例如 "int x, int y", 那麼它的寫法應該改成:

int x, y;
IAsyncResult iar = compress.BeginInvoke(x, y,
    new AsyncCallback(compressed),
    "Async job " + i + " has completed.");

在程式三中, 當這個 BeginInvoke() 方法執行之後, 緊跟其後的程式會繼續執行, 而被 BeginInvoke() 方法呼叫的程式 (即 GZip.CompressToMemory() 方法) 則同時在另一個執行緒中以非同步方式執行。等到 GZip.CompressToMemory() 方法結束, 剛才指定的回程程序 (即 compress() 方法) 會被呼叫執行:

// 程式四
void compressed(IAsyncResult iar)
{
    jobsDone = Interlocked.Increment(ref seed); // Ensure atomic operations
    //jobsDone = ++seed; // By doing this way, this application may never end
    msg += string.Format("{0} on thread #{1}.\r\n",
        (string)iar.AsyncState,
        Thread.CurrentThread.ManagedThreadId);

    AsyncResult ar = (AsyncResult)iar;
    Compress c = (Compress)ar.AsyncDelegate;
    c.EndInvoke(iar);
}

在這個回呼程序中, 一個 IAsyncResult 物件會被帶入, 它實際上就是我們使用 BeginInvoke() 方法發動非同步作業時的那個回傳值 (見程式三)。透過這個物件, 我們可以取得 state 資訊 (參考程式二), 也就是我們在程式三塞入的 "Async job " + i + " has completed." 這行字。不過, 更重要的是, 我們可以對這個物件執行 EndInvoke() 方法, 以結束該筆非同步作業。當然, 我們並不一定要採用這種做法, 不過在回呼程序中執行 EndInvoke() 在多數情況下是一種保險的做法。

在上述程式中, 我使用 jobsDone 這個變數來計算總共有多少筆作業已執行完畢, 另外使用 msg 這個變數來儲存訊息。其中 jobsDone 屬於不可丟失的關鍵資料, 所以必須使用 Interlocked 類別的方法以確保資訊的準確性 (待會會再提到)。但 msg 則屬於參考性質的資訊, 在某些情況下, 我們可能可以觀察到有些資訊的確遺失了。

發動平行作業

使用 GZip 來做壓縮, 的確是個耗時的作業。但是現在的處理器都很快, 如果我們真的要模擬非常耗時的作業, 那麼一次發動很多個沒那麼耗時的作業, 也許是個好方法。

在本文的範例程式中, 我要讓這些非同步作業執行 100、1000、甚至 10000 次:

// 程式五
int repeat = 100,
     jobsDone = 0;
byte[] input = GZip.GetMemoryStream(file);
Compress compress =  new Compress(GZip.CompressToMemory);
for (int i = 0; i < repeat; i++)
{
    IAsyncResult iar = compress.BeginInvoke(input,
            new AsyncCallback(compressed),
            "Async job " + i + " has completed");
}
while (jobsDone < repeat)
{
    pb.Visible = true;
    int progress = (int)((float)(jobsDone+1) / (float)repeat * 100);
    pb.Value = progress;
}

要做這類工作, 其實使用我在「[入門] .Net 非同步處理與同步機制全解析 (一)」一文中介紹過的 Parallel.For() 方法是最快的。但是如果我們不使用 Paralle.For(), 就可以採用如上的做法。

避免競逐問題

請注意我在程式五裡最後的 while 迴圈。這裡 pb 是一個 Windows Form 的 Progress Bar 控制項, 它可以在所有非同步作業尚未完成時用來顯示作業進度, 它的 UI 並不會被凍住。依照我的寫法, 這裡的 jobsDone 變數值必須非常精確, 不能丟失, 否則這個 while 迴圈就會永遠執行不完。

在平行作業中有件事情非常重要, 可能在同一個時間裡有多個處理緒同時存取同一個變數 (例如這個 jobsDone 變數)。這個變數在 compressed 方法中 (見程式四) 會被逐次加一。如果剛好有兩個程序同時將這個值加一, 那麼, 例如這個值原本是 0, 被兩個程序加一後, 應該變成 2, 但是由於這兩個程序同時做加一的動作, 結果它的值變成了 1, 而不是 2。這種現象就是俗稱的「競逐」(competetion) 現象。

要避免這個現象發現, .Net 在 Threading 命名空間下提供了 InterLocked 類別, 其下提供了幾個方法確保「單元作業」(Atomic Operation) 的執行。InterLocked.Increment() 方法就是其中的一種。換句話說, 若使用 Interlocked.Increment() 方法將一個值做加一的動作, 它絕對不會發生如上述一個變數值應該為 2 卻變成 1 的情況。

在程式四中, 如果你把 jobsDone = Interlocked.Increment(ref seed); 這行指令改成 jobsDone = ++seed; 這行指令, 在某種情況下 (例如採用 CompressToMemory 方法, 將重複次數增加到 10000 次), 你就有可能會發現程式執行不完。這是因為 jobsDone 的值永遠不會等於 repeat。這就是因為競逐問題而引起的。

程式範例

這個 Windows Form 專案可以在此下載。請使用 VS2013 (或以上) 開啟。

程式執行後, 你可以選擇 Compress to memory 或是 Compress to file。在稍右方的文字框中, 可以選擇輸入執行的迴圈數, 最小為 100, 最大為 10000。輸入完成後, 按下 Start 按鈕就會開始執行。

如果你選擇 Compress to file, 程式會在執行子目錄下產生 *.zip 的壓縮檔。壓縮的來源是一張 1.5M 大小左右的 JPG 圖檔 Dragon.jpg。你可以自行把這張圖換成更大的, 這樣效果會更顯著一點。不過, 提醒你一點, 我建議你不要把迴圈數設得太大, 維持為 100 就好 (尤其是如果你的 RAM 甚至不到 8G 的話)。否則它真的會佔去太多系統資源, 搞不好連 Visual Studio 恐怕都會沒有回應。

在執行期間, 整個應用程式的 UI 都不會有回應, 只有下方的 Progress Bar 會顯示目前的進度。這裡我使用的 ProgressBar 並不是平常的 ProgressBar 控制項, 而是 ToolStripProgressBar 控制項。唯有使用 ToolstripProgressBar 控制項才能在非同步作業時, 仍然呈現目前的進度, 它的 UI 不會被凍住。

前面已經講過, 我使用 Interlocked.Increment() 以確保 jobsDone 變數值不會受競逐現象影響, 但是很顯然 msg 變數就會受競逐現象所影響。我把程式的部份輸出拷貝到文書處理器, 你可以看看它的行號:

如上圖, 在程式中, 若選擇 Compress to memory 選項, 然後將迴圈數設定為 10000, 當執行完畢後, 會發現 msg 變數中所儲存的文字行數可能不到 10000 (像上圖只有 9989)。這表示總共有 11 行因為受到競逐現象而憑空消失了。

因此, 各位在進行非同步或平行運算時, 一定要特別留意競逐問題。對於關鍵性的資料 (例如本例中的 jobsDone 變數), 一定要採用所有可能的方法保證其正確, 否則將會很難除錯。

現在假設這個 msg 儲存的訊息也是關鍵資訊, 該怎麼處理? 這個變數的型別是 string, 不是 int, 我們沒辦法使用 Interlocked 之下的各種方法來銷定它。

遇到這種情況, 我們可以使用 Monitor.Enter()Monitor.Exit() 方法來將變數進鎖定。請把原來的程式修改如下:

Monitor.Enter(msg);
try
{    
    msg += string.Format("{0} on thread #{1}.\r\n",
        (string)iar.AsyncState,
        Thread.CurrentThread.ManagedThreadId);
}
finally
{
    Monitor.Exit(msg);
}

使用如上的方法, 就可以將以 try 區段包住的程式碼轉變為 atomic operation。這麼做之後, 果然就不會有資訊逸失的問題了, 如下圖:

其實, 也許是因為這個鎖定區段的動作太常被用到了, C# 提供了一種便捷語法 (shorthand), 讓你可以把上一段程式編寫如下:

// 程式六
lock (msg)
{
    msg += string.Format("{0} on thread #{1}.\r\n",
        (string)iar.AsyncState,
        Thread.CurrentThread.ManagedThreadId);;
}

如果你這麼寫的話, C# 編譯程式會自動幫你把它轉換成 Monitor.Enter() / Exit() 的程式 (其實 C# 4.0 之前和之後所轉換出來的程式略微有點不同)。對於一般使用者, 使用 lock 指令會是比較簡便的做法。如果你是 VB 使用者的話, 請注意它的對應語法是 SyncLock 而不是 lock。

請注意, 這類鎖定物件/區段的語法或方法並不僅限於本文所述的非同步做法; 無論是 APM 或 TAP, 它們在本系列文章裡介紹的絕大多數情況下都是同樣適用的。

相關文章

  1. [入門] .Net 非同步處理與同步機制全解析 (一)
  2. [入門] .Net 非同步處理與同步機制全解析 (二)
  3. [入門] .Net 非同步處理與同步機制全解析 (三)

沒有留言:

張貼留言