Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/corosio/src/detail/epoll/op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ struct descriptor_state : scheduler_op
bool read_ready = false;
bool write_ready = false;

// Deferred cancellation: set by cancel() when the target op is not
// parked (e.g. completing inline via speculative I/O). Checked when
// the next op parks; if set, the op is immediately self-cancelled.
// This matches IOCP semantics where CancelIoEx always succeeds.
bool read_cancel_pending = false;
bool write_cancel_pending = false;
bool connect_cancel_pending = false;

// Set during registration only (no mutex needed)
std::uint32_t registered_events = 0;
int fd = -1;
Expand Down
33 changes: 29 additions & 4 deletions src/corosio/src/detail/epoll/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ epoll_socket_impl::
register_op(
epoll_op& op,
epoll_op*& desc_slot,
bool& ready_flag) noexcept
bool& ready_flag,
bool& cancel_flag) noexcept
{
svc_.work_started();

Expand All @@ -52,6 +53,12 @@ register_op(
op.errn = 0;
}

if (cancel_flag)
{
cancel_flag = false;
op.cancelled.store(true, std::memory_order_relaxed);
}

if (io_done || op.cancelled.load(std::memory_order_acquire))
{
svc_.post(&op);
Expand Down Expand Up @@ -238,7 +245,8 @@ connect(
op.start(token, this);
op.impl_ptr = shared_from_this();

register_op(op, desc_state_.connect_op, desc_state_.write_ready);
register_op(op, desc_state_.connect_op, desc_state_.write_ready,
desc_state_.connect_cancel_pending);
return std::noop_coroutine();
}

Expand Down Expand Up @@ -320,7 +328,8 @@ read_some(
op.start(token, this);
op.impl_ptr = shared_from_this();

register_op(op, desc_state_.read_op, desc_state_.read_ready);
register_op(op, desc_state_.read_op, desc_state_.read_ready,
desc_state_.read_cancel_pending);
return std::noop_coroutine();
}

Expand Down Expand Up @@ -400,7 +409,8 @@ write_some(
op.start(token, this);
op.impl_ptr = shared_from_this();

register_op(op, desc_state_.write_op, desc_state_.write_ready);
register_op(op, desc_state_.write_op, desc_state_.write_ready,
desc_state_.write_cancel_pending);
return std::noop_coroutine();
}

Expand Down Expand Up @@ -571,10 +581,16 @@ cancel() noexcept
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.connect_op == &conn_)
conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
else
desc_state_.connect_cancel_pending = true;
if (desc_state_.read_op == &rd_)
rd_claimed = std::exchange(desc_state_.read_op, nullptr);
else
desc_state_.read_cancel_pending = true;
if (desc_state_.write_op == &wr_)
wr_claimed = std::exchange(desc_state_.write_op, nullptr);
else
desc_state_.write_cancel_pending = true;
}

if (conn_claimed)
Expand Down Expand Up @@ -615,6 +631,12 @@ cancel_single_op(epoll_op& op) noexcept
std::lock_guard lock(desc_state_.mutex);
if (*desc_op_ptr == &op)
claimed = std::exchange(*desc_op_ptr, nullptr);
else if (&op == &conn_)
desc_state_.connect_cancel_pending = true;
else if (&op == &rd_)
desc_state_.read_cancel_pending = true;
else if (&op == &wr_)
desc_state_.write_cancel_pending = true;
}
if (claimed)
{
Expand Down Expand Up @@ -659,6 +681,9 @@ close_socket() noexcept
desc_state_.connect_op = nullptr;
desc_state_.read_ready = false;
desc_state_.write_ready = false;
desc_state_.read_cancel_pending = false;
desc_state_.write_cancel_pending = false;
desc_state_.connect_cancel_pending = false;
}
desc_state_.registered_events = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/corosio/src/detail/epoll/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ class epoll_socket_impl
void register_op(
epoll_op& op,
epoll_op*& desc_slot,
bool& ready_flag) noexcept;
bool& ready_flag,
bool& cancel_flag) noexcept;

