LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_scheduler.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 76.1 % 310 236 74
Test Date: 2026-03-30 19:32:49 Functions: 81.4 % 43 35 8

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/config.hpp>
      14                 : #include <boost/capy/ex/execution_context.hpp>
      15                 : 
      16                 : #include <boost/corosio/native/native_scheduler.hpp>
      17                 : #include <boost/corosio/detail/scheduler_op.hpp>
      18                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      19                 : 
      20                 : #include <atomic>
      21                 : #include <chrono>
      22                 : #include <condition_variable>
      23                 : #include <coroutine>
      24                 : #include <cstddef>
      25                 : #include <cstdint>
      26                 : #include <limits>
      27                 : #include <memory>
      28                 : #include <mutex>
      29                 : 
      30                 : namespace boost::corosio::detail {
      31                 : 
      32                 : // Forward declaration
      33                 : class reactor_scheduler_base;
      34                 : 
      35                 : /** Per-thread state for a reactor scheduler.
      36                 : 
      37                 :     Each thread running a scheduler's event loop has one of these
      38                 :     on a thread-local stack. It holds a private work queue and
      39                 :     inline completion budget for speculative I/O fast paths.
      40                 : */
      41                 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
      42                 : {
      43                 :     /// Scheduler this context belongs to.
      44                 :     reactor_scheduler_base const* key;
      45                 : 
      46                 :     /// Next context frame on this thread's stack.
      47                 :     reactor_scheduler_context* next;
      48                 : 
      49                 :     /// Private work queue for reduced contention.
      50                 :     op_queue private_queue;
      51                 : 
      52                 :     /// Unflushed work count for the private queue.
      53                 :     std::int64_t private_outstanding_work;
      54                 : 
      55                 :     /// Remaining inline completions allowed this cycle.
      56                 :     int inline_budget;
      57                 : 
      58                 :     /// Maximum inline budget (adaptive, 2-16).
      59                 :     int inline_budget_max;
      60                 : 
      61                 :     /// True if no other thread absorbed queued work last cycle.
      62                 :     bool unassisted;
      63                 : 
      64                 :     /// Construct a context frame linked to @a n.
      65 HIT         457 :     reactor_scheduler_context(
      66                 :         reactor_scheduler_base const* k, reactor_scheduler_context* n)
      67             457 :         : key(k)
      68             457 :         , next(n)
      69             457 :         , private_outstanding_work(0)
      70             457 :         , inline_budget(0)
      71             457 :         , inline_budget_max(2)
      72             457 :         , unassisted(false)
      73                 :     {
      74             457 :     }
      75                 : };
      76                 : 
      77                 : /// Thread-local context stack for reactor schedulers.
      78                 : inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
      79                 : 
      80                 : /// Find the context frame for a scheduler on this thread.
      81                 : inline reactor_scheduler_context*
      82          551356 : reactor_find_context(reactor_scheduler_base const* self) noexcept
      83                 : {
      84          551356 :     for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
      85                 :     {
      86          548384 :         if (c->key == self)
      87          548384 :             return c;
      88                 :     }
      89            2972 :     return nullptr;
      90                 : }
      91                 : 
      92                 : /// Flush private work count to global counter.
      93                 : inline void
      94 MIS           0 : reactor_flush_private_work(
      95                 :     reactor_scheduler_context* ctx,
      96                 :     std::atomic<std::int64_t>& outstanding_work) noexcept
      97                 : {
      98               0 :     if (ctx && ctx->private_outstanding_work > 0)
      99                 :     {
     100               0 :         outstanding_work.fetch_add(
     101                 :             ctx->private_outstanding_work, std::memory_order_relaxed);
     102               0 :         ctx->private_outstanding_work = 0;
     103                 :     }
     104               0 : }
     105                 : 
     106                 : /** Drain private queue to global queue, flushing work count first.
     107                 : 
     108                 :     @return True if any ops were drained.
     109                 : */
     110                 : inline bool
     111               0 : reactor_drain_private_queue(
     112                 :     reactor_scheduler_context* ctx,
     113                 :     std::atomic<std::int64_t>& outstanding_work,
     114                 :     op_queue& completed_ops) noexcept
     115                 : {
     116               0 :     if (!ctx || ctx->private_queue.empty())
     117               0 :         return false;
     118                 : 
     119               0 :     reactor_flush_private_work(ctx, outstanding_work);
     120               0 :     completed_ops.splice(ctx->private_queue);
     121               0 :     return true;
     122                 : }
     123                 : 
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx, timeout_us)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    /// Service key type for execution_context lookup.
    using key_type     = scheduler;

    /// Per-thread context frame type.
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    /// NOTE(review): destructor is defined out of line, elsewhere —
    /// confirm exact re-arm behavior at its definition.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Guards completed_ops_, stopped_, state_, task_interrupted_.
    mutable std::mutex mutex_;

    /// Wakes threads blocked in the do_one wait path.
    mutable std::condition_variable cond_;

    /// Global queue of completed (ready-to-run) operations.
    mutable op_queue completed_ops_;

    /// Unfinished-work count; reaching zero triggers stop().
    mutable std::atomic<std::int64_t> outstanding_work_{0};

    /// True once stop() has been requested (guarded by mutex_).
    bool stopped_ = false;

    /// True while some thread owns the reactor poll task.
    mutable std::atomic<bool> task_running_{false};

    /// True when the reactor has been asked to return early.
    /// NOTE(review): read/written in do_one, outside this view.
    mutable bool task_interrupted_ = false;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;

    /// Packed signal state: waiter count (bits 1+) plus signaled bit.
    mutable std::size_t state_                    = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    /// Both hooks are no-ops: the sentinel is identified by address
    /// and never executed as a real handler.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run around handler execution in do_one.
    /// NOTE(review): destructor is defined out of line — confirm
    /// work-count rebalancing semantics at its definition.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    /// Core event loop step; timeout_us < 0 blocks indefinitely,
    /// 0 polls, > 0 bounds the wait.
    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    // Signal state machine helpers (definitions not in this view).
    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
     309                 : 
     310                 : /** RAII guard that pushes/pops a scheduler context frame.
     311                 : 
     312                 :     On construction, pushes a new context frame onto the
     313                 :     thread-local stack. On destruction, drains any remaining
     314                 :     private queue items to the global queue and pops the frame.
     315                 : */
     316                 : struct reactor_thread_context_guard
     317                 : {
     318                 :     /// The context frame managed by this guard.
     319                 :     reactor_scheduler_context frame_;
     320                 : 
     321                 :     /// Construct the guard, pushing a frame for @a sched.
     322 HIT         457 :     explicit reactor_thread_context_guard(
     323                 :         reactor_scheduler_base const* sched) noexcept
     324             457 :         : frame_(sched, reactor_context_stack.get())
     325                 :     {
     326             457 :         reactor_context_stack.set(&frame_);
     327             457 :     }
     328                 : 
     329                 :     /// Destroy the guard, draining private work and popping the frame.
     330             457 :     ~reactor_thread_context_guard() noexcept
     331                 :     {
     332             457 :         if (!frame_.private_queue.empty())
     333 MIS           0 :             frame_.key->drain_thread_queue(
     334               0 :                 frame_.private_queue, frame_.private_outstanding_work);
     335 HIT         457 :         reactor_context_stack.set(frame_.next);
     336             457 :     }
     337                 : };
     338                 : 
     339                 : // ---- Inline implementations ------------------------------------------------
     340                 : 
     341                 : inline void
     342           70541 : reactor_scheduler_base::reset_inline_budget() const noexcept
     343                 : {
     344           70541 :     if (auto* ctx = reactor_find_context(this))
     345                 :     {
     346                 :         // Cap when no other thread absorbed queued work
     347           70541 :         if (ctx->unassisted)
     348                 :         {
     349           70541 :             ctx->inline_budget_max = 4;
     350           70541 :             ctx->inline_budget     = 4;
     351           70541 :             return;
     352                 :         }
     353                 :         // Ramp up when previous cycle fully consumed budget
     354 MIS           0 :         if (ctx->inline_budget == 0)
     355               0 :             ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
     356               0 :         else if (ctx->inline_budget < ctx->inline_budget_max)
     357               0 :             ctx->inline_budget_max = 2;
     358               0 :         ctx->inline_budget = ctx->inline_budget_max;
     359                 :     }
     360                 : }
     361                 : 
     362                 : inline bool
     363 HIT      310255 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
     364                 : {
     365          310255 :     if (auto* ctx = reactor_find_context(this))
     366                 :     {
     367          310255 :         if (ctx->inline_budget > 0)
     368                 :         {
     369          248222 :             --ctx->inline_budget;
     370          248222 :             return true;
     371                 :         }
     372                 :     }
     373           62033 :     return false;
     374                 : }
     375                 : 
     376                 : inline void
     377            2117 : reactor_scheduler_base::post(std::coroutine_handle<> h) const
     378                 : {
     379                 :     struct post_handler final : scheduler_op
     380                 :     {
     381                 :         std::coroutine_handle<> h_;
     382                 : 
     383            2117 :         explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
     384            4234 :         ~post_handler() override = default;
     385                 : 
     386            2108 :         void operator()() override
     387                 :         {
     388            2108 :             auto saved = h_;
     389            2108 :             delete this;
     390                 :             // Ensure stores from the posting thread are visible
     391                 :             std::atomic_thread_fence(std::memory_order_acquire);
     392            2108 :             saved.resume();
     393            2108 :         }
     394                 : 
     395               9 :         void destroy() override
     396                 :         {
     397               9 :             auto saved = h_;
     398               9 :             delete this;
     399               9 :             saved.destroy();
     400               9 :         }
     401                 :     };
     402                 : 
     403            2117 :     auto ph = std::make_unique<post_handler>(h);
     404                 : 
     405            2117 :     if (auto* ctx = reactor_find_context(this))
     406                 :     {
     407               6 :         ++ctx->private_outstanding_work;
     408               6 :         ctx->private_queue.push(ph.release());
     409               6 :         return;
     410                 :     }
     411                 : 
     412            2111 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     413                 : 
     414            2111 :     std::unique_lock lock(mutex_);
     415            2111 :     completed_ops_.push(ph.release());
     416            2111 :     wake_one_thread_and_unlock(lock);
     417            2117 : }
     418                 : 
     419                 : inline void
     420           71733 : reactor_scheduler_base::post(scheduler_op* h) const
     421                 : {
     422           71733 :     if (auto* ctx = reactor_find_context(this))
     423                 :     {
     424           71561 :         ++ctx->private_outstanding_work;
     425           71561 :         ctx->private_queue.push(h);
     426           71561 :         return;
     427                 :     }
     428                 : 
     429             172 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     430                 : 
     431             172 :     std::unique_lock lock(mutex_);
     432             172 :     completed_ops_.push(h);
     433             172 :     wake_one_thread_and_unlock(lock);
     434             172 : }
     435                 : 
     436                 : inline bool
     437            1219 : reactor_scheduler_base::running_in_this_thread() const noexcept
     438                 : {
     439            1219 :     return reactor_find_context(this) != nullptr;
     440                 : }
     441                 : 
     442                 : inline void
     443             418 : reactor_scheduler_base::stop()
     444                 : {
     445             418 :     std::unique_lock lock(mutex_);
     446             418 :     if (!stopped_)
     447                 :     {
     448             376 :         stopped_ = true;
     449             376 :         signal_all(lock);
     450             376 :         interrupt_reactor();
     451                 :     }
     452             418 : }
     453                 : 
     454                 : inline bool
     455              62 : reactor_scheduler_base::stopped() const noexcept
     456                 : {
     457              62 :     std::unique_lock lock(mutex_);
     458             124 :     return stopped_;
     459              62 : }
     460                 : 
     461                 : inline void
     462              91 : reactor_scheduler_base::restart()
     463                 : {
     464              91 :     std::unique_lock lock(mutex_);
     465              91 :     stopped_ = false;
     466              91 : }
     467                 : 
     468                 : inline std::size_t
     469             388 : reactor_scheduler_base::run()
     470                 : {
     471             776 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     472                 :     {
     473              32 :         stop();
     474              32 :         return 0;
     475                 :     }
     476                 : 
     477             356 :     reactor_thread_context_guard ctx(this);
     478             356 :     std::unique_lock lock(mutex_);
     479                 : 
     480             356 :     std::size_t n = 0;
     481                 :     for (;;)
     482                 :     {
     483          177847 :         if (!do_one(lock, -1, &ctx.frame_))
     484             356 :             break;
     485          177491 :         if (n != (std::numeric_limits<std::size_t>::max)())
     486          177491 :             ++n;
     487          177491 :         if (!lock.owns_lock())
     488          110415 :             lock.lock();
     489                 :     }
     490             356 :     return n;
     491             356 : }
     492                 : 
     493                 : inline std::size_t
     494               2 : reactor_scheduler_base::run_one()
     495                 : {
     496               4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     497                 :     {
     498 MIS           0 :         stop();
     499               0 :         return 0;
     500                 :     }
     501                 : 
     502 HIT           2 :     reactor_thread_context_guard ctx(this);
     503               2 :     std::unique_lock lock(mutex_);
     504               2 :     return do_one(lock, -1, &ctx.frame_);
     505               2 : }
     506                 : 
     507                 : inline std::size_t
     508             102 : reactor_scheduler_base::wait_one(long usec)
     509                 : {
     510             204 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     511                 :     {
     512              10 :         stop();
     513              10 :         return 0;
     514                 :     }
     515                 : 
     516              92 :     reactor_thread_context_guard ctx(this);
     517              92 :     std::unique_lock lock(mutex_);
     518              92 :     return do_one(lock, usec, &ctx.frame_);
     519              92 : }
     520                 : 
     521                 : inline std::size_t
     522               6 : reactor_scheduler_base::poll()
     523                 : {
     524              12 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     525                 :     {
     526               1 :         stop();
     527               1 :         return 0;
     528                 :     }
     529                 : 
     530               5 :     reactor_thread_context_guard ctx(this);
     531               5 :     std::unique_lock lock(mutex_);
     532                 : 
     533               5 :     std::size_t n = 0;
     534                 :     for (;;)
     535                 :     {
     536              11 :         if (!do_one(lock, 0, &ctx.frame_))
     537               5 :             break;
     538               6 :         if (n != (std::numeric_limits<std::size_t>::max)())
     539               6 :             ++n;
     540               6 :         if (!lock.owns_lock())
     541               6 :             lock.lock();
     542                 :     }
     543               5 :     return n;
     544               5 : }
     545                 : 
     546                 : inline std::size_t
     547               4 : reactor_scheduler_base::poll_one()
     548                 : {
     549               8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     550                 :     {
     551               2 :         stop();
     552               2 :         return 0;
     553                 :     }
     554                 : 
     555               2 :     reactor_thread_context_guard ctx(this);
     556               2 :     std::unique_lock lock(mutex_);
     557               2 :     return do_one(lock, 0, &ctx.frame_);
     558               2 : }
     559                 : 
