include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

84.6% Lines (121/143) 100.0% List of functions (9/9)
epoll_scheduler.hpp
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
36
37 #include <errno.h>
38 #include <sys/epoll.h>
39 #include <sys/eventfd.h>
40 #include <sys/timerfd.h>
41 #include <unistd.h>
42
43 namespace boost::corosio::detail {
44
45 struct epoll_op;
46 struct descriptor_state;
47
48 /** Linux scheduler using epoll for I/O multiplexing.
49
50 This scheduler implements the scheduler interface using Linux epoll
51 for efficient I/O event notification. It uses a single reactor model
52 where one thread runs epoll_wait while other threads
53 wait on a condition variable for handler work. This design provides:
54
55 - Handler parallelism: N posted handlers can execute on N threads
56 - No thundering herd: condition_variable wakes exactly one thread
57 - IOCP parity: Behavior matches Windows I/O completion port semantics
58
59 When threads call run(), they first try to execute queued handlers.
60 If the queue is empty and no reactor is running, one thread becomes
61 the reactor and runs epoll_wait. Other threads wait on a condition
62 variable until handlers are available.
63
64 @par Thread Safety
65 All public member functions are thread-safe.
66 */
67 class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68 {
69 public:
70 /** Construct the scheduler.
71
72 Creates an epoll instance, eventfd for reactor interruption,
73 and timerfd for kernel-managed timer expiry.
74
75 @param ctx Reference to the owning execution_context.
76 @param concurrency_hint Hint for expected thread count (unused).
77 */
78 epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79
80 /// Destroy the scheduler.
81 ~epoll_scheduler() override;
82
83 epoll_scheduler(epoll_scheduler const&) = delete;
84 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85
86 /// Shut down the scheduler, draining pending operations.
87 void shutdown() override;
88
89 /** Return the epoll file descriptor.
90
91 Used by socket services to register file descriptors
92 for I/O event notification.
93
94 @return The epoll file descriptor.
95 */
96 int epoll_fd() const noexcept
97 {
98 return epoll_fd_;
99 }
100
101 /** Register a descriptor for persistent monitoring.
102
103 The fd is registered once and stays registered until explicitly
104 deregistered. Events are dispatched via descriptor_state which
105 tracks pending read/write/connect operations.
106
107 @param fd The file descriptor to register.
108 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109 */
110 void register_descriptor(int fd, descriptor_state* desc) const;
111
112 /** Deregister a persistently registered descriptor.
113
114 @param fd The file descriptor to deregister.
115 */
116 void deregister_descriptor(int fd) const;
117
118 private:
119 void
120 run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
121 long timeout_us) override;
122 void interrupt_reactor() const override;
123 void update_timerfd() const;
124
125 int epoll_fd_;
126 int event_fd_;
127 int timer_fd_;
128
129 // Edge-triggered eventfd state
130 mutable std::atomic<bool> eventfd_armed_{false};
131
132 // Set when the earliest timer changes; flushed before epoll_wait
133 mutable std::atomic<bool> timerfd_stale_{false};
134 };
135
136 322x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
137 322x : epoll_fd_(-1)
138 322x , event_fd_(-1)
139 322x , timer_fd_(-1)
140 {
141 322x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
142 322x if (epoll_fd_ < 0)
143 detail::throw_system_error(make_err(errno), "epoll_create1");
144
145 322x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
146 322x if (event_fd_ < 0)
147 {
148 int errn = errno;
149 ::close(epoll_fd_);
150 detail::throw_system_error(make_err(errn), "eventfd");
151 }
152
153 322x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
154 322x if (timer_fd_ < 0)
155 {
156 int errn = errno;
157 ::close(event_fd_);
158 ::close(epoll_fd_);
159 detail::throw_system_error(make_err(errn), "timerfd_create");
160 }
161
162 322x epoll_event ev{};
163 322x ev.events = EPOLLIN | EPOLLET;
164 322x ev.data.ptr = nullptr;
165 322x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
166 {
167 int errn = errno;
168 ::close(timer_fd_);
169 ::close(event_fd_);
170 ::close(epoll_fd_);
171 detail::throw_system_error(make_err(errn), "epoll_ctl");
172 }
173
174 322x epoll_event timer_ev{};
175 322x timer_ev.events = EPOLLIN | EPOLLERR;
176 322x timer_ev.data.ptr = &timer_fd_;
177 322x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
178 {
179 int errn = errno;
180 ::close(timer_fd_);
181 ::close(event_fd_);
182 ::close(epoll_fd_);
183 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
184 }
185
186 322x timer_svc_ = &get_timer_service(ctx, *this);
187 322x timer_svc_->set_on_earliest_changed(
188 2362x timer_service::callback(this, [](void* p) {
189 2040x auto* self = static_cast<epoll_scheduler*>(p);
190 2040x self->timerfd_stale_.store(true, std::memory_order_release);
191 2040x self->interrupt_reactor();
192 2040x }));
193
194 322x get_resolver_service(ctx, *this);
195 322x get_signal_service(ctx, *this);
196 322x get_stream_file_service(ctx, *this);
197 322x get_random_access_file_service(ctx, *this);
198
199 322x completed_ops_.push(&task_op_);
200 322x }
201
202 644x inline epoll_scheduler::~epoll_scheduler()
203 {
204 322x if (timer_fd_ >= 0)
205 322x ::close(timer_fd_);
206 322x if (event_fd_ >= 0)
207 322x ::close(event_fd_);
208 322x if (epoll_fd_ >= 0)
209 322x ::close(epoll_fd_);
210 644x }
211
212 inline void
213 322x epoll_scheduler::shutdown()
214 {
215 322x shutdown_drain();
216
217 322x if (event_fd_ >= 0)
218 322x interrupt_reactor();
219 322x }
220
221 inline void
222 3788x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
223 {
224 3788x epoll_event ev{};
225 3788x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
226 3788x ev.data.ptr = desc;
227
228 3788x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
229 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
230
231 3788x desc->registered_events = ev.events;
232 3788x desc->fd = fd;
233 3788x desc->scheduler_ = this;
234 3788x desc->ready_events_.store(0, std::memory_order_relaxed);
235
236 3788x std::lock_guard lock(desc->mutex);
237 3788x desc->impl_ref_.reset();
238 3788x desc->read_ready = false;
239 3788x desc->write_ready = false;
240 3788x }
241
242 inline void
243 3788x epoll_scheduler::deregister_descriptor(int fd) const
244 {
245 3788x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
246 3788x }
247
248 inline void
249 2650x epoll_scheduler::interrupt_reactor() const
250 {
251 2650x bool expected = false;
252 2650x if (eventfd_armed_.compare_exchange_strong(
253 expected, true, std::memory_order_release,
254 std::memory_order_relaxed))
255 {
256 2438x std::uint64_t val = 1;
257 2438x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
258 }
259 2650x }
260
261 inline void
262 4037x epoll_scheduler::update_timerfd() const
263 {
264 4037x auto nearest = timer_svc_->nearest_expiry();
265
266 4037x itimerspec ts{};
267 4037x int flags = 0;
268
269 4037x if (nearest == timer_service::time_point::max())
270 {
271 // No timers — disarm by setting to 0 (relative)
272 }
273 else
274 {
275 3983x auto now = std::chrono::steady_clock::now();
276 3983x if (nearest <= now)
277 {
278 // Use 1ns instead of 0 — zero disarms the timerfd
279 114x ts.it_value.tv_nsec = 1;
280 }
281 else
282 {
283 3869x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
284 3869x nearest - now)
285 3869x .count();
286 3869x ts.it_value.tv_sec = nsec / 1000000000;
287 3869x ts.it_value.tv_nsec = nsec % 1000000000;
288 3869x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
289 ts.it_value.tv_nsec = 1;
290 }
291 }
292
293 4037x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
294 detail::throw_system_error(make_err(errno), "timerfd_settime");
295 4037x }
296
297 inline void
298 19388x epoll_scheduler::run_task(
299 std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
300 {
301 int timeout_ms;
302 19388x if (task_interrupted_)
303 13825x timeout_ms = 0;
304 5563x else if (timeout_us < 0)
305 5555x timeout_ms = -1;
306 else
307 8x timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
308
309 19388x if (lock.owns_lock())
310 5563x lock.unlock();
311
312 19388x task_cleanup on_exit{this, &lock, ctx};
313
314 // Flush deferred timerfd programming before blocking
315 19388x if (timerfd_stale_.exchange(false, std::memory_order_acquire))
316 2018x update_timerfd();
317
318 epoll_event events[128];
319 19388x int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
320
321 19388x if (nfds < 0 && errno != EINTR)
322 detail::throw_system_error(make_err(errno), "epoll_wait");
323
324 19388x bool check_timers = false;
325 19388x op_queue local_ops;
326
327 46989x for (int i = 0; i < nfds; ++i)
328 {
329 27601x if (events[i].data.ptr == nullptr)
330 {
331 std::uint64_t val;
332 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
333 2116x [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
334 2116x eventfd_armed_.store(false, std::memory_order_relaxed);
335 2116x continue;
336 2116x }
337
338 25485x if (events[i].data.ptr == &timer_fd_)
339 {
340 std::uint64_t expirations;
341 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
342 [[maybe_unused]] auto r =
343 2019x ::read(timer_fd_, &expirations, sizeof(expirations));
344 2019x check_timers = true;
345 2019x continue;
346 2019x }
347
348 23466x auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
349 23466x desc->add_ready_events(events[i].events);
350
351 23466x bool expected = false;
352 23466x if (desc->is_enqueued_.compare_exchange_strong(
353 expected, true, std::memory_order_release,
354 std::memory_order_relaxed))
355 {
356 23466x local_ops.push(desc);
357 }
358 }
359
360 19388x if (check_timers)
361 {
362 2019x timer_svc_->process_expired();
363 2019x update_timerfd();
364 }
365
366 19388x lock.lock();
367
368 19388x if (!local_ops.empty())
369 13319x completed_ops_.splice(local_ops);
370 19388x }
371
372 } // namespace boost::corosio::detail
373
374 #endif // BOOST_COROSIO_HAS_EPOLL
375
376 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
377