friend struct epoll_op;
friend struct epoll_connect_op;
Expand Down
8 changes: 8 additions & 0 deletions src/corosio/src/detail/kqueue/op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ struct descriptor_state : scheduler_op
bool read_ready = false;
bool write_ready = false;

// Deferred cancellation: set by cancel() when the target op is not
// parked (e.g. completing inline via speculative I/O). Checked when
// the next op parks; if set, the op is immediately self-cancelled.
// This matches IOCP semantics where CancelIoEx always succeeds.
bool read_cancel_pending = false;
bool write_cancel_pending = false;
bool connect_cancel_pending = false;

// Set during registration only (no mutex needed)
std::uint32_t registered_events = 0;
int fd = -1;
Expand Down
47 changes: 45 additions & 2 deletions src/corosio/src/detail/kqueue/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ connect(
else
{
desc_state_.connect_op = &op;
if (desc_state_.connect_cancel_pending)
{
desc_state_.connect_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
}
}

Expand All @@ -237,6 +242,11 @@ connect(
continue;
}
desc_state_.connect_op = &op;
if (desc_state_.connect_cancel_pending)
{
desc_state_.connect_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
break;
}
return std::noop_coroutine();
Expand Down Expand Up @@ -310,6 +320,11 @@ do_read_io()
else
{
desc_state_.read_op = &op;
if (desc_state_.read_cancel_pending)
{
desc_state_.read_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
}
}

Expand All @@ -332,9 +347,13 @@ do_read_io()
continue;
}
desc_state_.read_op = &op;
if (desc_state_.read_cancel_pending)
{
desc_state_.read_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
break;
}
return;
}

if (op.cancelled.load(std::memory_order_acquire))
Expand Down Expand Up @@ -394,6 +413,11 @@ do_write_io()
else
{
desc_state_.write_op = &op;
if (desc_state_.write_cancel_pending)
{
desc_state_.write_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
}
}

Expand All @@ -416,9 +440,13 @@ do_write_io()
continue;
}
desc_state_.write_op = &op;
if (desc_state_.write_cancel_pending)
{
desc_state_.write_cancel_pending = false;
op.cancelled.store(true, std::memory_order_relaxed);
}
break;
}
return;
}

if (op.cancelled.load(std::memory_order_acquire))
Expand Down Expand Up @@ -692,10 +720,16 @@ cancel() noexcept
std::lock_guard lock(desc_state_.mutex);
if (desc_state_.connect_op == &conn_)
conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
else
desc_state_.connect_cancel_pending = true;
if (desc_state_.read_op == &rd_)
rd_claimed = std::exchange(desc_state_.read_op, nullptr);
else
desc_state_.read_cancel_pending = true;
if (desc_state_.write_op == &wr_)
wr_claimed = std::exchange(desc_state_.write_op, nullptr);
else
desc_state_.write_cancel_pending = true;
}

if (conn_claimed)
Expand Down Expand Up @@ -736,6 +770,12 @@ cancel_single_op(kqueue_op& op) noexcept
std::lock_guard lock(desc_state_.mutex);
if (*desc_op_ptr == &op)
claimed = std::exchange(*desc_op_ptr, nullptr);
else if (&op == &conn_)
desc_state_.connect_cancel_pending = true;
else if (&op == &rd_)
desc_state_.read_cancel_pending = true;
else if (&op == &wr_)
desc_state_.write_cancel_pending = true;
}
if (claimed)
{
Expand Down Expand Up @@ -782,6 +822,9 @@ close_socket() noexcept
desc_state_.connect_op = nullptr;
desc_state_.read_ready = false;
desc_state_.write_ready = false;
desc_state_.read_cancel_pending = false;
desc_state_.write_cancel_pending = false;
desc_state_.connect_cancel_pending = false;
}
desc_state_.registered_events = 0;

Expand Down
Loading