TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/capy/ex/execution_context.hpp>
15 :
16 : #include <boost/corosio/native/native_scheduler.hpp>
17 : #include <boost/corosio/detail/scheduler_op.hpp>
18 : #include <boost/corosio/detail/thread_local_ptr.hpp>
19 :
20 : #include <atomic>
21 : #include <chrono>
22 : #include <condition_variable>
23 : #include <coroutine>
24 : #include <cstddef>
25 : #include <cstdint>
26 : #include <limits>
27 : #include <memory>
28 : #include <mutex>
29 :
30 : namespace boost::corosio::detail {
31 :
32 : // Forward declaration
33 : class reactor_scheduler_base;
34 :
35 : /** Per-thread state for a reactor scheduler.
36 :
37 : Each thread running a scheduler's event loop has one of these
38 : on a thread-local stack. It holds a private work queue and
39 : inline completion budget for speculative I/O fast paths.
40 : */
41 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 : {
43 : /// Scheduler this context belongs to.
44 : reactor_scheduler_base const* key;
45 :
46 : /// Next context frame on this thread's stack.
47 : reactor_scheduler_context* next;
48 :
49 : /// Private work queue for reduced contention.
50 : op_queue private_queue;
51 :
52 : /// Unflushed work count for the private queue.
53 : std::int64_t private_outstanding_work;
54 :
55 : /// Remaining inline completions allowed this cycle.
56 : int inline_budget;
57 :
58 : /// Maximum inline budget (adaptive, 2-16).
59 : int inline_budget_max;
60 :
61 : /// True if no other thread absorbed queued work last cycle.
62 : bool unassisted;
63 :
64 : /// Construct a context frame linked to @a n.
65 HIT 457 : reactor_scheduler_context(
66 : reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 457 : : key(k)
68 457 : , next(n)
69 457 : , private_outstanding_work(0)
70 457 : , inline_budget(0)
71 457 : , inline_budget_max(2)
72 457 : , unassisted(false)
73 : {
74 457 : }
75 : };
76 :
77 : /// Thread-local context stack for reactor schedulers.
78 : inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79 :
80 : /// Find the context frame for a scheduler on this thread.
81 : inline reactor_scheduler_context*
82 551356 : reactor_find_context(reactor_scheduler_base const* self) noexcept
83 : {
84 551356 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 : {
86 548384 : if (c->key == self)
87 548384 : return c;
88 : }
89 2972 : return nullptr;
90 : }
91 :
92 : /// Flush private work count to global counter.
93 : inline void
94 MIS 0 : reactor_flush_private_work(
95 : reactor_scheduler_context* ctx,
96 : std::atomic<std::int64_t>& outstanding_work) noexcept
97 : {
98 0 : if (ctx && ctx->private_outstanding_work > 0)
99 : {
100 0 : outstanding_work.fetch_add(
101 : ctx->private_outstanding_work, std::memory_order_relaxed);
102 0 : ctx->private_outstanding_work = 0;
103 : }
104 0 : }
105 :
106 : /** Drain private queue to global queue, flushing work count first.
107 :
108 : @return True if any ops were drained.
109 : */
110 : inline bool
111 0 : reactor_drain_private_queue(
112 : reactor_scheduler_context* ctx,
113 : std::atomic<std::int64_t>& outstanding_work,
114 : op_queue& completed_ops) noexcept
115 : {
116 0 : if (!ctx || ctx->private_queue.empty())
117 0 : return false;
118 :
119 0 : reactor_flush_private_work(ctx, outstanding_work);
120 0 : completed_ops.splice(ctx->private_queue);
121 0 : return true;
122 : }
123 :
124 : /** Non-template base for reactor-backed scheduler implementations.
125 :
126 : Provides the complete threading model shared by epoll, kqueue,
127 : and select schedulers: signal state machine, inline completion
128 : budget, work counting, run/poll methods, and the do_one event
129 : loop.
130 :
131 : Derived classes provide platform-specific hooks by overriding:
132 : - `run_task(lock, ctx)` to run the reactor poll
133 : - `interrupt_reactor()` to wake a blocked reactor
134 :
135 : De-templated from the original CRTP design to eliminate
136 : duplicate instantiations when multiple backends are compiled
137 : into the same binary. Virtual dispatch for run_task (called
138 : once per reactor cycle, before a blocking syscall) has
139 : negligible overhead.
140 :
141 : @par Thread Safety
142 : All public member functions are thread-safe.
143 : */
144 : class reactor_scheduler_base
145 : : public native_scheduler
146 : , public capy::execution_context::service
147 : {
148 : public:
149 : using key_type = scheduler;
150 : using context_type = reactor_scheduler_context;
151 :
152 : /// Post a coroutine for deferred execution.
153 : void post(std::coroutine_handle<> h) const override;
154 :
155 : /// Post a scheduler operation for deferred execution.
156 : void post(scheduler_op* h) const override;
157 :
158 : /// Return true if called from a thread running this scheduler.
159 : bool running_in_this_thread() const noexcept override;
160 :
161 : /// Request the scheduler to stop dispatching handlers.
162 : void stop() override;
163 :
164 : /// Return true if the scheduler has been stopped.
165 : bool stopped() const noexcept override;
166 :
167 : /// Reset the stopped state so `run()` can resume.
168 : void restart() override;
169 :
170 : /// Run the event loop until no work remains.
171 : std::size_t run() override;
172 :
173 : /// Run until one handler completes or no work remains.
174 : std::size_t run_one() override;
175 :
176 : /// Run until one handler completes or @a usec elapses.
177 : std::size_t wait_one(long usec) override;
178 :
179 : /// Run ready handlers without blocking.
180 : std::size_t poll() override;
181 :
182 : /// Run at most one ready handler without blocking.
183 : std::size_t poll_one() override;
184 :
185 : /// Increment the outstanding work count.
186 : void work_started() noexcept override;
187 :
188 : /// Decrement the outstanding work count, stopping on zero.
189 : void work_finished() noexcept override;
190 :
191 : /** Reset the thread's inline completion budget.
192 :
193 : Called at the start of each posted completion handler to
194 : grant a fresh budget for speculative inline completions.
195 : */
196 : void reset_inline_budget() const noexcept;
197 :
198 : /** Consume one unit of inline budget if available.
199 :
200 : @return True if budget was available and consumed.
201 : */
202 : bool try_consume_inline_budget() const noexcept;
203 :
204 : /** Offset a forthcoming work_finished from work_cleanup.
205 :
206 : Called by descriptor_state when all I/O returned EAGAIN and
207 : no handler will be executed. Must be called from a scheduler
208 : thread.
209 : */
210 : void compensating_work_started() const noexcept;
211 :
212 : /** Drain work from thread context's private queue to global queue.
213 :
214 : Flushes private work count to the global counter, then
215 : transfers the queue under mutex protection.
216 :
217 : @param queue The private queue to drain.
218 : @param count Private work count to flush before draining.
219 : */
220 : void drain_thread_queue(op_queue& queue, std::int64_t count) const;
221 :
222 : /** Post completed operations for deferred invocation.
223 :
224 : If called from a thread running this scheduler, operations
225 : go to the thread's private queue (fast path). Otherwise,
226 : operations are added to the global queue under mutex and a
227 : waiter is signaled.
228 :
229 : @par Preconditions
230 : work_started() must have been called for each operation.
231 :
232 : @param ops Queue of operations to post.
233 : */
234 : void post_deferred_completions(op_queue& ops) const;
235 :
236 : protected:
237 HIT 517 : reactor_scheduler_base() = default;
238 :
239 : /** Drain completed_ops during shutdown.
240 :
241 : Pops all operations from the global queue and destroys them,
242 : skipping the task sentinel. Signals all waiting threads.
243 : Derived classes call this from their shutdown() override
244 : before performing platform-specific cleanup.
245 : */
246 : void shutdown_drain();
247 :
248 : /// RAII guard that re-inserts the task sentinel after `run_task`.
249 : struct task_cleanup
250 : {
251 : reactor_scheduler_base const* sched;
252 : std::unique_lock<std::mutex>* lock;
253 : context_type* ctx;
254 : ~task_cleanup();
255 : };
256 :
257 : mutable std::mutex mutex_;
258 : mutable std::condition_variable cond_;
259 : mutable op_queue completed_ops_;
260 : mutable std::atomic<std::int64_t> outstanding_work_{0};
261 : bool stopped_ = false;
262 : mutable std::atomic<bool> task_running_{false};
263 : mutable bool task_interrupted_ = false;
264 :
265 : /// Bit 0 of `state_`: set when the condvar should be signaled.
266 : static constexpr std::size_t signaled_bit = 1;
267 :
268 : /// Increment per waiting thread in `state_`.
269 : static constexpr std::size_t waiter_increment = 2;
270 : mutable std::size_t state_ = 0;
271 :
272 : /// Sentinel op that triggers a reactor poll when dequeued.
273 : struct task_op final : scheduler_op
274 : {
275 MIS 0 : void operator()() override {}
276 0 : void destroy() override {}
277 : };
278 : task_op task_op_;
279 :
280 : /// Run the platform-specific reactor poll.
281 : virtual void
282 : run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
283 : long timeout_us) = 0;
284 :
285 : /// Wake a blocked reactor (e.g. write to eventfd or pipe).
286 : virtual void interrupt_reactor() const = 0;
287 :
288 : private:
289 : struct work_cleanup
290 : {
291 : reactor_scheduler_base* sched;
292 : std::unique_lock<std::mutex>* lock;
293 : context_type* ctx;
294 : ~work_cleanup();
295 : };
296 :
297 : std::size_t do_one(
298 : std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);
299 :
300 : void signal_all(std::unique_lock<std::mutex>& lock) const;
301 : bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
302 : bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
303 : void clear_signal() const;
304 : void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
305 : void wait_for_signal_for(
306 : std::unique_lock<std::mutex>& lock, long timeout_us) const;
307 : void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
308 : };
309 :
310 : /** RAII guard that pushes/pops a scheduler context frame.
311 :
312 : On construction, pushes a new context frame onto the
313 : thread-local stack. On destruction, drains any remaining
314 : private queue items to the global queue and pops the frame.
315 : */
316 : struct reactor_thread_context_guard
317 : {
318 : /// The context frame managed by this guard.
319 : reactor_scheduler_context frame_;
320 :
321 : /// Construct the guard, pushing a frame for @a sched.
322 HIT 457 : explicit reactor_thread_context_guard(
323 : reactor_scheduler_base const* sched) noexcept
324 457 : : frame_(sched, reactor_context_stack.get())
325 : {
326 457 : reactor_context_stack.set(&frame_);
327 457 : }
328 :
329 : /// Destroy the guard, draining private work and popping the frame.
330 457 : ~reactor_thread_context_guard() noexcept
331 : {
332 457 : if (!frame_.private_queue.empty())
333 MIS 0 : frame_.key->drain_thread_queue(
334 0 : frame_.private_queue, frame_.private_outstanding_work);
335 HIT 457 : reactor_context_stack.set(frame_.next);
336 457 : }
337 : };
338 :
339 : // ---- Inline implementations ------------------------------------------------
340 :
341 : inline void
342 70541 : reactor_scheduler_base::reset_inline_budget() const noexcept
343 : {
344 70541 : if (auto* ctx = reactor_find_context(this))
345 : {
346 : // Cap when no other thread absorbed queued work
347 70541 : if (ctx->unassisted)
348 : {
349 70541 : ctx->inline_budget_max = 4;
350 70541 : ctx->inline_budget = 4;
351 70541 : return;
352 : }
353 : // Ramp up when previous cycle fully consumed budget
354 MIS 0 : if (ctx->inline_budget == 0)
355 0 : ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
356 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
357 0 : ctx->inline_budget_max = 2;
358 0 : ctx->inline_budget = ctx->inline_budget_max;
359 : }
360 : }
361 :
362 : inline bool
363 HIT 310255 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
364 : {
365 310255 : if (auto* ctx = reactor_find_context(this))
366 : {
367 310255 : if (ctx->inline_budget > 0)
368 : {
369 248222 : --ctx->inline_budget;
370 248222 : return true;
371 : }
372 : }
373 62033 : return false;
374 : }
375 :
376 : inline void
377 2117 : reactor_scheduler_base::post(std::coroutine_handle<> h) const
378 : {
379 : struct post_handler final : scheduler_op
380 : {
381 : std::coroutine_handle<> h_;
382 :
383 2117 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
384 4234 : ~post_handler() override = default;
385 :
386 2108 : void operator()() override
387 : {
388 2108 : auto saved = h_;
389 2108 : delete this;
390 : // Ensure stores from the posting thread are visible
391 : std::atomic_thread_fence(std::memory_order_acquire);
392 2108 : saved.resume();
393 2108 : }
394 :
395 9 : void destroy() override
396 : {
397 9 : auto saved = h_;
398 9 : delete this;
399 9 : saved.destroy();
400 9 : }
401 : };
402 :
403 2117 : auto ph = std::make_unique<post_handler>(h);
404 :
405 2117 : if (auto* ctx = reactor_find_context(this))
406 : {
407 6 : ++ctx->private_outstanding_work;
408 6 : ctx->private_queue.push(ph.release());
409 6 : return;
410 : }
411 :
412 2111 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
413 :
414 2111 : std::unique_lock lock(mutex_);
415 2111 : completed_ops_.push(ph.release());
416 2111 : wake_one_thread_and_unlock(lock);
417 2117 : }
418 :
419 : inline void
420 71733 : reactor_scheduler_base::post(scheduler_op* h) const
421 : {
422 71733 : if (auto* ctx = reactor_find_context(this))
423 : {
424 71561 : ++ctx->private_outstanding_work;
425 71561 : ctx->private_queue.push(h);
426 71561 : return;
427 : }
428 :
429 172 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
430 :
431 172 : std::unique_lock lock(mutex_);
432 172 : completed_ops_.push(h);
433 172 : wake_one_thread_and_unlock(lock);
434 172 : }
435 :
436 : inline bool
437 1219 : reactor_scheduler_base::running_in_this_thread() const noexcept
438 : {
439 1219 : return reactor_find_context(this) != nullptr;
440 : }
441 :
442 : inline void
443 418 : reactor_scheduler_base::stop()
444 : {
445 418 : std::unique_lock lock(mutex_);
446 418 : if (!stopped_)
447 : {
448 376 : stopped_ = true;
449 376 : signal_all(lock);
450 376 : interrupt_reactor();
451 : }
452 418 : }
453 :
454 : inline bool
455 62 : reactor_scheduler_base::stopped() const noexcept
456 : {
457 62 : std::unique_lock lock(mutex_);
458 124 : return stopped_;
459 62 : }
460 :
461 : inline void
462 91 : reactor_scheduler_base::restart()
463 : {
464 91 : std::unique_lock lock(mutex_);
465 91 : stopped_ = false;
466 91 : }
467 :
468 : inline std::size_t
469 388 : reactor_scheduler_base::run()
470 : {
471 776 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
472 : {
473 32 : stop();
474 32 : return 0;
475 : }
476 :
477 356 : reactor_thread_context_guard ctx(this);
478 356 : std::unique_lock lock(mutex_);
479 :
480 356 : std::size_t n = 0;
481 : for (;;)
482 : {
483 177847 : if (!do_one(lock, -1, &ctx.frame_))
484 356 : break;
485 177491 : if (n != (std::numeric_limits<std::size_t>::max)())
486 177491 : ++n;
487 177491 : if (!lock.owns_lock())
488 110415 : lock.lock();
489 : }
490 356 : return n;
491 356 : }
492 :
493 : inline std::size_t
494 2 : reactor_scheduler_base::run_one()
495 : {
496 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
497 : {
498 MIS 0 : stop();
499 0 : return 0;
500 : }
501 :
502 HIT 2 : reactor_thread_context_guard ctx(this);
503 2 : std::unique_lock lock(mutex_);
504 2 : return do_one(lock, -1, &ctx.frame_);
505 2 : }
506 :
507 : inline std::size_t
508 102 : reactor_scheduler_base::wait_one(long usec)
509 : {
510 204 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
511 : {
512 10 : stop();
513 10 : return 0;
514 : }
515 :
516 92 : reactor_thread_context_guard ctx(this);
517 92 : std::unique_lock lock(mutex_);
518 92 : return do_one(lock, usec, &ctx.frame_);
519 92 : }
520 :
521 : inline std::size_t
522 6 : reactor_scheduler_base::poll()
523 : {
524 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
525 : {
526 1 : stop();
527 1 : return 0;
528 : }
529 :
530 5 : reactor_thread_context_guard ctx(this);
531 5 : std::unique_lock lock(mutex_);
532 :
533 5 : std::size_t n = 0;
534 : for (;;)
535 : {
536 11 : if (!do_one(lock, 0, &ctx.frame_))
537 5 : break;
538 6 : if (n != (std::numeric_limits<std::size_t>::max)())
539 6 : ++n;
540 6 : if (!lock.owns_lock())
541 6 : lock.lock();
542 : }
543 5 : return n;
544 5 : }
545 :
546 : inline std::size_t
547 4 : reactor_scheduler_base::poll_one()
548 : {
549 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
550 : {
551 2 : stop();
552 2 : return 0;
553 : }
554 :
555 2 : reactor_thread_context_guard ctx(this);
556 2 : std::unique_lock lock(mutex_);
557 2 : return do_one(lock, 0, &ctx.frame_);
558 2 : }
559 :
560 : inline void
561 14410 : reactor_scheduler_base::work_started() noexcept
562 : {
563 14410 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
564 14410 : }
565 :
566 : inline void
567 21145 : reactor_scheduler_base::work_finished() noexcept
568 : {
569 42290 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
570 368 : stop();
571 21145 : }
572 :
573 : inline void
574 95491 : reactor_scheduler_base::compensating_work_started() const noexcept
575 : {
576 95491 : auto* ctx = reactor_find_context(this);
577 95491 : if (ctx)
578 95491 : ++ctx->private_outstanding_work;
579 95491 : }
580 :
581 : inline void
582 MIS 0 : reactor_scheduler_base::drain_thread_queue(
583 : op_queue& queue, std::int64_t count) const
584 : {
585 0 : if (count > 0)
586 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
587 :
588 0 : std::unique_lock lock(mutex_);
589 0 : completed_ops_.splice(queue);
590 0 : if (count > 0)
591 0 : maybe_unlock_and_signal_one(lock);
592 0 : }
593 :
594 : inline void
595 HIT 8228 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
596 : {
597 8228 : if (ops.empty())
598 8228 : return;
599 :
600 MIS 0 : if (auto* ctx = reactor_find_context(this))
601 : {
602 0 : ctx->private_queue.splice(ops);
603 0 : return;
604 : }
605 :
606 0 : std::unique_lock lock(mutex_);
607 0 : completed_ops_.splice(ops);
608 0 : wake_one_thread_and_unlock(lock);
609 0 : }
610 :
611 : inline void
612 HIT 517 : reactor_scheduler_base::shutdown_drain()
613 : {
614 517 : std::unique_lock lock(mutex_);
615 :
616 1120 : while (auto* h = completed_ops_.pop())
617 : {
618 603 : if (h == &task_op_)
619 517 : continue;
620 86 : lock.unlock();
621 86 : h->destroy();
622 86 : lock.lock();
623 603 : }
624 :
625 517 : signal_all(lock);
626 517 : }
627 :
628 : inline void
629 893 : reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
630 : {
631 893 : state_ |= signaled_bit;
632 893 : cond_.notify_all();
633 893 : }
634 :
635 : inline bool
636 2283 : reactor_scheduler_base::maybe_unlock_and_signal_one(
637 : std::unique_lock<std::mutex>& lock) const
638 : {
639 2283 : state_ |= signaled_bit;
640 2283 : if (state_ > signaled_bit)
641 : {
642 MIS 0 : lock.unlock();
643 0 : cond_.notify_one();
644 0 : return true;
645 : }
646 HIT 2283 : return false;
647 : }
648 :
649 : inline bool
650 203452 : reactor_scheduler_base::unlock_and_signal_one(
651 : std::unique_lock<std::mutex>& lock) const
652 : {
653 203452 : state_ |= signaled_bit;
654 203452 : bool have_waiters = state_ > signaled_bit;
655 203452 : lock.unlock();
656 203452 : if (have_waiters)
657 MIS 0 : cond_.notify_one();
658 HIT 203452 : return have_waiters;
659 : }
660 :
661 : inline void
662 MIS 0 : reactor_scheduler_base::clear_signal() const
663 : {
664 0 : state_ &= ~signaled_bit;
665 0 : }
666 :
667 : inline void
668 0 : reactor_scheduler_base::wait_for_signal(
669 : std::unique_lock<std::mutex>& lock) const
670 : {
671 0 : while ((state_ & signaled_bit) == 0)
672 : {
673 0 : state_ += waiter_increment;
674 0 : cond_.wait(lock);
675 0 : state_ -= waiter_increment;
676 : }
677 0 : }
678 :
679 : inline void
680 0 : reactor_scheduler_base::wait_for_signal_for(
681 : std::unique_lock<std::mutex>& lock, long timeout_us) const
682 : {
683 0 : if ((state_ & signaled_bit) == 0)
684 : {
685 0 : state_ += waiter_increment;
686 0 : cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
687 0 : state_ -= waiter_increment;
688 : }
689 0 : }
690 :
691 : inline void
692 HIT 2283 : reactor_scheduler_base::wake_one_thread_and_unlock(
693 : std::unique_lock<std::mutex>& lock) const
694 : {
695 2283 : if (maybe_unlock_and_signal_one(lock))
696 MIS 0 : return;
697 :
698 HIT 2283 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
699 : {
700 55 : task_interrupted_ = true;
701 55 : lock.unlock();
702 55 : interrupt_reactor();
703 : }
704 : else
705 : {
706 2228 : lock.unlock();
707 : }
708 : }
709 :
710 177552 : inline reactor_scheduler_base::work_cleanup::~work_cleanup()
711 : {
712 177552 : if (ctx)
713 : {
714 177552 : std::int64_t produced = ctx->private_outstanding_work;
715 177552 : if (produced > 1)
716 15 : sched->outstanding_work_.fetch_add(
717 : produced - 1, std::memory_order_relaxed);
718 177537 : else if (produced < 1)
719 14963 : sched->work_finished();
720 177552 : ctx->private_outstanding_work = 0;
721 :
722 177552 : if (!ctx->private_queue.empty())
723 : {
724 67098 : lock->lock();
725 67098 : sched->completed_ops_.splice(ctx->private_queue);
726 : }
727 : }
728 : else
729 : {
730 MIS 0 : sched->work_finished();
731 : }
732 HIT 177552 : }
733 :
734 221208 : inline reactor_scheduler_base::task_cleanup::~task_cleanup()
735 : {
736 110604 : if (!ctx)
737 MIS 0 : return;
738 :
739 HIT 110604 : if (ctx->private_outstanding_work > 0)
740 : {
741 4440 : sched->outstanding_work_.fetch_add(
742 4440 : ctx->private_outstanding_work, std::memory_order_relaxed);
743 4440 : ctx->private_outstanding_work = 0;
744 : }
745 :
746 110604 : if (!ctx->private_queue.empty())
747 : {
748 4440 : if (!lock->owns_lock())
749 MIS 0 : lock->lock();
750 HIT 4440 : sched->completed_ops_.splice(ctx->private_queue);
751 : }
752 110604 : }
753 :
754 : inline std::size_t
755 177954 : reactor_scheduler_base::do_one(
756 : std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
757 : {
758 : for (;;)
759 : {
760 288517 : if (stopped_)
761 356 : return 0;
762 :
763 288161 : scheduler_op* op = completed_ops_.pop();
764 :
765 : // Handle reactor sentinel — time to poll for I/O
766 288161 : if (op == &task_op_)
767 : {
768 : bool more_handlers =
769 110609 : !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
770 :
771 195318 : if (!more_handlers &&
772 169418 : (outstanding_work_.load(std::memory_order_acquire) == 0 ||
773 : timeout_us == 0))
774 : {
775 5 : completed_ops_.push(&task_op_);
776 5 : return 0;
777 : }
778 :
779 110604 : long task_timeout_us = more_handlers ? 0 : timeout_us;
780 110604 : task_interrupted_ = task_timeout_us == 0;
781 110604 : task_running_.store(true, std::memory_order_release);
782 :
783 110604 : if (more_handlers)
784 25900 : unlock_and_signal_one(lock);
785 :
786 : try
787 : {
788 110604 : run_task(lock, ctx, task_timeout_us);
789 : }
790 MIS 0 : catch (...)
791 : {
792 0 : task_running_.store(false, std::memory_order_relaxed);
793 0 : throw;
794 0 : }
795 :
796 HIT 110604 : task_running_.store(false, std::memory_order_relaxed);
797 110604 : completed_ops_.push(&task_op_);
798 110604 : if (timeout_us > 0)
799 41 : return 0;
800 110563 : continue;
801 110563 : }
802 :
803 : // Handle operation
804 177552 : if (op != nullptr)
805 : {
806 177552 : bool more = !completed_ops_.empty();
807 :
808 177552 : if (more)
809 177552 : ctx->unassisted = !unlock_and_signal_one(lock);
810 : else
811 : {
812 MIS 0 : ctx->unassisted = false;
813 0 : lock.unlock();
814 : }
815 :
816 HIT 177552 : work_cleanup on_exit{this, &lock, ctx};
817 : (void)on_exit;
818 :
819 177552 : (*op)();
820 177552 : return 1;
821 177552 : }
822 :
823 : // Try private queue before blocking
824 MIS 0 : if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
825 0 : continue;
826 :
827 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
828 : timeout_us == 0)
829 0 : return 0;
830 :
831 0 : clear_signal();
832 0 : if (timeout_us < 0)
833 0 : wait_for_signal(lock);
834 : else
835 0 : wait_for_signal_for(lock, timeout_us);
836 HIT 110563 : }
837 : }
838 :
839 : } // namespace boost::corosio::detail
840 :
841 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|