協程 (C++20)
協程是一種可以掛起執行並在稍後恢復的函式。協程是無棧(stackless)的:它們透過返回呼叫者來掛起執行,而恢復執行所需的資料則儲存在與棧分離的地方。這允許以順序方式編寫非同步執行的程式碼(例如在無需顯式回呼的情況下處理非阻塞 I/O),同時也支援對惰性求值的無限序列進行演算法運算以及其他用途。
若一個函式的定義包含以下任何一項,則該函式即為協程:
- co_await 表達式 —— 用於掛起執行直到被恢復
task<> tcp_echo_server() { char data[1024]; while (true) { std::size_t n = co_await socket.async_read_some(buffer(data)); co_await async_write(socket, buffer(data, n)); } }
- co_yield 表達式 —— 用於掛起執行並返回一個值
generator<unsigned int> iota(unsigned int n = 0) { while (true) co_yield n++; }
- co_return 陳述式 —— 用於完成執行並返回一個值
lazy<int> f() { co_return 7; }
每個協程必須具備一個滿足若干需求(詳見下文)的返回型別。
目錄 |
[編輯] 限制
協程不能使用 變長參數、普通的 return 陳述式,或 佔位符返回型別(auto 或 概念 (Concept))。
Consteval 函式、constexpr 函式、建構函式、解構函式 以及 main 函式 不能作為協程。
[編輯] 執行
每個協程皆關聯於:
- 承諾物件 (promise object):在協程內部進行操作。協程透過此物件提交其結果或例外。承諾物件與 std::promise 無任何關聯。
- 協程控制代碼 (coroutine handle):在協程外部進行操作。這是一個非擁有式的控制代碼,用於恢復協程的執行或銷毀協程幀。
- 協程狀態 (coroutine state):這是內部的、動態配置的儲存空間(除非該配置被最佳化移除),此物件包含:
- 承諾物件
- 參數(全部以值複製)
- 當前掛起點的某種表示,以便恢復時知道從何處繼續,以及銷毀時知道哪些區域變數在作用域內
- 生命週期跨越當前掛起點的區域變數與暫存物件。
當協程開始執行時,它會執行以下操作:
- 配置使用 operator new 的協程狀態物件。
- 將所有函式參數複製到協程狀態:按值傳遞的參數被移動或複製,按參照傳遞的參數保持為參照(因此,如果協程在參照物件的生命週期結束後才恢復,則可能會成為懸空參照 — 參見下文範例)。
- 呼叫承諾物件的建構函式。如果承諾型別擁有一個接受所有協程參數的建構函式,則呼叫該建構函式(使用複製後的協程參數)。否則,呼叫預設建構函式。
- 呼叫 promise.get_return_object() 並將結果保留在區域變數中。該呼叫的結果將在協程首次掛起時返回給呼叫者。在此步驟之前及包括此步驟在內所拋出的任何例外,都會傳播回呼叫者,而不會放入承諾物件中。
- 呼叫 promise.initial_suspend() 並
co_await其結果。典型的Promise型別會返回 std::suspend_always(用於惰性啟動的協程),或返回 std::suspend_never(用於即時啟動的協程)。 - 當 co_await promise.initial_suspend() 恢復時,開始執行協程本體。
關於參數成為懸空狀態的一些範例
#include <coroutine> #include <iostream> struct promise; struct coroutine : std::coroutine_handle<promise> { using promise_type = ::promise; }; struct promise { coroutine get_return_object() { return {coroutine::from_promise(*this)}; } std::suspend_always initial_suspend() noexcept { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; struct S { int i; coroutine f() { std::cout << i; co_return; } }; void bad1() { coroutine h = S{0}.f(); // S{0} destroyed h.resume(); // resumed coroutine executes std::cout << i, uses S::i after free h.destroy(); } coroutine bad2() { S s{0}; return s.f(); // returned coroutine can't be resumed without committing use after free } void bad3() { coroutine h = [i = 0]() -> coroutine // a lambda that's also a coroutine { std::cout << i; co_return; }(); // immediately invoked // lambda destroyed h.resume(); // uses (anonymous lambda type)::i after free h.destroy(); } void good() { coroutine h = [](int i) -> coroutine // make i a coroutine parameter { std::cout << i; co_return; }(0); // lambda destroyed h.resume(); // no problem, i has been copied to the coroutine // frame as a by-value parameter h.destroy(); }
當協程到達掛起點時:
- 早先獲得的返回物件會被返回給呼叫者/恢復者(必要時進行隱式轉換,轉為協程的返回型別)。
當協程到達 co_return 陳述式時,它會執行以下操作:
- 針對以下情況,呼叫 promise.return_void():
- co_return;
- co_return expr;(其中 expr 的型別為 void)
- 或者針對 co_return expr;(其中 expr 為非 void 型別),呼叫 promise.return_value(expr)。
- 銷毀所有具有自動儲存期的變數(按建立順序的相反順序)。
- 呼叫 promise.final_suspend() 並對結果進行 co_await。
協程執行完畢等同於 co_return;,除非在 Promise 的作用域中找不到 return_void 的宣告,這種情況下行為是未定義的。若函式體中不包含任何定義關鍵字,則該函式即使返回型別不是(可能帶有 cv 限定的)void,也不是一個協程,此時若執行完畢會導致未定義行為。
// assuming that task is some coroutine task type task<void> f() { // not a coroutine, undefined behavior } task<void> g() { co_return; // OK } task<void> h() { co_await g(); // OK, implicit co_return; }
如果協程以未捕獲的例外結束,它會執行以下操作:
- 在 catch 區塊內捕獲該例外並呼叫 promise.unhandled_exception()。
- 呼叫 promise.final_suspend() 並對結果進行 co_await(例如為了恢復接續或發佈結果)。從此點恢復協程屬於未定義行為。
當協程狀態被銷毀時(無論是因為 co_return 終止、未捕獲例外,還是透過其控制代碼銷毀),它會執行以下操作:
- 呼叫承諾物件的解構函式。
- 呼叫函式參數副本的解構函式。
- 呼叫 operator delete 以釋放協程狀態使用的記憶體。
- 將執行轉回給呼叫者/恢復者。
[編輯] 動態記憶體配置
協程狀態是透過非陣列的 operator new 動態配置的。
如果 Promise 型別定義了類別層級的替換,則會使用該替換,否則將使用全域的 operator new。
如果 Promise 型別定義了接受額外參數的置放(placement)形式 operator new,且這些參數符合一個引數列表(其中第一個參數是所請求的大小,型別為 std::size_t,其餘為協程函式的引數),則這些引數將傳遞給 operator new(這使得對協程使用 領先配置器慣用法 (leading-allocator-convention) 成為可能)。
對 operator new 的呼叫可以在以下情況下被最佳化(即使使用了自訂配置器):
- 協程狀態的生命週期嚴格嵌套在呼叫者的生命週期內,並且
- 協程幀的大小在呼叫點是已知的。
在這種情況下,協程狀態會嵌入在呼叫者的棧幀中(如果呼叫者是普通函式)或協程狀態中(如果呼叫者是協程)。
如果配置失敗,協程會拋出 std::bad_alloc,除非 Promise 型別定義了成員函式 Promise::get_return_object_on_allocation_failure()。若定義了該函式,則配置會使用不拋出(nothrow)形式的 operator new,且在配置失敗時,協程會立即將從 Promise::get_return_object_on_allocation_failure() 獲得的物件返回給呼叫者。
struct Coroutine::promise_type { /* ... */ // ensure the use of non-throwing operator-new static Coroutine get_return_object_on_allocation_failure() { std::cerr << __func__ << '\n'; throw std::bad_alloc(); // or, return Coroutine(nullptr); } // custom non-throwing overload of new void* operator new(std::size_t n) noexcept { if (void* mem = std::malloc(n)) return mem; return nullptr; // allocation failure } };
[編輯] 承諾物件 (Promise)
Promise 型別由編譯器根據協程的返回型別,利用 std::coroutine_traits 來確定。
形式上,設
-
R和Args...分別表示協程的返回型別和參數型別列表, -
ClassT表示若協程定義為非靜態成員函式時所屬的類別型別, - cv 表示若協程定義為非靜態成員函式時在 函式宣告 中宣告的 cv 限定符,
則其 Promise 型別確定如下:
- std::coroutine_traits<R, Args...>::promise_type,若該協程並非定義為 隱式物件成員函式,
- std::coroutine_traits<R,
cvClassT&, Args...>::promise_type,若該協程定義為非右值參照限定的隱式物件成員函式, - std::coroutine_traits<R,
cvClassT&&, Args...>::promise_type,若該協程定義為右值參照限定的隱式物件成員函式。
例如
| 若協程定義為 ... | 則其 Promise 型別為 ... |
|---|---|
| task<void> foo(int x); | std::coroutine_traits<task<void>, int>::promise_type |
| task<void> Bar::foo(int x) const; | std::coroutine_traits<task<void>, const Bar&, int>::promise_type |
| task<void> Bar::foo(int x) &&; | std::coroutine_traits<task<void>, Bar&&, int>::promise_type |
[編輯] co_await
一元運算子 co_await 會掛起協程並將控制權返回給呼叫者。
co_await expr |
|||||||||
co_await 表達式只能出現在正規 函式體(包含 lambda 表達式 的函式體)內的 潛在求值 表達式中,且不能出現在:
- 在 處理器 (handler) 中,
- 在 宣告 陳述式中(除非是在該宣告陳述式的初始化器中),
- 在 init-statement 的 簡單宣告 中(參見
if、switch、for和 range-for),除非是在該 init-statement 的初始化器中, - 在 預設引數 中,或
- 在具有靜態或執行緒 儲存期 的區塊作用域變數的初始化器中。
|
co_await 表達式不能是 契約斷言 (contract assertion) 謂詞的 潛在求值 子表達式。 |
(C++26 起) |
首先,expr 會轉換為可等待物件 (awaitable),如下所示:
- 若 expr 是由初始掛起點、最終掛起點或 yield 表達式產生的,則可等待物件即為 expr 本身。
- 否則,如果當前協程的
Promise型別擁有成員函式await_transform,則可等待物件為 promise.await_transform(expr)。 - 否則,可等待物件即為 expr 本身。
接著,獲得等待者物件 (awaiter object),如下所示:
- 若針對 operator co_await 的重載解析給出單一最佳重載,則等待者為該呼叫的結果:
- awaitable.operator co_await()(針對成員重載),
- operator co_await(static_cast<Awaitable&&>(awaitable))(針對非成員重載)。
- 否則,若重載解析找不到 operator co_await,則等待者即為可等待物件本身。
- 否則,若重載解析有歧義,則程式格式錯誤。
如果上述表達式為 純右值 (prvalue),等待者物件即為從其 實體化 的暫存物件。否則,若上述表達式為 泛左值 (glvalue),等待者物件即為其所指代之物件。
接著,呼叫 awaiter.await_ready()(這是一種捷徑,若已知結果已就緒或可同步完成,則避免掛起的成本)。如果其結果(經上下文轉換為 bool)為 false,則:
- 協程被掛起(其協程狀態填入了區域變數與當前掛起點)。
- 呼叫 awaiter.await_suspend(handle),其中 handle 是代表當前協程的協程控制代碼。在該函式內部,可透過控制代碼觀察到已掛起的協程狀態,此函式負責將其調度至某個執行器上恢復,或銷毀它(返回 false 視為已調度)。
- 如果
await_suspend返回 void,則控制權立即返回給當前協程的呼叫者/恢復者(當前協程保持掛起狀態)。 - 如果
await_suspend返回 bool:
- 值 true 將控制權返回給當前協程的呼叫者/恢復者,
- 值 false 恢復當前協程。
- 如果
await_suspend返回另一個協程的控制代碼,則該控制代碼會被恢復(透過呼叫 handle.resume())(注意這可能會連鎖導致當前協程最終恢復)。 - 如果
await_suspend拋出例外,該例外會被捕獲,協程恢復,且該例外被重新拋出。
- 如果
最後,呼叫 awaiter.await_resume()(無論協程是否被掛起),其結果即為整個 co_await expr 表達式的結果。
如果協程在 co_await 表達式中被掛起並稍後恢復,恢復點位於呼叫 awaiter.await_resume() 之前。
請注意,協程在進入 awaiter.await_suspend() 之前已完全掛起。其控制代碼可在 await_suspend() 返回之前與另一個執行緒共用並被恢復。(請注意,預設記憶體安全性規則仍然適用,因此如果協程控制代碼在未加鎖的情況下跨執行緒共用,等待者至少應使用 釋放語義 (release semantics),而恢復者至少應使用 獲取語義 (acquire semantics)。)例如,協程控制代碼可以放入回呼中,排程在非同步 I/O 操作完成時於執行緒池執行。在這種情況下,由於當前協程可能已被恢復並因此執行了等待者物件的解構函式,這一切都與 await_suspend() 在當前執行緒上繼續執行併發發生,故 await_suspend() 應將 *this 視為已銷毀,且不應在控制代碼發佈至其他執行緒後存取它。
[編輯] 範例
#include <coroutine> #include <iostream> #include <stdexcept> #include <thread> auto switch_to_new_thread(std::jthread& out) { struct awaitable { std::jthread* p_out; bool await_ready() { return false; } void await_suspend(std::coroutine_handle<> h) { std::jthread& out = *p_out; if (out.joinable()) throw std::runtime_error("Output jthread parameter not empty"); out = std::jthread([h] { h.resume(); }); // Potential undefined behavior: accessing potentially destroyed *this // std::cout << "New thread ID: " << p_out->get_id() << '\n'; std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK } void await_resume() {} }; return awaitable{&out}; } struct task { struct promise_type { task get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; }; task resuming_on_new_thread(std::jthread& out) { std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n'; co_await switch_to_new_thread(out); // awaiter destroyed here std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n'; } int main() { std::jthread out; resuming_on_new_thread(out); }
可能輸出
Coroutine started on thread: 139972277602112 New thread ID: 139972267284224 Coroutine resumed on thread: 139972267284224
注意:等待者物件是協程狀態的一部分(作為生命週期跨越掛起點的暫存物件),且在 co_await 表達式結束前被銷毀。它可以用於維護某些非同步 I/O API 所需的每個操作狀態,而無需採取額外的動態記憶體配置。
標準函式庫定義了兩個平凡的可等待物件:std::suspend_always 和 std::suspend_never。
| 本節尚不完整 原因:範例 |
| promise_type::await_transform 與程式提供之等待者的示範 |
|---|
[編輯] 範例執行此程式碼 #include <cassert> #include <coroutine> #include <iostream> struct tunable_coro { // An awaiter whose "readiness" is determined via constructor's parameter. class tunable_awaiter { bool ready_; public: explicit(false) tunable_awaiter(bool ready) : ready_{ready} {} // Three standard awaiter interface functions: bool await_ready() const noexcept { return ready_; } static void await_suspend(std::coroutine_handle<>) noexcept {} static void await_resume() noexcept {} }; struct promise_type { using coro_handle = std::coroutine_handle<promise_type>; auto get_return_object() { return coro_handle::from_promise(*this); } static auto initial_suspend() { return std::suspend_always(); } static auto final_suspend() noexcept { return std::suspend_always(); } static void return_void() {} static void unhandled_exception() { std::terminate(); } // A user provided transforming function which returns the custom awaiter: auto await_transform(std::suspend_always) { return tunable_awaiter(!ready_); } void disable_suspension() { ready_ = false; } private: bool ready_{true}; }; tunable_coro(promise_type::coro_handle h) : handle_(h) { assert(h); } // For simplicity, declare these 4 special functions as deleted: tunable_coro(tunable_coro const&) = delete; tunable_coro(tunable_coro&&) = delete; tunable_coro& operator=(tunable_coro const&) = delete; tunable_coro& operator=(tunable_coro&&) = delete; ~tunable_coro() { if (handle_) handle_.destroy(); } void disable_suspension() const { if (handle_.done()) return; handle_.promise().disable_suspension(); handle_(); } bool operator()() { if (!handle_.done()) handle_(); return !handle_.done(); } private: promise_type::coro_handle handle_; }; tunable_coro generate(int n) { for (int i{}; i != n; ++i) { std::cout << i << ' '; // The awaiter passed to co_await goes to promise_type::await_transform which // issues tunable_awaiter that initially causes suspension (returning back to // main at each iteration), but after a call to disable_suspension no suspension // happens and the loop runs to its end without returning to main(). co_await std::suspend_always{}; } } int main() { auto coro = generate(8); coro(); // emits only one first element == 0 for (int k{}; k < 4; ++k) { coro(); // emits 1 2 3 4, one per each iteration std::cout << ": "; } coro.disable_suspension(); coro(); // emits the tail numbers 5 6 7 all at ones } 輸出 0 1 : 2 : 3 : 4 : 5 6 7 |
[編輯] co_yield
co_yield 表達式返回一個值給呼叫者並掛起當前協程:它是可恢復生成器函式的常見建構組塊。
co_yield expr |
|||||||||
co_yield braced-init-list |
|||||||||
它等同於:
co_await promise.yield_value(expr)
典型的生成器 yield_value 會將其引數儲存(複製/移動,或僅儲存位址,因為引數的生命週期跨越了 co_await 內部的掛起點)到生成器物件中,並返回 std::suspend_always,從而將控制權轉移給呼叫者/恢復者。
#include <coroutine> #include <cstdint> #include <exception> #include <iostream> template<typename T> struct Generator { // The class name 'Generator' is our choice and it is not required for coroutine // magic. Compiler recognizes coroutine by the presence of 'co_yield' keyword. // You can use name 'MyGenerator' (or any other name) instead as long as you include // nested struct promise_type with 'MyGenerator get_return_object()' method. // (Note: It is necessary to adjust the declarations of constructors and destructors // when renaming.) struct promise_type; using handle_type = std::coroutine_handle<promise_type>; struct promise_type // required { T value_; std::exception_ptr exception_; Generator get_return_object() { return Generator(handle_type::from_promise(*this)); } std::suspend_always initial_suspend() { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void unhandled_exception() { exception_ = std::current_exception(); } // saving // exception template<std::convertible_to<T> From> // C++20 concept std::suspend_always yield_value(From&& from) { value_ = std::forward<From>(from); // caching the result in promise return {}; } void return_void() {} }; handle_type h_; Generator(handle_type h) : h_(h) {} ~Generator() { h_.destroy(); } explicit operator bool() { fill(); // The only way to reliably find out whether or not we finished coroutine, // whether or not there is going to be a next value generated (co_yield) // in coroutine via C++ getter (operator () below) is to execute/resume // coroutine until the next co_yield point (or let it fall off end). // Then we store/cache result in promise to allow getter (operator() below // to grab it without executing coroutine). return !h_.done(); } T operator()() { fill(); full_ = false; // we are going to move out previously cached // result to make promise empty again return std::move(h_.promise().value_); } private: bool full_ = false; void fill() { if (!full_) { h_(); if (h_.promise().exception_) std::rethrow_exception(h_.promise().exception_); // propagate coroutine exception in called context full_ = true; } } }; Generator<std::uint64_t> fibonacci_sequence(unsigned n) { if (n == 0) co_return; if (n > 94) throw std::runtime_error("Too big Fibonacci sequence. Elements would overflow."); co_yield 0; if (n == 1) co_return; co_yield 1; if (n == 2) co_return; std::uint64_t a = 0; std::uint64_t b = 1; for (unsigned i = 2; i < n; ++i) { std::uint64_t s = a + b; co_yield s; a = b; b = s; } } int main() { try { auto gen = fibonacci_sequence(10); // max 94 before uint64_t overflows for (int j = 0; gen; ++j) std::cout << "fib(" << j << ")=" << gen() << '\n'; } catch (const std::exception& ex) { std::cerr << "Exception: " << ex.what() << '\n'; } catch (...) { std::cerr << "Unknown exception.\n"; } }
輸出
fib(0)=0 fib(1)=1 fib(2)=1 fib(3)=2 fib(4)=3 fib(5)=5 fib(6)=8 fib(7)=13 fib(8)=21 fib(9)=34
[編輯] 註記
| 功能測試巨集 | 數值 | 標準 | 功能 |
|---|---|---|---|
__cpp_impl_coroutine |
201902L |
(C++20) | 協程(編譯器支援) |
__cpp_lib_coroutine |
201902L |
(C++20) | 協程(函式庫支援) |
__cpp_lib_generator |
202207L |
(C++23) | std::generator:用於範圍的同步協程生成器 |
[編輯] 關鍵字
[編輯] 函式庫支援
協程支援函式庫定義了數種提供協程編譯期與執行期支援的型別。
[編輯] 缺陷報告
下列更改行為的缺陷報告追溯應用於之前的 C++ 標準。
| DR | 應用於 | 出版時的行為 | 正確的行為 |
|---|---|---|---|
| CWG 2556 | C++20 | 無效的 return_void 導致協程執行完畢後的行為未定義 |
在這種情況下,程式格式錯誤 formed in this case |
| CWG 2668 | C++20 | co_await 無法出現在 lambda 表達式中 | 已允許 |
| CWG 2754 | C++23 | 在為顯式物件成員函式建構承諾物件時, 採用了 *this |
在此情況下,*this 並未 被採用 |
[編輯] 參見
| (C++23) |
一種代表同步 協程 生成器的 view(類別範本) |
[編輯] 外部連結
| 1. | Lewis Baker, 2017-2022 - 非對稱傳輸 (Asymmetric Transfer)。 |
| 2. | David Mazières, 2021 - C++20 協程教學。 |
| 3. | Chuanqi Xu & Yu Qi & Yao Han, 2021 - C++20 協程原理與應用。 (中文) |
| 4. | Simon Tatham, 2023 - 撰寫自訂的 C++20 協程系統。 |