位元詩人 [Raku] 程式設計教學:共時性 (Concurrency) (高階 API)

Facebook Twitter LinkedIn LINE Skype EverNote GMail Yahoo Email

由於 CPU 的時脈已經到物理上限,現在的硬體都往多核心、多 CPU 發展。同樣地,單一的大型伺服器相當昂貴,而且擴充量有限,使用多台主機組成的叢集則相對易於擴充。然而,若程式碼沒有使用共時性 (concurrency) 的特性來撰寫,則無法真正發揮平行處理 (parallel computing) 所帶來的效能提升。

Raku 提供兩套共時性 API,一套是高階 API,另一套則是低階 API。高階 API 提供一些抽象的物件,像是 Promise、Supply、Channel 等,程式設計者不需擔心底層的虛擬機器實作,可專心在撰寫共時性程式上。低階 API 則提供較為傳統的共時性物件,包括 Thread 和 Lock (即 mutex) 等。除非有特別的理由,應優先使用高階 API。

本文介紹高階 API。

附註

Perl 6 已經正式更名為 Raku,我們沿用新的名稱來稱呼這個語言。

Promise

Promise 是 R 執行共時性程式區塊的基本單位,在別的語言也稱為 Future。

建立 Promise

以下範例啟動一個 Promise:

my $p = Promise.start({
    say "Hello from a Promise";
});

await $p;

由於 Promise 是共時執行,我們會用 await 等待該 Promise 結束。

也可以用以下較為簡略的寫法:

my $p = start {
    say "Hello from a Promise";
};

await $p;

Promise 也可以回傳資料,如下例:

my $p = start {
    (1..10).reduce({ $^a + $^b });
}

my $sum = await $p;
$sum == 55 or die "Wrong value";

如果需要確保 Promise 的順序,可以使用 then 方法,如下例:

my $p = Promise.start({
    say "Hello from first promise";
})
.then({
    say "Hello from second promise";
});

await $p;

由於 Promise 區塊是共時發生的,無法保證其順序。例如,多執行幾次以下程式,會發現每次順序會略為不同:

my $p1 = start {
    say "Message from promise 1";
};

my $p2 = start {
    say "Message from promise 2";
};

my $p3 = start {
    say "Message from promise 3";
};

await $p1, $p2, $p3;

Promise 可指定在特定秒數後才執行,如下例:

my $p = Promise
    .in(1)
    .then({ say "Hello from promise"; });

await $p;

Promise 的狀態

Promise 建立後,有三個狀態:

  • Planned:Promise 建立時的狀態
  • Kept:Promise 執行成功
  • Broken:Promise 執行失敗

必要時,可以手動控制狀態。在下例中,我們將 Promise 的狀態調到 Kept

my $p1 = Promise.new;
say $p1.status;

$p1.keep("result");
say $p1.status;
say $p1.result;

若 Promise 狀態變為 Broken,則要以 CATCH 區塊去接,如下例:

my $p2 = Promise.new;

$p2.break('oh no');
say $p2.status;

# It dies because the promise is broken.
say $p2.result; 

CATCH { default { say .^name, ': ', .Str } };

(待確認...)

my $promise1 = Promise.new();
my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result});

$promise1.break("First Result");
try $promise2.result;

say $promise2.cause;

控制多個 Promise

我們可以用 Promise 控制其他 Promise。例如,使用 allof 方法可以等待所有 Promise 執行結束後才結束該Promise:

my @ps;

for 1..5 -> $i {
    push @ps, start {
        sleep $i;
    }
}

my $p = Promise.allof(@ps);

await $p;
say "All done";

使用 anyof 可以在其中一個 Promise 結束時即結束該 Promise:

my $p = Promise.anyof(
    Promise.in(3),
    Promise.in(8600),
);

await $p;
say "All done";

Supply

Supply 用來撰寫非同步資料流 (asynchronous data streaming),在別的程式語言稱為 events 或 reactive programming。

建立 Supply

Supply 以 tapemit 方法搭配使用。Raku 提供兩種方法來建立 supply。

可透過 Supplier 工廠建立一個 live supply,傳入資料時才執行:

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;

$supply.tap( -> $v { say $v });

for 1 .. 10 {
    $supplier.emit($_);
}

也可以透過 supply 函式建立一個 on-demand supply,呼叫時即執行:

my $supply = supply {
    for 1 .. 10 {
        emit($_);
    }
};

$supply.tap( -> $v { say $v });

On-demand supply 可重覆呼叫,如下例:

my $supply = supply {
    for 1 .. 10 {
        emit($_);
    }
};

$supply.tap( -> $v { say "First : $v" });
$supply.tap( -> $v { say "Second : $v" });

關閉 supply

tap 方法會回傳 tap 物件,我們可以藉由 tap 物件關閉 supply,如下例:

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;

my $tap = $supply.tap( -> $v { say $v });

$supplier.emit("OK");
$tap.close;
$supplier.emit("Won't trigger the tap");

定期傳輸資料

使用 interval 方法,可以控制 supply 在特定的秒數定期傳出資料,每次傳出的數字會遞增:

my $supply = Supply.interval(2);
$supply.tap(-> $v { say $v });
sleep 10;

grep

透過 grep 方法可從原來的 supply 過濾掉某些元素後再建立新的 supply,如下例:

my $supplier = Supplier.new;
my $supply = $supplier.Supply;

$supply.tap(-> $v { say "Original : $v" });

my $odd_supply = $supply.grep({ $_ % 2 });
$odd_supply.tap(-> $v { say "Odd : $v" });

my $even_supply = $supply.grep({ not $_ % 2 });
$even_supply.tap(-> $v { say "Even : $v" });

for 0 .. 10 {
    $supplier.emit($_);
}

map

透過 map 方法可將 supply 的元素轉換後傳回新的 supply,如下例:

my $supplier = Supplier.new;
my $supply = $supplier.Supply;

$supply.tap(-> $v { say "Original : $v" });

my $sqr_supply = $supply.map({ $_ ** 2 });
$sqr_supply.tap(-> $v { say "Square : $v" });

for 0 .. 10 {
    $supplier.emit($_);
}

改變流程

my $supplier = Supplier.new;
my $supply = $supplier.Supply;

$supply.tap: -> $v { say $v; },
    done => { say 'Job is done.' },
    quit => {
        default { say "App Error: ", $_.message }
    };

for 0 .. 10 {
    $supplier.emit($_);
}

$supplier.done;

react 區塊

我們也可以用 react 區塊定期傳出資料,當呼叫 done() 函式時結束此區塊。

react {
    whenever Supply.interval(2) -> $v {
        say $v;
        done() if $v == 4;
    }
}

react 區塊也可以接受一個串列:

react {
    whenever Supply.from-list(1..10) -> $v {
        say $v;
    }
}

Channel

Channel 用於共時性程式中傳遞數據。

建立 Channel

以下範例程式建立一個 channel 並傳送資料:

my $c = Channel.new;
$c.send("Hello from channel");
say $c.receive;

list 方法會回傳 channel 所有的數據,直到該 channel 關閉。

my $c = Channel.new;
await (^10).map: -> $n {
    start {
        sleep $n;
        $c.send($n);
    }
}
$c.close;
for $c.list -> $r {
    say $r;
}

poll

我們也可以利用 poll 進行非阻塞性回傳資料,如下例:

my $c = Channel.new;

# Start three Promises that sleep for 1..3 seconds, and then
# send a value to our Channel
^3 .map: -> $v {
    start {
        sleep 3 - $v;
        $c.send: "$v from thread {$*THREAD.id}";
    }
}

# Wait 3 seconds before closing the channel
Promise.in(3).then: { $c.close }

# Continuously loop and poll the channel, until it's closed
loop {
    if $c.poll -> $item {
        say "$item received after {now - INIT now} seconds";
    }
    elsif $c.closed {
        last;
    }
}

react 區塊

Channel 也可以取代 supply,和 react 區塊搭配,如下例:

my $channel = Channel.new;
my $p = start {
    react {
        whenever $channel {
            say $_;
        }
    }
}

await (^10).map: -> $r {
    start {
        sleep $r;
        $channel.send($r);
    }
}

$channel.close;
await $p;

或是將 channel 和 supply 結合,如下例:

my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
my $channel = $supply.Channel;

my $p = start {
    react  {
        whenever $channel -> $item {
            say "via Channel: $item";
        }
    }
}

await (^10).map: -> $r {
    start {
        sleep $r;
        $supplier.emit($r);
    }
}

$supplier.done;
await $p;

Proc::Async

Proc::Async 可用非同步的方式執行外部指令,如下例:

my $proc = Proc::Async.new('echo', 'foo', 'bar');

$proc.stdout.tap(-> $v { print "Output: $v" });
$proc.stderr.tap(-> $v { print "Error:  $v" });

say "Starting...";
my $promise = $proc.start;

await $promise;
say "Done.";

如果需要從程式寫入標準輸入,可加入 :w 來修飾,如下例:

my $proc = Proc::Async.new(:w, 'grep', 'foo');

$proc.stdout.tap(-> $v { print "Output: $v" });

say "Starting...";
my $promise = $proc.start;

$proc.say("this line has foo");
$proc.say("this one doesn't");

$proc.close-stdin;
await $promise;
say "Done.";
關於作者

身為資訊領域碩士,位元詩人 (ByteBard) 認為開發應用程式的目的是為社會帶來價值。如果在這個過程中該軟體能成為永續經營的項目,那就是開發者和使用者雙贏的局面。

位元詩人喜歡用開源技術來解決各式各樣的問題,但必要時對專有技術也不排斥。閒暇之餘,位元詩人將所學寫成文章,放在這個網站上和大家分享。