inline void
reactor_scheduler_base::work_started() noexcept
{
    // Relaxed suffices: the count only gates shutdown, and
    // work_finished() pairs the final decrement with acq_rel.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
     565                 : 
     566                 : inline void
     567           21145 : reactor_scheduler_base::work_finished() noexcept
     568                 : {
     569           42290 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     570             368 :         stop();
     571           21145 : }
     572                 : 
// Bump the calling thread's private work counter instead of the shared
// atomic, avoiding contention; the private tally is folded into
// outstanding_work_ later (see work_cleanup / task_cleanup). No-op when the
// caller is not a scheduler thread (no context registered).
inline void
reactor_scheduler_base::compensating_work_started() const noexcept
{
    auto* ctx = reactor_find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}
     580                 : 
     581                 : inline void
     582 MIS           0 : reactor_scheduler_base::drain_thread_queue(
     583                 :     op_queue& queue, std::int64_t count) const
     584                 : {
     585               0 :     if (count > 0)
     586               0 :         outstanding_work_.fetch_add(count, std::memory_order_relaxed);
     587                 : 
     588               0 :     std::unique_lock lock(mutex_);
     589               0 :     completed_ops_.splice(queue);
     590               0 :     if (count > 0)
     591               0 :         maybe_unlock_and_signal_one(lock);
     592               0 : }
     593                 : 
     594                 : inline void
     595 HIT        8228 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
     596                 : {
     597            8228 :     if (ops.empty())
     598            8228 :         return;
     599                 : 
     600 MIS           0 :     if (auto* ctx = reactor_find_context(this))
     601                 :     {
     602               0 :         ctx->private_queue.splice(ops);
     603               0 :         return;
     604                 :     }
     605                 : 
     606               0 :     std::unique_lock lock(mutex_);
     607               0 :     completed_ops_.splice(ops);
     608               0 :     wake_one_thread_and_unlock(lock);
     609               0 : }
     610                 : 
     611                 : inline void
     612 HIT         517 : reactor_scheduler_base::shutdown_drain()
     613                 : {
     614             517 :     std::unique_lock lock(mutex_);
     615                 : 
     616            1120 :     while (auto* h = completed_ops_.pop())
     617                 :     {
     618             603 :         if (h == &task_op_)
     619             517 :             continue;
     620              86 :         lock.unlock();
     621              86 :         h->destroy();
     622              86 :         lock.lock();
     623             603 :     }
     624                 : 
     625             517 :     signal_all(lock);
     626             517 : }
     627                 : 
// Set the signaled bit and wake every thread blocked in wait_for_signal().
// The unnamed unique_lock parameter documents the precondition that the
// caller holds mutex_ (state_ is only mutated under it).
inline void
reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= signaled_bit;
    cond_.notify_all();
}
     634                 : 
     635                 : inline bool
     636            2283 : reactor_scheduler_base::maybe_unlock_and_signal_one(
     637                 :     std::unique_lock<std::mutex>& lock) const
     638                 : {
     639            2283 :     state_ |= signaled_bit;
     640            2283 :     if (state_ > signaled_bit)
     641                 :     {
     642 MIS           0 :         lock.unlock();
     643               0 :         cond_.notify_one();
     644               0 :         return true;
     645                 :     }
     646 HIT        2283 :     return false;
     647                 : }
     648                 : 
     649                 : inline bool
     650          203452 : reactor_scheduler_base::unlock_and_signal_one(
     651                 :     std::unique_lock<std::mutex>& lock) const
     652                 : {
     653          203452 :     state_ |= signaled_bit;
     654          203452 :     bool have_waiters = state_ > signaled_bit;
     655          203452 :     lock.unlock();
     656          203452 :     if (have_waiters)
     657 MIS           0 :         cond_.notify_one();
     658 HIT      203452 :     return have_waiters;
     659                 : }
     660                 : 
