include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

76.1% Lines (236/310) 81.0% List of functions (34/42)
reactor_scheduler.hpp
f(x) Functions (42)
Function Calls Lines Blocks
boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler_base const*, boost::corosio::detail::reactor_scheduler_context*) :65 457x 100.0% 100.0% boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler_base const*) :82 551356x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :94 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :111 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::reactor_scheduler_base() :237 517x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::operator()() :275 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::task_op::destroy() :276 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler_base const*) :322 457x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :330 457x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_base::reset_inline_budget() const :342 70541x 54.5% 45.0% boost::corosio::detail::reactor_scheduler_base::try_consume_inline_budget() const :363 310255x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const :377 2117x 100.0% 84.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :383 2117x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :384 4234x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) 
const::post_handler::operator()() :386 2108x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :395 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(boost::corosio::detail::scheduler_op*) const :420 71733x 100.0% 87.0% boost::corosio::detail::reactor_scheduler_base::running_in_this_thread() const :437 1219x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::stop() :443 418x 100.0% 78.0% boost::corosio::detail::reactor_scheduler_base::stopped() const :455 62x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::restart() :462 91x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::run() :469 388x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::run_one() :494 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler_base::wait_one(long) :508 102x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::poll() :522 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::poll_one() :547 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::work_started() :561 14410x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::work_finished() :567 21145x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::compensating_work_started() const :574 95491x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :582 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :595 8228x 30.0% 35.0% boost::corosio::detail::reactor_scheduler_base::shutdown_drain() :612 517x 100.0% 88.0% boost::corosio::detail::reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const :629 893x 100.0% 100.0% 
boost::corosio::detail::reactor_scheduler_base::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :636 2283x 57.1% 50.0% boost::corosio::detail::reactor_scheduler_base::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :650 203452x 85.7% 80.0% boost::corosio::detail::reactor_scheduler_base::clear_signal() const :662 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal(std::unique_lock<std::mutex>&) const :668 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :680 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :692 2283x 87.5% 92.0% boost::corosio::detail::reactor_scheduler_base::work_cleanup::~work_cleanup() :710 177552x 92.3% 92.0% boost::corosio::detail::reactor_scheduler_base::task_cleanup::~task_cleanup() :734 110604x 83.3% 86.0% boost::corosio::detail::reactor_scheduler_base::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::reactor_scheduler_context*) :755 177954x 68.9% 55.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <coroutine>
24 #include <cstddef>
25 #include <cstdint>
26 #include <limits>
27 #include <memory>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 // Forward declaration
33 class reactor_scheduler_base;
34
35 /** Per-thread state for a reactor scheduler.
36
37 Each thread running a scheduler's event loop has one of these
38 on a thread-local stack. It holds a private work queue and
39 inline completion budget for speculative I/O fast paths.
40 */
41 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 {
43 /// Scheduler this context belongs to.
44 reactor_scheduler_base const* key;
45
46 /// Next context frame on this thread's stack.
47 reactor_scheduler_context* next;
48
49 /// Private work queue for reduced contention.
50 op_queue private_queue;
51
52 /// Unflushed work count for the private queue.
53 std::int64_t private_outstanding_work;
54
55 /// Remaining inline completions allowed this cycle.
56 int inline_budget;
57
58 /// Maximum inline budget (adaptive, 2-16).
59 int inline_budget_max;
60
61 /// True if no other thread absorbed queued work last cycle.
62 bool unassisted;
63
64 /// Construct a context frame linked to @a n.
65 457x reactor_scheduler_context(
66 reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 457x : key(k)
68 457x , next(n)
69 457x , private_outstanding_work(0)
70 457x , inline_budget(0)
71 457x , inline_budget_max(2)
72 457x , unassisted(false)
73 {
74 457x }
75 };
76
/// Thread-local context stack for reactor schedulers.
///
/// Frames are linked through reactor_scheduler_context::next; one
/// frame is pushed per nested scheduler run on the current thread
/// (see reactor_thread_context_guard) and searched by key in
/// reactor_find_context.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79
80 /// Find the context frame for a scheduler on this thread.
81 inline reactor_scheduler_context*
82 551356x reactor_find_context(reactor_scheduler_base const* self) noexcept
83 {
84 551356x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 {
86 548384x if (c->key == self)
87 548384x return c;
88 }
89 2972x return nullptr;
90 }
91
92 /// Flush private work count to global counter.
93 inline void
94 reactor_flush_private_work(
95 reactor_scheduler_context* ctx,
96 std::atomic<std::int64_t>& outstanding_work) noexcept
97 {
98 if (ctx && ctx->private_outstanding_work > 0)
99 {
100 outstanding_work.fetch_add(
101 ctx->private_outstanding_work, std::memory_order_relaxed);
102 ctx->private_outstanding_work = 0;
103 }
104 }
105
106 /** Drain private queue to global queue, flushing work count first.
107
108 @return True if any ops were drained.
109 */
110 inline bool
111 reactor_drain_private_queue(
112 reactor_scheduler_context* ctx,
113 std::atomic<std::int64_t>& outstanding_work,
114 op_queue& completed_ops) noexcept
115 {
116 if (!ctx || ctx->private_queue.empty())
117 return false;
118
119 reactor_flush_private_work(ctx, outstanding_work);
120 completed_ops.splice(ctx->private_queue);
121 return true;
122 }
123
124 /** Non-template base for reactor-backed scheduler implementations.
125
126 Provides the complete threading model shared by epoll, kqueue,
127 and select schedulers: signal state machine, inline completion
128 budget, work counting, run/poll methods, and the do_one event
129 loop.
130
131 Derived classes provide platform-specific hooks by overriding:
132 - `run_task(lock, ctx)` to run the reactor poll
133 - `interrupt_reactor()` to wake a blocked reactor
134
135 De-templated from the original CRTP design to eliminate
136 duplicate instantiations when multiple backends are compiled
137 into the same binary. Virtual dispatch for run_task (called
138 once per reactor cycle, before a blocking syscall) has
139 negligible overhead.
140
141 @par Thread Safety
142 All public member functions are thread-safe.
143 */
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Guards completed_ops_, stopped_, task_interrupted_, and state_.
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    /// Global queue of ready-to-run operations (protected by mutex_).
    mutable op_queue completed_ops_;
    /// Count of unfinished work; reaching zero triggers stop().
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    bool stopped_ = false;
    /// True while a thread is inside run_task (the reactor poll).
    mutable std::atomic<bool> task_running_{false};
    /// True once the in-flight reactor poll has been asked to wake.
    mutable bool task_interrupted_ = false;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard that settles work accounting after running one handler.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
309
310 /** RAII guard that pushes/pops a scheduler context frame.
311
312 On construction, pushes a new context frame onto the
313 thread-local stack. On destruction, drains any remaining
314 private queue items to the global queue and pops the frame.
315 */
316 struct reactor_thread_context_guard
317 {
318 /// The context frame managed by this guard.
319 reactor_scheduler_context frame_;
320
321 /// Construct the guard, pushing a frame for @a sched.
322 457x explicit reactor_thread_context_guard(
323 reactor_scheduler_base const* sched) noexcept
324 457x : frame_(sched, reactor_context_stack.get())
325 {
326 457x reactor_context_stack.set(&frame_);
327 457x }
328
329 /// Destroy the guard, draining private work and popping the frame.
330 457x ~reactor_thread_context_guard() noexcept
331 {
332 457x if (!frame_.private_queue.empty())
333 frame_.key->drain_thread_queue(
334 frame_.private_queue, frame_.private_outstanding_work);
335 457x reactor_context_stack.set(frame_.next);
336 457x }
337 };
338
339 // ---- Inline implementations ------------------------------------------------
340
341 inline void
342 70541x reactor_scheduler_base::reset_inline_budget() const noexcept
343 {
344 70541x if (auto* ctx = reactor_find_context(this))
345 {
346 // Cap when no other thread absorbed queued work
347 70541x if (ctx->unassisted)
348 {
349 70541x ctx->inline_budget_max = 4;
350 70541x ctx->inline_budget = 4;
351 70541x return;
352 }
353 // Ramp up when previous cycle fully consumed budget
354 if (ctx->inline_budget == 0)
355 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
356 else if (ctx->inline_budget < ctx->inline_budget_max)
357 ctx->inline_budget_max = 2;
358 ctx->inline_budget = ctx->inline_budget_max;
359 }
360 }
361
362 inline bool
363 310255x reactor_scheduler_base::try_consume_inline_budget() const noexcept
364 {
365 310255x if (auto* ctx = reactor_find_context(this))
366 {
367 310255x if (ctx->inline_budget > 0)
368 {
369 248222x --ctx->inline_budget;
370 248222x return true;
371 }
372 }
373 62033x return false;
374 }
375
inline void
reactor_scheduler_base::post(std::coroutine_handle<> h) const
{
    // Self-deleting op that resumes (or destroys) a coroutine handle.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle first: `delete this` invalidates h_.
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            // Same delete-before-use dance as operator(), but the
            // coroutine is torn down instead of resumed.
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    // unique_ptr keeps the allocation exception-safe until queued.
    auto ph = std::make_unique<post_handler>(h);

    if (auto* ctx = reactor_find_context(this))
    {
        // Fast path: already on a scheduler thread — defer to the
        // lock-free private queue and count the work locally.
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: publish the work count, then enqueue globally and
    // wake one waiting (or polling) thread.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
418
419 inline void
420 71733x reactor_scheduler_base::post(scheduler_op* h) const
421 {
422 71733x if (auto* ctx = reactor_find_context(this))
423 {
424 71561x ++ctx->private_outstanding_work;
425 71561x ctx->private_queue.push(h);
426 71561x return;
427 }
428
429 172x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
430
431 172x std::unique_lock lock(mutex_);
432 172x completed_ops_.push(h);
433 172x wake_one_thread_and_unlock(lock);
434 172x }
435
436 inline bool
437 1219x reactor_scheduler_base::running_in_this_thread() const noexcept
438 {
439 1219x return reactor_find_context(this) != nullptr;
440 }
441
442 inline void
443 418x reactor_scheduler_base::stop()
444 {
445 418x std::unique_lock lock(mutex_);
446 418x if (!stopped_)
447 {
448 376x stopped_ = true;
449 376x signal_all(lock);
450 376x interrupt_reactor();
451 }
452 418x }
453
454 inline bool
455 62x reactor_scheduler_base::stopped() const noexcept
456 {
457 62x std::unique_lock lock(mutex_);
458 124x return stopped_;
459 62x }
460
461 inline void
462 91x reactor_scheduler_base::restart()
463 {
464 91x std::unique_lock lock(mutex_);
465 91x stopped_ = false;
466 91x }
467
468 inline std::size_t
469 388x reactor_scheduler_base::run()
470 {
471 776x if (outstanding_work_.load(std::memory_order_acquire) == 0)
472 {
473 32x stop();
474 32x return 0;
475 }
476
477 356x reactor_thread_context_guard ctx(this);
478 356x std::unique_lock lock(mutex_);
479
480 356x std::size_t n = 0;
481 for (;;)
482 {
483 177847x if (!do_one(lock, -1, &ctx.frame_))
484 356x break;
485 177491x if (n != (std::numeric_limits<std::size_t>::max)())
486 177491x ++n;
487 177491x if (!lock.owns_lock())
488 110415x lock.lock();
489 }
490 356x return n;
491 356x }
492
493 inline std::size_t
494 2x reactor_scheduler_base::run_one()
495 {
496 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
497 {
498 stop();
499 return 0;
500 }
501
502 2x reactor_thread_context_guard ctx(this);
503 2x std::unique_lock lock(mutex_);
504 2x return do_one(lock, -1, &ctx.frame_);
505 2x }
506
507 inline std::size_t
508 102x reactor_scheduler_base::wait_one(long usec)
509 {
510 204x if (outstanding_work_.load(std::memory_order_acquire) == 0)
511 {
512 10x stop();
513 10x return 0;
514 }
515
516 92x reactor_thread_context_guard ctx(this);
517 92x std::unique_lock lock(mutex_);
518 92x return do_one(lock, usec, &ctx.frame_);
519 92x }
520
521 inline std::size_t
522 6x reactor_scheduler_base::poll()
523 {
524 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
525 {
526 1x stop();
527 1x return 0;
528 }
529
530 5x reactor_thread_context_guard ctx(this);
531 5x std::unique_lock lock(mutex_);
532
533 5x std::size_t n = 0;
534 for (;;)
535 {
536 11x if (!do_one(lock, 0, &ctx.frame_))
537 5x break;
538 6x if (n != (std::numeric_limits<std::size_t>::max)())
539 6x ++n;
540 6x if (!lock.owns_lock())
541 6x lock.lock();
542 }
543 5x return n;
544 5x }
545
546 inline std::size_t
547 4x reactor_scheduler_base::poll_one()
548 {
549 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
550 {
551 2x stop();
552 2x return 0;
553 }
554
555 2x reactor_thread_context_guard ctx(this);
556 2x std::unique_lock lock(mutex_);
557 2x return do_one(lock, 0, &ctx.frame_);
558 2x }
559
inline void
reactor_scheduler_base::work_started() noexcept
{
    // Plain counter bump; relaxed ordering suffices for counting.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
565
inline void
reactor_scheduler_base::work_finished() noexcept
{
    // fetch_sub returns the prior value: 1 means this was the last
    // outstanding piece of work, so stop the event loop.
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
572
573 inline void
574 95491x reactor_scheduler_base::compensating_work_started() const noexcept
575 {
576 95491x auto* ctx = reactor_find_context(this);
577 95491x if (ctx)
578 95491x ++ctx->private_outstanding_work;
579 95491x }
580
581 inline void
582 reactor_scheduler_base::drain_thread_queue(
583 op_queue& queue, std::int64_t count) const
584 {
585 if (count > 0)
586 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
587
588 std::unique_lock lock(mutex_);
589 completed_ops_.splice(queue);
590 if (count > 0)
591 maybe_unlock_and_signal_one(lock);
592 }
593
594 inline void
595 8228x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
596 {
597 8228x if (ops.empty())
598 8228x return;
599
600 if (auto* ctx = reactor_find_context(this))
601 {
602 ctx->private_queue.splice(ops);
603 return;
604 }
605
606 std::unique_lock lock(mutex_);
607 completed_ops_.splice(ops);
608 wake_one_thread_and_unlock(lock);
609 }
610
611 inline void
612 517x reactor_scheduler_base::shutdown_drain()
613 {
614 517x std::unique_lock lock(mutex_);
615
616 1120x while (auto* h = completed_ops_.pop())
617 {
618 603x if (h == &task_op_)
619 517x continue;
620 86x lock.unlock();
621 86x h->destroy();
622 86x lock.lock();
623 603x }
624
625 517x signal_all(lock);
626 517x }
627
inline void
reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
{
    // The lock parameter witnesses that the caller holds mutex_,
    // which guards state_; the lock itself is not used.
    state_ |= signaled_bit;
    cond_.notify_all();
}
634
635 inline bool
636 2283x reactor_scheduler_base::maybe_unlock_and_signal_one(
637 std::unique_lock<std::mutex>& lock) const
638 {
639 2283x state_ |= signaled_bit;
640 2283x if (state_ > signaled_bit)
641 {
642 lock.unlock();
643 cond_.notify_one();
644 return true;
645 }
646 2283x return false;
647 }
648
649 inline bool
650 203452x reactor_scheduler_base::unlock_and_signal_one(
651 std::unique_lock<std::mutex>& lock) const
652 {
653 203452x state_ |= signaled_bit;
654 203452x bool have_waiters = state_ > signaled_bit;
655 203452x lock.unlock();
656 203452x if (have_waiters)
657 cond_.notify_one();
658 203452x return have_waiters;
659 }
660
inline void
reactor_scheduler_base::clear_signal() const
{
    // Caller holds mutex_ (state_ is lock-protected); the waiter
    // count in the upper bits is preserved.
    state_ &= ~signaled_bit;
}
666
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    // Loop absorbs spurious wakeups. The waiter count is registered
    // in state_ only while actually blocked, so signalers can tell
    // whether notify_one has anyone to wake.
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
678
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Single timed wait, deliberately without a retry loop: a
    // spurious wakeup simply returns early, which callers treat
    // the same as a timeout-bounded wait.
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
690
inline void
reactor_scheduler_base::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    // First preference: wake a thread blocked on the condvar
    // (returns true having already unlocked).
    if (maybe_unlock_and_signal_one(lock))
        return;

    // No condvar waiters. If a thread is inside the reactor poll
    // and has not yet been interrupted, mark it interrupted (under
    // the lock) and kick the reactor after unlocking.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // Nobody to wake; just release the mutex as promised.
        lock.unlock();
    }
}
709
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // The handler that just ran consumed one unit of work, so
        // settle the private count against that: net surplus
        // (produced > 1) is published in bulk; break-even
        // (produced == 1) needs no adjustment; deficit
        // (produced < 1) becomes a single work_finished(), which
        // may stop the scheduler.
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // The handler ran with the mutex released; re-acquire it
        // only if private ops must be made globally visible. The
        // lock is intentionally left held for the caller's loop.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the consumed work.
        sched->work_finished();
    }
}
733
inline reactor_scheduler_base::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Publish work produced during the reactor poll to the global
    // counter before its ops become globally visible.
    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        // run_task may return with the mutex released; the global
        // queue must only be touched while holding it.
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
753
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    // Entered with `lock` held. Returns 1 after running one handler
    // (with the lock released), or 0 on stop/no-work/timeout.
    // timeout_us semantics: <0 block indefinitely, 0 never block,
    // >0 bounded wait.
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle (no handlers) and either no work remains or the
            // caller forbids blocking: put the sentinel back and bail.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Poll without blocking if handlers are pending; a zero
            // poll timeout also means the reactor needs no interrupt.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers while this
            // one polls (also releases the lock).
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on failure.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel so a later pass polls again.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more ops remain, try to hand them to another thread;
            // record whether anyone actually took the bait (feeds the
            // inline-budget heuristic via ctx->unassisted).
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Settles work accounting when the handler returns or throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the mutex released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // Nothing queued anywhere: give up if no work remains or the
        // caller forbids blocking.
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
838
839 } // namespace boost::corosio::detail
840
841 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
842