1  
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
36  

36  

37  
#include <errno.h>
37  
#include <errno.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/eventfd.h>
39  
#include <sys/eventfd.h>
40  
#include <sys/timerfd.h>
40  
#include <sys/timerfd.h>
41  
#include <unistd.h>
41  
#include <unistd.h>
42  

42  

43  
namespace boost::corosio::detail {
43  
namespace boost::corosio::detail {
44  

44  

45  
struct epoll_op;
45  
struct epoll_op;
46  
struct descriptor_state;
46  
struct descriptor_state;
47  

47  

48  
/** Linux scheduler using epoll for I/O multiplexing.
48  
/** Linux scheduler using epoll for I/O multiplexing.
49  

49  

50  
    This scheduler implements the scheduler interface using Linux epoll
50  
    This scheduler implements the scheduler interface using Linux epoll
51  
    for efficient I/O event notification. It uses a single reactor model
51  
    for efficient I/O event notification. It uses a single reactor model
52  
    where one thread runs epoll_wait while other threads
52  
    where one thread runs epoll_wait while other threads
53  
    wait on a condition variable for handler work. This design provides:
53  
    wait on a condition variable for handler work. This design provides:
54  

54  

55  
    - Handler parallelism: N posted handlers can execute on N threads
55  
    - Handler parallelism: N posted handlers can execute on N threads
56  
    - No thundering herd: condition_variable wakes exactly one thread
56  
    - No thundering herd: condition_variable wakes exactly one thread
57  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
57  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
58  

58  

59  
    When threads call run(), they first try to execute queued handlers.
59  
    When threads call run(), they first try to execute queued handlers.
60  
    If the queue is empty and no reactor is running, one thread becomes
60  
    If the queue is empty and no reactor is running, one thread becomes
61  
    the reactor and runs epoll_wait. Other threads wait on a condition
61  
    the reactor and runs epoll_wait. Other threads wait on a condition
62  
    variable until handlers are available.
62  
    variable until handlers are available.
63  

63  

64  
    @par Thread Safety
64  
    @par Thread Safety
65  
    All public member functions are thread-safe.
65  
    All public member functions are thread-safe.
66  
*/
66  
*/
67  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
67  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
68  
{
68  
{
69  
public:
69  
public:
70  
    /** Construct the scheduler.
70  
    /** Construct the scheduler.
71  

71  

72  
        Creates an epoll instance, eventfd for reactor interruption,
72  
        Creates an epoll instance, eventfd for reactor interruption,
73  
        and timerfd for kernel-managed timer expiry.
73  
        and timerfd for kernel-managed timer expiry.
74  

74  

75  
        @param ctx Reference to the owning execution_context.
75  
        @param ctx Reference to the owning execution_context.
76  
        @param concurrency_hint Hint for expected thread count (unused).
76  
        @param concurrency_hint Hint for expected thread count (unused).
77  
    */
77  
    */
78  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
78  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79  

79  

80  
    /// Destroy the scheduler.
80  
    /// Destroy the scheduler.
81  
    ~epoll_scheduler() override;
81  
    ~epoll_scheduler() override;
82  

82  

83  
    epoll_scheduler(epoll_scheduler const&)            = delete;
83  
    epoll_scheduler(epoll_scheduler const&)            = delete;
84  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
84  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
85  

85  

86  
    /// Shut down the scheduler, draining pending operations.
86  
    /// Shut down the scheduler, draining pending operations.
87  
    void shutdown() override;
87  
    void shutdown() override;
88  

88  

89  
    /** Return the epoll file descriptor.
89  
    /** Return the epoll file descriptor.
90  

90  

91  
        Used by socket services to register file descriptors
91  
        Used by socket services to register file descriptors
92  
        for I/O event notification.
92  
        for I/O event notification.
93  

93  

94  
        @return The epoll file descriptor.
94  
        @return The epoll file descriptor.
95  
    */
95  
    */
96  
    int epoll_fd() const noexcept
96  
    int epoll_fd() const noexcept
97  
    {
97  
    {
98  
        return epoll_fd_;
98  
        return epoll_fd_;
99  
    }
99  
    }
100  

100  

101  
    /** Register a descriptor for persistent monitoring.
101  
    /** Register a descriptor for persistent monitoring.
102  

102  

103  
        The fd is registered once and stays registered until explicitly
103  
        The fd is registered once and stays registered until explicitly
104  
        deregistered. Events are dispatched via descriptor_state which
104  
        deregistered. Events are dispatched via descriptor_state which
105  
        tracks pending read/write/connect operations.
105  
        tracks pending read/write/connect operations.
106  

106  

107  
        @param fd The file descriptor to register.
107  
        @param fd The file descriptor to register.
108  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
108  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
109  
    */
109  
    */
110  
    void register_descriptor(int fd, descriptor_state* desc) const;
110  
    void register_descriptor(int fd, descriptor_state* desc) const;
111  

111  

112  
    /** Deregister a persistently registered descriptor.
112  
    /** Deregister a persistently registered descriptor.
113  

113  

114  
        @param fd The file descriptor to deregister.
114  
        @param fd The file descriptor to deregister.
115  
    */
115  
    */
116  
    void deregister_descriptor(int fd) const;
116  
    void deregister_descriptor(int fd) const;
117  

117  

118  
private:
118  
private:
119  
    void
119  
    void
120 -
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
120 +
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
 
121 +
        long timeout_us) override;
121  
    void interrupt_reactor() const override;
122  
    void interrupt_reactor() const override;
122  
    void update_timerfd() const;
123  
    void update_timerfd() const;
123  

124  

124  
    int epoll_fd_;
125  
    int epoll_fd_;
125  
    int event_fd_;
126  
    int event_fd_;
126  
    int timer_fd_;
127  
    int timer_fd_;
127  

128  

128  
    // Edge-triggered eventfd state
129  
    // Edge-triggered eventfd state
129  
    mutable std::atomic<bool> eventfd_armed_{false};
130  
    mutable std::atomic<bool> eventfd_armed_{false};
130  

131  

131  
    // Set when the earliest timer changes; flushed before epoll_wait
132  
    // Set when the earliest timer changes; flushed before epoll_wait
132  
    mutable std::atomic<bool> timerfd_stale_{false};
133  
    mutable std::atomic<bool> timerfd_stale_{false};
133  
};
134  
};
134  

135  

135  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
136  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
136  
    : epoll_fd_(-1)
137  
    : epoll_fd_(-1)
137  
    , event_fd_(-1)
138  
    , event_fd_(-1)
138  
    , timer_fd_(-1)
139  
    , timer_fd_(-1)
139  
{
140  
{
140  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
141  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
141  
    if (epoll_fd_ < 0)
142  
    if (epoll_fd_ < 0)
142  
        detail::throw_system_error(make_err(errno), "epoll_create1");
143  
        detail::throw_system_error(make_err(errno), "epoll_create1");
143  

144  

144  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
145  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
145  
    if (event_fd_ < 0)
146  
    if (event_fd_ < 0)
146  
    {
147  
    {
147  
        int errn = errno;
148  
        int errn = errno;
148  
        ::close(epoll_fd_);
149  
        ::close(epoll_fd_);
149  
        detail::throw_system_error(make_err(errn), "eventfd");
150  
        detail::throw_system_error(make_err(errn), "eventfd");
150  
    }
151  
    }
151  

152  

152  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
153  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
153  
    if (timer_fd_ < 0)
154  
    if (timer_fd_ < 0)
154  
    {
155  
    {
155  
        int errn = errno;
156  
        int errn = errno;
156  
        ::close(event_fd_);
157  
        ::close(event_fd_);
157  
        ::close(epoll_fd_);
158  
        ::close(epoll_fd_);
158  
        detail::throw_system_error(make_err(errn), "timerfd_create");
159  
        detail::throw_system_error(make_err(errn), "timerfd_create");
159  
    }
160  
    }
160  

161  

161  
    epoll_event ev{};
162  
    epoll_event ev{};
162  
    ev.events   = EPOLLIN | EPOLLET;
163  
    ev.events   = EPOLLIN | EPOLLET;
163  
    ev.data.ptr = nullptr;
164  
    ev.data.ptr = nullptr;
164  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
165  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
165  
    {
166  
    {
166  
        int errn = errno;
167  
        int errn = errno;
167  
        ::close(timer_fd_);
168  
        ::close(timer_fd_);
168  
        ::close(event_fd_);
169  
        ::close(event_fd_);
169  
        ::close(epoll_fd_);
170  
        ::close(epoll_fd_);
170  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
171  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
171  
    }
172  
    }
172  

173  

173  
    epoll_event timer_ev{};
174  
    epoll_event timer_ev{};
174  
    timer_ev.events   = EPOLLIN | EPOLLERR;
175  
    timer_ev.events   = EPOLLIN | EPOLLERR;
175  
    timer_ev.data.ptr = &timer_fd_;
176  
    timer_ev.data.ptr = &timer_fd_;
176  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
177  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
177  
    {
178  
    {
178  
        int errn = errno;
179  
        int errn = errno;
179  
        ::close(timer_fd_);
180  
        ::close(timer_fd_);
180  
        ::close(event_fd_);
181  
        ::close(event_fd_);
181  
        ::close(epoll_fd_);
182  
        ::close(epoll_fd_);
182  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
183  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
183  
    }
184  
    }
184  

185  

185  
    timer_svc_ = &get_timer_service(ctx, *this);
186  
    timer_svc_ = &get_timer_service(ctx, *this);
186  
    timer_svc_->set_on_earliest_changed(
187  
    timer_svc_->set_on_earliest_changed(
187  
        timer_service::callback(this, [](void* p) {
188  
        timer_service::callback(this, [](void* p) {
188  
            auto* self = static_cast<epoll_scheduler*>(p);
189  
            auto* self = static_cast<epoll_scheduler*>(p);
189  
            self->timerfd_stale_.store(true, std::memory_order_release);
190  
            self->timerfd_stale_.store(true, std::memory_order_release);
190  
            self->interrupt_reactor();
191  
            self->interrupt_reactor();
191  
        }));
192  
        }));
192  

193  

193  
    get_resolver_service(ctx, *this);
194  
    get_resolver_service(ctx, *this);
194  
    get_signal_service(ctx, *this);
195  
    get_signal_service(ctx, *this);
195  
    get_stream_file_service(ctx, *this);
196  
    get_stream_file_service(ctx, *this);
196  
    get_random_access_file_service(ctx, *this);
197  
    get_random_access_file_service(ctx, *this);
197  

198  

198  
    completed_ops_.push(&task_op_);
199  
    completed_ops_.push(&task_op_);
199  
}
200  
}
200  

201  

201  
// Close the timerfd, the eventfd and the epoll descriptor, in that order,
// skipping any that hold a negative (not-open) value.
inline epoll_scheduler::~epoll_scheduler()
{
    int const owned_fds[] = {timer_fd_, event_fd_, epoll_fd_};
    for (int const fd : owned_fds)
    {
        if (fd >= 0)
            ::close(fd);
    }
}
210  

211  

211  
inline void
212  
inline void
212  
epoll_scheduler::shutdown()
213  
epoll_scheduler::shutdown()
213  
{
214  
{
214  
    shutdown_drain();
215  
    shutdown_drain();
215  

216  

216  
    if (event_fd_ >= 0)
217  
    if (event_fd_ >= 0)
217  
        interrupt_reactor();
218  
        interrupt_reactor();
218  
}
219  
}
219  

220  

220  
inline void
221  
inline void
221  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
222  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
222  
{
223  
{
223  
    epoll_event ev{};
224  
    epoll_event ev{};
224  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
225  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
225  
    ev.data.ptr = desc;
226  
    ev.data.ptr = desc;
226  

227  

227  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
228  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
228  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
229  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
229  

230  

230  
    desc->registered_events = ev.events;
231  
    desc->registered_events = ev.events;
231  
    desc->fd                = fd;
232  
    desc->fd                = fd;
232  
    desc->scheduler_        = this;
233  
    desc->scheduler_        = this;
233  
    desc->ready_events_.store(0, std::memory_order_relaxed);
234  
    desc->ready_events_.store(0, std::memory_order_relaxed);
234  

235  

235  
    std::lock_guard lock(desc->mutex);
236  
    std::lock_guard lock(desc->mutex);
236  
    desc->impl_ref_.reset();
237  
    desc->impl_ref_.reset();
237  
    desc->read_ready  = false;
238  
    desc->read_ready  = false;
238  
    desc->write_ready = false;
239  
    desc->write_ready = false;
239  
}
240  
}
240  

241  

241  
inline void
242  
inline void
242  
epoll_scheduler::deregister_descriptor(int fd) const
243  
epoll_scheduler::deregister_descriptor(int fd) const
243  
{
244  
{
244  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
245  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
245  
}
246  
}
246  

247  

247  
inline void
248  
inline void
248  
epoll_scheduler::interrupt_reactor() const
249  
epoll_scheduler::interrupt_reactor() const
249  
{
250  
{
250  
    bool expected = false;
251  
    bool expected = false;
251  
    if (eventfd_armed_.compare_exchange_strong(
252  
    if (eventfd_armed_.compare_exchange_strong(
252  
            expected, true, std::memory_order_release,
253  
            expected, true, std::memory_order_release,
253  
            std::memory_order_relaxed))
254  
            std::memory_order_relaxed))
254  
    {
255  
    {
255  
        std::uint64_t val       = 1;
256  
        std::uint64_t val       = 1;
256  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
257  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
257  
    }
258  
    }
258  
}
259  
}
259  

260  

260  
inline void
261  
inline void
261  
epoll_scheduler::update_timerfd() const
262  
epoll_scheduler::update_timerfd() const
262  
{
263  
{
263  
    auto nearest = timer_svc_->nearest_expiry();
264  
    auto nearest = timer_svc_->nearest_expiry();
264  

265  

265  
    itimerspec ts{};
266  
    itimerspec ts{};
266  
    int flags = 0;
267  
    int flags = 0;
267  

268  

268  
    if (nearest == timer_service::time_point::max())
269  
    if (nearest == timer_service::time_point::max())
269  
    {
270  
    {
270  
        // No timers — disarm by setting to 0 (relative)
271  
        // No timers — disarm by setting to 0 (relative)
271  
    }
272  
    }
272  
    else
273  
    else
273  
    {
274  
    {
274  
        auto now = std::chrono::steady_clock::now();
275  
        auto now = std::chrono::steady_clock::now();
275  
        if (nearest <= now)
276  
        if (nearest <= now)
276  
        {
277  
        {
277  
            // Use 1ns instead of 0 — zero disarms the timerfd
278  
            // Use 1ns instead of 0 — zero disarms the timerfd
278  
            ts.it_value.tv_nsec = 1;
279  
            ts.it_value.tv_nsec = 1;
279  
        }
280  
        }
280  
        else
281  
        else
281  
        {
282  
        {
282  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
283  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
283  
                            nearest - now)
284  
                            nearest - now)
284  
                            .count();
285  
                            .count();
285  
            ts.it_value.tv_sec  = nsec / 1000000000;
286  
            ts.it_value.tv_sec  = nsec / 1000000000;
286  
            ts.it_value.tv_nsec = nsec % 1000000000;
287  
            ts.it_value.tv_nsec = nsec % 1000000000;
287  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
288  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
288  
                ts.it_value.tv_nsec = 1;
289  
                ts.it_value.tv_nsec = 1;
289  
        }
290  
        }
290  
    }
291  
    }
291  

292  

292  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
293  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
293  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
294  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
294  
}
295  
}
295  