// Clear the signaled bit so a subsequent wait_for_signal() actually blocks.
// Callers invoke this with mutex_ held (see do_one), as state_ is
// mutex-protected.
inline void
reactor_scheduler_base::clear_signal() const
{
    state_ &= ~signaled_bit;
}
     666                 : 
// Block until the signaled bit is set. The waiter count (bits above
// signaled_bit) is incremented for the duration of each wait so signalers
// know whether notify is needed; the while loop absorbs spurious wakeups.
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
     678                 : 
// Block until the signaled bit is set or the timeout elapses. Deliberately
// single-shot (no loop): on timeout or spurious wakeup the caller re-checks
// its own conditions (see do_one's outer loop).
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
     690                 : 
     691                 : inline void
     692 HIT        2283 : reactor_scheduler_base::wake_one_thread_and_unlock(
     693                 :     std::unique_lock<std::mutex>& lock) const
     694                 : {
     695            2283 :     if (maybe_unlock_and_signal_one(lock))
     696 MIS           0 :         return;
     697                 : 
     698 HIT        2283 :     if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
     699                 :     {
     700              55 :         task_interrupted_ = true;
     701              55 :         lock.unlock();
     702              55 :         interrupt_reactor();
     703                 :     }
     704                 :     else
     705                 :     {
     706            2228 :         lock.unlock();
     707                 :     }
     708                 : }
     709                 : 
