TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
36 :
37 : #include <errno.h>
38 : #include <sys/epoll.h>
39 : #include <sys/eventfd.h>
40 : #include <sys/timerfd.h>
41 : #include <unistd.h>
42 :
43 : namespace boost::corosio::detail {
44 :
45 : struct epoll_op;
46 : struct descriptor_state;
47 :
48 : /** Linux scheduler using epoll for I/O multiplexing.
49 :
50 : This scheduler implements the scheduler interface using Linux epoll
51 : for efficient I/O event notification. It uses a single reactor model
52 : where one thread runs epoll_wait while other threads
53 : wait on a condition variable for handler work. This design provides:
54 :
55 : - Handler parallelism: N posted handlers can execute on N threads
56 : - No thundering herd: condition_variable wakes exactly one thread
57 : - IOCP parity: Behavior matches Windows I/O completion port semantics
58 :
59 : When threads call run(), they first try to execute queued handlers.
60 : If the queue is empty and no reactor is running, one thread becomes
61 : the reactor and runs epoll_wait. Other threads wait on a condition
62 : variable until handlers are available.
63 :
64 : @par Thread Safety
65 : All public member functions are thread-safe.
66 : */
67 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68 : {
69 : public:
70 : /** Construct the scheduler.
71 :
72 : Creates an epoll instance, eventfd for reactor interruption,
73 : and timerfd for kernel-managed timer expiry.
74 :
75 : @param ctx Reference to the owning execution_context.
76 : @param concurrency_hint Hint for expected thread count (unused).
77 : */
78 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79 :
80 : /// Destroy the scheduler.
81 : ~epoll_scheduler() override;
82 :
83 : epoll_scheduler(epoll_scheduler const&) = delete;
84 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85 :
86 : /// Shut down the scheduler, draining pending operations.
87 : void shutdown() override;
88 :
89 : /** Return the epoll file descriptor.
90 :
91 : Used by socket services to register file descriptors
92 : for I/O event notification.
93 :
94 : @return The epoll file descriptor.
95 : */
96 : int epoll_fd() const noexcept
97 : {
98 : return epoll_fd_;
99 : }
100 :
101 : /** Register a descriptor for persistent monitoring.
102 :
103 : The fd is registered once and stays registered until explicitly
104 : deregistered. Events are dispatched via descriptor_state which
105 : tracks pending read/write/connect operations.
106 :
107 : @param fd The file descriptor to register.
108 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109 : */
110 : void register_descriptor(int fd, descriptor_state* desc) const;
111 :
112 : /** Deregister a persistently registered descriptor.
113 :
114 : @param fd The file descriptor to deregister.
115 : */
116 : void deregister_descriptor(int fd) const;
117 :
118 : private:
119 : void
120 : run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
121 : long timeout_us) override;
122 : void interrupt_reactor() const override;
123 : void update_timerfd() const;
124 :
125 : int epoll_fd_;
126 : int event_fd_;
127 : int timer_fd_;
128 :
129 : // Edge-triggered eventfd state
130 : mutable std::atomic<bool> eventfd_armed_{false};
131 :
132 : // Set when the earliest timer changes; flushed before epoll_wait
133 : mutable std::atomic<bool> timerfd_stale_{false};
134 : };
135 :
136 HIT 322 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
137 322 : : epoll_fd_(-1)
138 322 : , event_fd_(-1)
139 322 : , timer_fd_(-1)
140 : {
141 322 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
142 322 : if (epoll_fd_ < 0)
143 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
144 :
145 HIT 322 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
146 322 : if (event_fd_ < 0)
147 : {
148 MIS 0 : int errn = errno;
149 0 : ::close(epoll_fd_);
150 0 : detail::throw_system_error(make_err(errn), "eventfd");
151 : }
152 :
153 HIT 322 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
154 322 : if (timer_fd_ < 0)
155 : {
156 MIS 0 : int errn = errno;
157 0 : ::close(event_fd_);
158 0 : ::close(epoll_fd_);
159 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
160 : }
161 :
162 HIT 322 : epoll_event ev{};
163 322 : ev.events = EPOLLIN | EPOLLET;
164 322 : ev.data.ptr = nullptr;
165 322 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
166 : {
167 MIS 0 : int errn = errno;
168 0 : ::close(timer_fd_);
169 0 : ::close(event_fd_);
170 0 : ::close(epoll_fd_);
171 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
172 : }
173 :
174 HIT 322 : epoll_event timer_ev{};
175 322 : timer_ev.events = EPOLLIN | EPOLLERR;
176 322 : timer_ev.data.ptr = &timer_fd_;
177 322 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
178 : {
179 MIS 0 : int errn = errno;
180 0 : ::close(timer_fd_);
181 0 : ::close(event_fd_);
182 0 : ::close(epoll_fd_);
183 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
184 : }
185 :
186 HIT 322 : timer_svc_ = &get_timer_service(ctx, *this);
187 322 : timer_svc_->set_on_earliest_changed(
188 2362 : timer_service::callback(this, [](void* p) {
189 2040 : auto* self = static_cast<epoll_scheduler*>(p);
190 2040 : self->timerfd_stale_.store(true, std::memory_order_release);
191 2040 : self->interrupt_reactor();
192 2040 : }));
193 :
194 322 : get_resolver_service(ctx, *this);
195 322 : get_signal_service(ctx, *this);
196 322 : get_stream_file_service(ctx, *this);
197 322 : get_random_access_file_service(ctx, *this);
198 :
199 322 : completed_ops_.push(&task_op_);
200 322 : }
201 :
202 644 : inline epoll_scheduler::~epoll_scheduler()
203 : {
204 322 : if (timer_fd_ >= 0)
205 322 : ::close(timer_fd_);
206 322 : if (event_fd_ >= 0)
207 322 : ::close(event_fd_);
208 322 : if (epoll_fd_ >= 0)
209 322 : ::close(epoll_fd_);
210 644 : }
211 :
212 : inline void
213 322 : epoll_scheduler::shutdown()
214 : {
215 322 : shutdown_drain();
216 :
217 322 : if (event_fd_ >= 0)
218 322 : interrupt_reactor();
219 322 : }
220 :
221 : inline void
222 3788 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
223 : {
224 3788 : epoll_event ev{};
225 3788 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
226 3788 : ev.data.ptr = desc;
227 :
228 3788 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
229 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
230 :
231 HIT 3788 : desc->registered_events = ev.events;
232 3788 : desc->fd = fd;
233 3788 : desc->scheduler_ = this;
234 3788 : desc->ready_events_.store(0, std::memory_order_relaxed);
235 :
236 3788 : std::lock_guard lock(desc->mutex);
237 3788 : desc->impl_ref_.reset();
238 3788 : desc->read_ready = false;
239 3788 : desc->write_ready = false;
240 3788 : }
241 :
242 : inline void
243 3788 : epoll_scheduler::deregister_descriptor(int fd) const
244 : {
245 3788 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
246 3788 : }
247 :
248 : inline void
249 2650 : epoll_scheduler::interrupt_reactor() const
250 : {
251 2650 : bool expected = false;
252 2650 : if (eventfd_armed_.compare_exchange_strong(
253 : expected, true, std::memory_order_release,
254 : std::memory_order_relaxed))
255 : {
256 2438 : std::uint64_t val = 1;
257 2438 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
258 : }
259 2650 : }
260 :
261 : inline void
262 4037 : epoll_scheduler::update_timerfd() const
263 : {
264 4037 : auto nearest = timer_svc_->nearest_expiry();
265 :
266 4037 : itimerspec ts{};
267 4037 : int flags = 0;
268 :
269 4037 : if (nearest == timer_service::time_point::max())
270 : {
271 : // No timers — disarm by setting to 0 (relative)
272 : }
273 : else
274 : {
275 3983 : auto now = std::chrono::steady_clock::now();
276 3983 : if (nearest <= now)
277 : {
278 : // Use 1ns instead of 0 — zero disarms the timerfd
279 114 : ts.it_value.tv_nsec = 1;
280 : }
281 : else
282 : {
283 3869 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
284 3869 : nearest - now)
285 3869 : .count();
286 3869 : ts.it_value.tv_sec = nsec / 1000000000;
287 3869 : ts.it_value.tv_nsec = nsec % 1000000000;
288 3869 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
289 MIS 0 : ts.it_value.tv_nsec = 1;
290 : }
291 : }
292 :
293 HIT 4037 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
294 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
295 HIT 4037 : }
296 :
297 : inline void
298 19388 : epoll_scheduler::run_task(
299 : std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
300 : {
301 : int timeout_ms;
302 19388 : if (task_interrupted_)
303 13825 : timeout_ms = 0;
304 5563 : else if (timeout_us < 0)
305 5555 : timeout_ms = -1;
306 : else
307 8 : timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
308 :
309 19388 : if (lock.owns_lock())
310 5563 : lock.unlock();
311 :
312 19388 : task_cleanup on_exit{this, &lock, ctx};
313 :
314 : // Flush deferred timerfd programming before blocking
315 19388 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
316 2018 : update_timerfd();
317 :
318 : epoll_event events[128];
319 19388 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
320 :
321 19388 : if (nfds < 0 && errno != EINTR)
322 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
323 :
324 HIT 19388 : bool check_timers = false;
325 19388 : op_queue local_ops;
326 :
327 46989 : for (int i = 0; i < nfds; ++i)
328 : {
329 27601 : if (events[i].data.ptr == nullptr)
330 : {
331 : std::uint64_t val;
332 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
333 2116 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
334 2116 : eventfd_armed_.store(false, std::memory_order_relaxed);
335 2116 : continue;
336 2116 : }
337 :
338 25485 : if (events[i].data.ptr == &timer_fd_)
339 : {
340 : std::uint64_t expirations;
341 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
342 : [[maybe_unused]] auto r =
343 2019 : ::read(timer_fd_, &expirations, sizeof(expirations));
344 2019 : check_timers = true;
345 2019 : continue;
346 2019 : }
347 :
348 23466 : auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
349 23466 : desc->add_ready_events(events[i].events);
350 :
351 23466 : bool expected = false;
352 23466 : if (desc->is_enqueued_.compare_exchange_strong(
353 : expected, true, std::memory_order_release,
354 : std::memory_order_relaxed))
355 : {
356 23466 : local_ops.push(desc);
357 : }
358 : }
359 :
360 19388 : if (check_timers)
361 : {
362 2019 : timer_svc_->process_expired();
363 2019 : update_timerfd();
364 : }
365 :
366 19388 : lock.lock();
367 :
368 19388 : if (!local_ops.empty())
369 13319 : completed_ops_.splice(local_ops);
370 19388 : }
371 :
372 : } // namespace boost::corosio::detail
373 :
374 : #endif // BOOST_COROSIO_HAS_EPOLL
375 :
376 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|