From ca7de00685f189aee76e909ab8dd7c0ef322ad89 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Wed, 11 Feb 2026 13:54:41 -0700 Subject: [PATCH] kqueue/epoll: make cancel() reliable when ops complete inline Add cancel_pending flags to descriptor_state so cancellation intent persists across speculative I/O completions, matching IOCP's CancelIoEx semantics. Symmetry --- src/corosio/src/detail/epoll/op.hpp | 8 ++++ src/corosio/src/detail/epoll/sockets.cpp | 33 ++++++++++++++-- src/corosio/src/detail/epoll/sockets.hpp | 3 +- src/corosio/src/detail/kqueue/op.hpp | 8 ++++ src/corosio/src/detail/kqueue/sockets.cpp | 47 ++++++++++++++++++++++- 5 files changed, 92 insertions(+), 7 deletions(-) diff --git a/src/corosio/src/detail/epoll/op.hpp b/src/corosio/src/detail/epoll/op.hpp index 112b1cce..425599f5 100644 --- a/src/corosio/src/detail/epoll/op.hpp +++ b/src/corosio/src/detail/epoll/op.hpp @@ -121,6 +121,14 @@ struct descriptor_state : scheduler_op bool read_ready = false; bool write_ready = false; + // Deferred cancellation: set by cancel() when the target op is not + // parked (e.g. completing inline via speculative I/O). Checked when + // the next op parks; if set, the op is immediately self-cancelled. + // This matches IOCP semantics where CancelIoEx always succeeds. + bool read_cancel_pending = false; + bool write_cancel_pending = false; + bool connect_cancel_pending = false; + // Set during registration only (no mutex needed) std::uint32_t registered_events = 0; int fd = -1; diff --git a/src/corosio/src/detail/epoll/sockets.cpp b/src/corosio/src/detail/epoll/sockets.cpp index 0967d5a9..d27ec6c6 100644 --- a/src/corosio/src/detail/epoll/sockets.cpp +++ b/src/corosio/src/detail/epoll/sockets.cpp @@ -37,7 +37,8 @@ epoll_socket_impl:: register_op( epoll_op& op, epoll_op*& desc_slot, - bool& ready_flag) noexcept + bool& ready_flag, + bool& cancel_flag) noexcept { svc_.work_started(); @@ -52,6 +53,12 @@ register_op( op.errn = 0; } + if (cancel_flag) + { + cancel_flag = false; + op.cancelled.store(true, std::memory_order_relaxed); + } + if (io_done || op.cancelled.load(std::memory_order_acquire)) { svc_.post(&op); @@ -238,7 +245,8 @@ connect( op.start(token, this); op.impl_ptr = shared_from_this(); - register_op(op, desc_state_.connect_op, desc_state_.write_ready); + register_op(op, desc_state_.connect_op, desc_state_.write_ready, + desc_state_.connect_cancel_pending); return std::noop_coroutine(); } @@ -320,7 +328,8 @@ read_some( op.start(token, this); op.impl_ptr = shared_from_this(); - register_op(op, desc_state_.read_op, desc_state_.read_ready); + register_op(op, desc_state_.read_op, desc_state_.read_ready, + desc_state_.read_cancel_pending); return std::noop_coroutine(); } @@ -400,7 +409,8 @@ write_some( op.start(token, this); op.impl_ptr = shared_from_this(); - register_op(op, desc_state_.write_op, desc_state_.write_ready); + register_op(op, desc_state_.write_op, desc_state_.write_ready, + desc_state_.write_cancel_pending); return std::noop_coroutine(); } @@ -571,10 +581,16 @@ cancel() noexcept std::lock_guard lock(desc_state_.mutex); if (desc_state_.connect_op == &conn_) conn_claimed = std::exchange(desc_state_.connect_op, nullptr); + else + desc_state_.connect_cancel_pending = true; if (desc_state_.read_op == &rd_) rd_claimed = std::exchange(desc_state_.read_op, nullptr); + else + desc_state_.read_cancel_pending = true; if (desc_state_.write_op == &wr_) wr_claimed = std::exchange(desc_state_.write_op, nullptr); + else + desc_state_.write_cancel_pending = true; } if (conn_claimed) @@ -615,6 +631,12 @@ cancel_single_op(epoll_op& op) noexcept std::lock_guard lock(desc_state_.mutex); if (*desc_op_ptr == &op) claimed = std::exchange(*desc_op_ptr, nullptr); + else if (&op == &conn_) + desc_state_.connect_cancel_pending = true; + else if (&op == &rd_) + desc_state_.read_cancel_pending = true; + else if (&op == &wr_) + desc_state_.write_cancel_pending = true; } if (claimed) { @@ -659,6 +681,9 @@ close_socket() noexcept desc_state_.connect_op = nullptr; desc_state_.read_ready = false; desc_state_.write_ready = false; + desc_state_.read_cancel_pending = false; + desc_state_.write_cancel_pending = false; + desc_state_.connect_cancel_pending = false; } desc_state_.registered_events = 0; diff --git a/src/corosio/src/detail/epoll/sockets.hpp b/src/corosio/src/detail/epoll/sockets.hpp index 80cdcde4..0206885e 100644 --- a/src/corosio/src/detail/epoll/sockets.hpp +++ b/src/corosio/src/detail/epoll/sockets.hpp @@ -169,7 +169,8 @@ class epoll_socket_impl void register_op( epoll_op& op, epoll_op*& desc_slot, - bool& ready_flag) noexcept; + bool& ready_flag, + bool& cancel_flag) noexcept; friend struct epoll_op; friend struct epoll_connect_op; diff --git a/src/corosio/src/detail/kqueue/op.hpp b/src/corosio/src/detail/kqueue/op.hpp index 9ff95ab8..6d77d238 100644 --- a/src/corosio/src/detail/kqueue/op.hpp +++ b/src/corosio/src/detail/kqueue/op.hpp @@ -132,6 +132,14 @@ struct descriptor_state : scheduler_op bool read_ready = false; bool write_ready = false; + // Deferred cancellation: set by cancel() when the target op is not + // parked (e.g. completing inline via speculative I/O). Checked when + // the next op parks; if set, the op is immediately self-cancelled. + // This matches IOCP semantics where CancelIoEx always succeeds. + bool read_cancel_pending = false; + bool write_cancel_pending = false; + bool connect_cancel_pending = false; + // Set during registration only (no mutex needed) std::uint32_t registered_events = 0; int fd = -1; diff --git a/src/corosio/src/detail/kqueue/sockets.cpp b/src/corosio/src/detail/kqueue/sockets.cpp index a8b394ea..5f7a9536 100644 --- a/src/corosio/src/detail/kqueue/sockets.cpp +++ b/src/corosio/src/detail/kqueue/sockets.cpp @@ -215,6 +215,11 @@ connect( else { desc_state_.connect_op = &op; + if (desc_state_.connect_cancel_pending) + { + desc_state_.connect_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } } } @@ -237,6 +242,11 @@ connect( continue; } desc_state_.connect_op = &op; + if (desc_state_.connect_cancel_pending) + { + desc_state_.connect_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } break; } return std::noop_coroutine(); @@ -310,6 +320,11 @@ do_read_io() else { desc_state_.read_op = &op; + if (desc_state_.read_cancel_pending) + { + desc_state_.read_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } } } @@ -332,9 +347,13 @@ do_read_io() continue; } desc_state_.read_op = &op; + if (desc_state_.read_cancel_pending) + { + desc_state_.read_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } break; } - return; } if (op.cancelled.load(std::memory_order_acquire)) @@ -394,6 +413,11 @@ do_write_io() else { desc_state_.write_op = &op; + if (desc_state_.write_cancel_pending) + { + desc_state_.write_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } } } @@ -416,9 +440,13 @@ do_write_io() continue; } desc_state_.write_op = &op; + if (desc_state_.write_cancel_pending) + { + desc_state_.write_cancel_pending = false; + op.cancelled.store(true, std::memory_order_relaxed); + } break; } - return; } if (op.cancelled.load(std::memory_order_acquire)) @@ -692,10 +720,16 @@ cancel() noexcept std::lock_guard lock(desc_state_.mutex); if (desc_state_.connect_op == &conn_) conn_claimed = std::exchange(desc_state_.connect_op, nullptr); + else + desc_state_.connect_cancel_pending = true; if (desc_state_.read_op == &rd_) rd_claimed = std::exchange(desc_state_.read_op, nullptr); + else + desc_state_.read_cancel_pending = true; if (desc_state_.write_op == &wr_) wr_claimed = std::exchange(desc_state_.write_op, nullptr); + else + desc_state_.write_cancel_pending = true; } if (conn_claimed) @@ -736,6 +770,12 @@ cancel_single_op(kqueue_op& op) noexcept std::lock_guard lock(desc_state_.mutex); if (*desc_op_ptr == &op) claimed = std::exchange(*desc_op_ptr, nullptr); + else if (&op == &conn_) + desc_state_.connect_cancel_pending = true; + else if (&op == &rd_) + desc_state_.read_cancel_pending = true; + else if (&op == &wr_) + desc_state_.write_cancel_pending = true; } if (claimed) { @@ -782,6 +822,9 @@ close_socket() noexcept desc_state_.connect_op = nullptr; desc_state_.read_ready = false; desc_state_.write_ready = false; + desc_state_.read_cancel_pending = false; + desc_state_.write_cancel_pending = false; + desc_state_.connect_cancel_pending = false; } desc_state_.registered_events = 0;