// Runs after a handler finishes (see do_one). Reconciles the thread's
// private work tally with the shared counter: the handler that just ran
// consumed one unit, so `produced - 1` is the net change. Also flushes the
// thread's private queue back to the shared queue (re-taking the lock,
// which do_one released before invoking the handler).
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        // produced == 1: the new unit exactly offsets the consumed one.
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: fall back to the shared counter directly.
        sched->work_finished();
    }
}
     733                 : 
// Runs after a reactor poll pass (see do_one). Unlike work_cleanup no
// handler was consumed here, so the full private tally is credited to the
// shared counter, and any ops the reactor produced are spliced into the
// shared queue (re-acquiring the lock only if run_task left it released).
inline reactor_scheduler_base::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
     753                 : 
// Core dispatch loop: pop and run at most one completed operation, polling
// the reactor via the task_op_ sentinel when it surfaces. Called with `lock`
// held on mutex_. timeout_us: 0 = non-blocking poll, <0 = block
// indefinitely, >0 = bounded wait. Returns 1 if a handler ran, 0 otherwise.
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            // Real handlers pending elsewhere? Then the reactor poll must
            // not block, and another thread can be enlisted to run them.
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Nothing to poll for (or non-blocking call): put the
                // sentinel back and report no work done.
                completed_ops_.push(&task_op_);
                return 0;
            }

            long task_timeout_us = more_handlers ? 0 : timeout_us;
            // A zero-timeout poll cannot be "interrupted" — mark it so
            // wake_one_thread_and_unlock skips interrupt_reactor().
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                // Keep the running flag consistent even on failure.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-queue the sentinel so the next pass polls again.
            completed_ops_.push(&task_op_);
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            if (more)
                // Wake a helper for the remaining ops; record whether one
                // actually existed (unassisted = we signaled nobody).
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts / private queue on exit,
            // including the case where (*op)() throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty but work is outstanding elsewhere: sleep until
        // another thread signals (bounded by timeout_us when positive).
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
     838                 : 
     839                 : } // namespace boost::corosio::detail
     840                 : 
     841                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
        

Generated by: LCOV version 2.3