296  

296  
inline void
297  
inline void
297 -
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
298 +
epoll_scheduler::run_task(
 
299 +
    std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
298  
{
300  
{
299 -
    int timeout_ms = task_interrupted_ ? 0 : -1;
301 +
    int timeout_ms;
 
302 +
    if (task_interrupted_)
 
303 +
        timeout_ms = 0;
 
304 +
    else if (timeout_us < 0)
 
305 +
        timeout_ms = -1;
 
306 +
    else
 
307 +
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
300  

308  

301  
    if (lock.owns_lock())
309  
    if (lock.owns_lock())
302  
        lock.unlock();
310  
        lock.unlock();
303  

311  

304  
    task_cleanup on_exit{this, &lock, ctx};
312  
    task_cleanup on_exit{this, &lock, ctx};
305  

313  

306  
    // Flush deferred timerfd programming before blocking
314  
    // Flush deferred timerfd programming before blocking
307  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
315  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
308  
        update_timerfd();
316  
        update_timerfd();
309  

317  

310  
    epoll_event events[128];
318  
    epoll_event events[128];
311  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
319  
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
312  

320  

313  
    if (nfds < 0 && errno != EINTR)
321  
    if (nfds < 0 && errno != EINTR)
314  
        detail::throw_system_error(make_err(errno), "epoll_wait");
322  
        detail::throw_system_error(make_err(errno), "epoll_wait");
315  

323  

316  
    bool check_timers = false;
324  
    bool check_timers = false;
317  
    op_queue local_ops;
325  
    op_queue local_ops;
318  

326  

319  
    for (int i = 0; i < nfds; ++i)
327  
    for (int i = 0; i < nfds; ++i)
320  
    {
328  
    {
321  
        if (events[i].data.ptr == nullptr)
329  
        if (events[i].data.ptr == nullptr)
322  
        {
330  
        {
323  
            std::uint64_t val;
331  
            std::uint64_t val;
324  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
332  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
325  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
333  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
326  
            eventfd_armed_.store(false, std::memory_order_relaxed);
334  
            eventfd_armed_.store(false, std::memory_order_relaxed);
327  
            continue;
335  
            continue;
328  
        }
336  
        }
329  

337  

330  
        if (events[i].data.ptr == &timer_fd_)
338  
        if (events[i].data.ptr == &timer_fd_)
331  
        {
339  
        {
332  
            std::uint64_t expirations;
340  
            std::uint64_t expirations;
333  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
341  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
334  
            [[maybe_unused]] auto r =
342  
            [[maybe_unused]] auto r =
335  
                ::read(timer_fd_, &expirations, sizeof(expirations));
343  
                ::read(timer_fd_, &expirations, sizeof(expirations));
336  
            check_timers = true;
344  
            check_timers = true;
337  
            continue;
345  
            continue;
338  
        }
346  
        }
339  

347  

340  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
348  
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
341  
        desc->add_ready_events(events[i].events);
349  
        desc->add_ready_events(events[i].events);
342  

350  

343  
        bool expected = false;
351  
        bool expected = false;
344  
        if (desc->is_enqueued_.compare_exchange_strong(
352  
        if (desc->is_enqueued_.compare_exchange_strong(
345  
                expected, true, std::memory_order_release,
353  
                expected, true, std::memory_order_release,
346  
                std::memory_order_relaxed))
354  
                std::memory_order_relaxed))
347  
        {
355  
        {
348  
            local_ops.push(desc);
356  
            local_ops.push(desc);
349  
        }
357  
        }
350  
    }
358  
    }
351  

359  

352  
    if (check_timers)
360  
    if (check_timers)
353  
    {
361  
    {
354  
        timer_svc_->process_expired();
362  
        timer_svc_->process_expired();
355  
        update_timerfd();
363  
        update_timerfd();
356  
    }
364  
    }
357  

365  

358  
    lock.lock();
366  
    lock.lock();
359  

367  

360  
    if (!local_ops.empty())
368  
    if (!local_ops.empty())
361  
        completed_ops_.splice(local_ops);
369  
        completed_ops_.splice(local_ops);
362  
}
370  
}
363  

371  

364  
} // namespace boost::corosio::detail
372  
} // namespace boost::corosio::detail
365  

373  

366  
#endif // BOOST_COROSIO_HAS_EPOLL
374  
#endif // BOOST_COROSIO_HAS_EPOLL
367  

375  

368  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
376  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP