由於 CPU 的時脈已經到物理上限,現在的硬體都往多核心、多 CPU 發展。同樣地,單一的大型伺服器相當昂貴,而且擴充量有限,使用多台主機組成的叢集則相對易於擴充。然而,若程式碼沒有使用共時性 (concurrency) 的特性來撰寫,則無法真正發揮平行處理 (parallel computing) 所帶來的效能提升。
Raku 提供兩套共時性 API,一套是高階 API,另一套則是低階 API。高階 API 提供一些抽象的物件,像是 Promise、Supply、Channel 等,程式設計者不需擔心底層的虛擬機器實作,可專心在撰寫共時性程式上。低階 API 則提供較為傳統的共時性物件,包括 Thread 和 Lock (即 mutex) 等。除非有特別的理由,應優先使用高階 API。
本文介紹高階 API。
附註
Perl 6 已經正式更名為 Raku,我們沿用新的名稱來稱呼這個語言。
Promise
Promise 是 R 執行共時性程式區塊的基本單位,在別的語言也稱為 Future。
建立 Promise
以下範例啟動一個 Promise:
my $p = Promise.start({
say "Hello from a Promise";
});
await $p;
由於 Promise 是共時執行,我們會用 await
等待該 Promise 結束。
也可以用以下較為簡略的寫法:
my $p = start {
say "Hello from a Promise";
};
await $p;
Promise 也可以回傳資料,如下例:
my $p = start {
(1..10).reduce({ $^a + $^b });
}
my $sum = await $p;
$sum == 55 or die "Wrong value";
如果需要確保 Promise 的順序,可以使用 then
方法,如下例:
my $p = Promise.start({
say "Hello from first promise";
})
.then({
say "Hello from second promise";
});
await $p;
由於 Promise 區塊是共時發生的,無法保證其順序。例如,多執行幾次以下程式,會發現每次順序會略為不同:
my $p1 = start {
say "Message from promise 1";
};
my $p2 = start {
say "Message from promise 2";
};
my $p3 = start {
say "Message from promise 3";
};
await $p1, $p2, $p3;
Promise 可指定在特定秒數後才執行,如下例:
my $p = Promise
.in(1)
.then({ say "Hello from promise"; });
await $p;
Promise 的狀態
Promise 建立後,有三個狀態:
Planned
:Promise 建立時的狀態Kept
:Promise 執行成功Broken
:Promise 執行失敗
必要時,可以手動控制狀態。在下例中,我們將 Promise 的狀態調到 Kept
:
my $p1 = Promise.new;
say $p1.status;
$p1.keep("result");
say $p1.status;
say $p1.result;
若 Promise 狀態變為 Broken
,則要以 CATCH
區塊去接,如下例:
my $p2 = Promise.new;
$p2.break('oh no');
say $p2.status;
# It dies because the promise is broken.
say $p2.result;
CATCH { default { say .^name, ': ', .Str } };
(待確認...)
my $promise1 = Promise.new();
my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result});
$promise1.break("First Result");
try $promise2.result;
say $promise2.cause;
控制多個 Promise
我們可以用 Promise 控制其他 Promise。例如,使用 allof
方法可以等待所有 Promise 執行結束後才結束該Promise:
my @ps;
for 1..5 -> $i {
push @ps, start {
sleep $i;
}
}
my $p = Promise.allof(@ps);
await $p;
say "All done";
使用 anyof
可以在其中一個 Promise 結束時即結束該 Promise:
my $p = Promise.anyof(
Promise.in(3),
Promise.in(8600),
);
await $p;
say "All done";
Supply
Supply 用來撰寫非同步資料流 (asynchronous data streaming),在別的程式語言稱為 events 或 reactive programming。
建立 Supply
Supply 以 tap
和 emit
方法搭配使用。Raku 提供兩種方法來建立 supply。
可透過 Supplier 工廠建立一個 live supply,傳入資料時才執行:
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap( -> $v { say $v });
for 1 .. 10 {
$supplier.emit($_);
}
也可以透過 supply
函式建立一個 on-demand supply,呼叫時即執行:
my $supply = supply {
for 1 .. 10 {
emit($_);
}
};
$supply.tap( -> $v { say $v });
On-demand supply 可重覆呼叫,如下例:
my $supply = supply {
for 1 .. 10 {
emit($_);
}
};
$supply.tap( -> $v { say "First : $v" });
$supply.tap( -> $v { say "Second : $v" });
關閉 supply
tap
方法會回傳 tap 物件,我們可以藉由 tap 物件關閉 supply,如下例:
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
my $tap = $supply.tap( -> $v { say $v });
$supplier.emit("OK");
$tap.close;
$supplier.emit("Won't trigger the tap");
定期傳輸資料
使用 interval
方法,可以控制 supply 在特定的秒數定期傳出資料,每次傳出的數字會遞增:
my $supply = Supply.interval(2);
$supply.tap(-> $v { say $v });
sleep 10;
grep
透過 grep
方法可從原來的 supply 過濾掉某些元素後再建立新的 supply,如下例:
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $odd_supply = $supply.grep({ $_ % 2 });
$odd_supply.tap(-> $v { say "Odd : $v" });
my $even_supply = $supply.grep({ not $_ % 2 });
$even_supply.tap(-> $v { say "Even : $v" });
for 0 .. 10 {
$supplier.emit($_);
}
map
透過 map
方法可將 supply 的元素轉換後傳回新的 supply,如下例:
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $sqr_supply = $supply.map({ $_ ** 2 });
$sqr_supply.tap(-> $v { say "Square : $v" });
for 0 .. 10 {
$supplier.emit($_);
}
改變流程
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
$supply.tap: -> $v { say $v; },
done => { say 'Job is done.' },
quit => {
default { say "App Error: ", $_.message }
};
for 0 .. 10 {
$supplier.emit($_);
}
$supplier.done;
react 區塊
我們也可以用 react
區塊定期傳出資料,當呼叫 done()
函式時結束此區塊。
react {
whenever Supply.interval(2) -> $v {
say $v;
done() if $v == 4;
}
}
react
區塊也可以接受一個串列:
react {
whenever Supply.from-list(1..10) -> $v {
say $v;
}
}
Channel
Channel 用於共時性程式中傳遞數據。
建立 Channel
以下範例程式建立一個 channel 並傳送資料:
my $c = Channel.new;
$c.send("Hello from channel");
say $c.receive;
list
方法會回傳 channel 所有的數據,直到該 channel 關閉。
my $c = Channel.new;
await (^10).map: -> $n {
start {
sleep $n;
$c.send($n);
}
}
$c.close;
for $c.list -> $r {
say $r;
}
poll
我們也可以利用 poll
進行非阻塞性回傳資料,如下例:
my $c = Channel.new;
# Start three Promises that sleep for 1..3 seconds, and then
# send a value to our Channel
^3 .map: -> $v {
start {
sleep 3 - $v;
$c.send: "$v from thread {$*THREAD.id}";
}
}
# Wait 3 seconds before closing the channel
Promise.in(3).then: { $c.close }
# Continuously loop and poll the channel, until it's closed
loop {
if $c.poll -> $item {
say "$item received after {now - INIT now} seconds";
}
elsif $c.closed {
last;
}
}
react 區塊
Channel 也可以取代 supply,和 react 區塊搭配,如下例:
my $channel = Channel.new;
my $p = start {
react {
whenever $channel {
say $_;
}
}
}
await (^10).map: -> $r {
start {
sleep $r;
$channel.send($r);
}
}
$channel.close;
await $p;
或是將 channel 和 supply 結合,如下例:
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
my $channel = $supply.Channel;
my $p = start {
react {
whenever $channel -> $item {
say "via Channel: $item";
}
}
}
await (^10).map: -> $r {
start {
sleep $r;
$supplier.emit($r);
}
}
$supplier.done;
await $p;
Proc::Async
Proc::Async
可用非同步的方式執行外部指令,如下例:
my $proc = Proc::Async.new('echo', 'foo', 'bar');
$proc.stdout.tap(-> $v { print "Output: $v" });
$proc.stderr.tap(-> $v { print "Error: $v" });
say "Starting...";
my $promise = $proc.start;
await $promise;
say "Done.";
如果需要從程式寫入標準輸入,可加入 :w
來修飾,如下例:
my $proc = Proc::Async.new(:w, 'grep', 'foo');
$proc.stdout.tap(-> $v { print "Output: $v" });
say "Starting...";
my $promise = $proc.start;
$proc.say("this line has foo");
$proc.say("this one doesn't");
$proc.close-stdin;
await $promise;
say "Done.";