| 1 | |
| 2 | // Copyright Oliver Kowalke 2015. |
| 3 | // Distributed under the Boost Software License, Version 1.0. |
| 4 | // (See accompanying file LICENSE_1_0.txt or copy at |
| 5 | // http://www.boost.org/LICENSE_1_0.txt) |
| 6 | // |
| 7 | |
| 8 | #include "boost/fiber/numa/algo/work_stealing.hpp" |
| 9 | |
| 10 | #include <cmath> |
| 11 | #include <random> |
| 12 | |
| 13 | #include <boost/assert.hpp> |
| 14 | #include <boost/context/detail/prefetch.hpp> |
| 15 | |
| 16 | #include "boost/fiber/detail/thread_barrier.hpp" |
| 17 | #include "boost/fiber/type.hpp" |
| 18 | |
| 19 | #ifdef BOOST_HAS_ABI_HEADERS |
| 20 | # include BOOST_ABI_PREFIX |
| 21 | #endif |
| 22 | |
| 23 | namespace boost { |
| 24 | namespace fibers { |
| 25 | namespace numa { |
| 26 | namespace algo { |
| 27 | |
| 28 | std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{}; |
| 29 | |
| 30 | std::vector< std::uint32_t > get_local_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) { |
| 31 | for ( auto & node : topo) { |
| 32 | if ( node_id == node.id) { |
| 33 | // store IDs of logical cpus that belong to this local NUMA node |
| 34 | return std::vector< std::uint32_t >{ node.logical_cpus.begin(), node.logical_cpus.end() }; |
| 35 | } |
| 36 | } |
| 37 | return std::vector< std::uint32_t >{}; |
| 38 | } |
| 39 | |
| 40 | std::vector< std::uint32_t > get_remote_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) { |
| 41 | std::vector< std::uint32_t > remote_cpus; |
| 42 | for ( auto & node : topo) { |
| 43 | if ( node_id != node.id) { |
| 44 | // store IDs of logical cpus that belong to a remote NUMA node |
| 45 | // no ordering regarding to the NUMA distance |
| 46 | remote_cpus.insert( position: remote_cpus.end(), first: node.logical_cpus.begin(), last: node.logical_cpus.end() ); |
| 47 | } |
| 48 | } |
| 49 | return remote_cpus; |
| 50 | } |
| 51 | |
| 52 | void |
| 53 | work_stealing::init_( std::vector< boost::fibers::numa::node > const& topo, |
| 54 | std::vector< intrusive_ptr< work_stealing > > & schedulers) { |
| 55 | std::uint32_t max_cpu_id = 0; |
| 56 | for ( auto & node : topo) { |
| 57 | max_cpu_id = (std::max)( a: max_cpu_id, b: * node.logical_cpus.rbegin() ); |
| 58 | } |
| 59 | // resize array of schedulers to max. CPU ID, initilized with nullptr |
| 60 | // CPU ID acts as the index in the scheduler array |
| 61 | // if a logical cpus is offline, schedulers_ will contain a nullptr |
| 62 | // logical cpus index starts at `0` -> add 1 |
| 63 | std::vector< intrusive_ptr< work_stealing > >{ max_cpu_id + 1, nullptr }.swap( x&: schedulers); |
| 64 | } |
| 65 | |
| 66 | work_stealing::work_stealing( |
| 67 | std::uint32_t cpu_id, |
| 68 | std::uint32_t node_id, |
| 69 | std::vector< boost::fibers::numa::node > const& topo, |
| 70 | bool suspend) : |
| 71 | cpu_id_{ cpu_id }, |
| 72 | local_cpus_{ get_local_cpus( node_id, topo) }, |
| 73 | remote_cpus_{ get_remote_cpus( node_id, topo) }, |
| 74 | suspend_{ suspend } { |
| 75 | // pin current thread to logical cpu |
| 76 | boost::fibers::numa::pin_thread( cpuid: cpu_id_); |
| 77 | // calculate thread count |
| 78 | std::size_t thread_count = 0; |
| 79 | for ( auto & node : topo) { |
| 80 | thread_count += node.logical_cpus.size(); |
| 81 | } |
| 82 | static boost::fibers::detail::thread_barrier b{ thread_count }; |
| 83 | // initialize the array of schedulers |
| 84 | static std::once_flag flag; |
| 85 | std::call_once( once&: flag, f: & work_stealing::init_, args: topo, args: std::ref( t&: schedulers_) ); |
| 86 | // register pointer of this scheduler |
| 87 | schedulers_[cpu_id_] = this; |
| 88 | b.wait(); |
| 89 | } |
| 90 | |
| 91 | void |
| 92 | work_stealing::awakened( context * ctx) noexcept { |
| 93 | if ( ! ctx->is_context( t: type::pinned_context) ) { |
| 94 | ctx->detach(); |
| 95 | } |
| 96 | rqueue_.push( c: ctx); |
| 97 | } |
| 98 | |
| 99 | context * |
| 100 | work_stealing::pick_next() noexcept { |
| 101 | context * victim = rqueue_.pop(); |
| 102 | if ( nullptr != victim) { |
| 103 | boost::context::detail::prefetch_range( addr: victim, len: sizeof( context) ); |
| 104 | if ( ! victim->is_context( t: type::pinned_context) ) { |
| 105 | context::active()->attach( victim); |
| 106 | } |
| 107 | } else { |
| 108 | std::uint32_t cpu_id = 0; |
| 109 | std::size_t count = 0, size = local_cpus_.size(); |
| 110 | static thread_local std::minstd_rand generator{ std::random_device{}() }; |
| 111 | std::uniform_int_distribution< std::uint32_t > local_distribution{ |
| 112 | 0, static_cast< std::uint32_t >( local_cpus_.size() - 1) }; |
| 113 | std::uniform_int_distribution< std::uint32_t > remote_distribution{ |
| 114 | 0, static_cast< std::uint32_t >( remote_cpus_.size() - 1) }; |
| 115 | do { |
| 116 | do { |
| 117 | ++count; |
| 118 | // random selection of one logical cpu |
| 119 | // that belongs to the local NUMA node |
| 120 | cpu_id = local_cpus_[local_distribution( generator)]; |
| 121 | // prevent stealing from own scheduler |
| 122 | } while ( cpu_id == cpu_id_); |
| 123 | // steal context from other scheduler |
| 124 | // schedulers_[cpu_id] should never contain a nullptr |
| 125 | BOOST_ASSERT( nullptr != schedulers_[cpu_id]); |
| 126 | victim = schedulers_[cpu_id]->steal(); |
| 127 | } while ( nullptr == victim && count < size); |
| 128 | if ( nullptr != victim) { |
| 129 | boost::context::detail::prefetch_range( addr: victim, len: sizeof( context) ); |
| 130 | BOOST_ASSERT( ! victim->is_context( type::pinned_context) ); |
| 131 | context::active()->attach( victim); |
| 132 | } else if ( ! remote_cpus_.empty() ) { |
| 133 | cpu_id = 0; |
| 134 | count = 0; |
| 135 | size = remote_cpus_.size(); |
| 136 | do { |
| 137 | ++count; |
| 138 | // random selection of one logical cpu |
| 139 | // that belongs to a remote NUMA node |
| 140 | cpu_id = remote_cpus_[remote_distribution( generator)]; |
| 141 | // remote cpu ID should never be equal to local cpu ID |
| 142 | BOOST_ASSERT( cpu_id != cpu_id_); |
| 143 | // schedulers_[cpu_id] should never contain a nullptr |
| 144 | BOOST_ASSERT( nullptr != schedulers_[cpu_id]); |
| 145 | // steal context from other scheduler |
| 146 | victim = schedulers_[cpu_id]->steal(); |
| 147 | } while ( nullptr == victim && count < size); |
| 148 | if ( nullptr != victim) { |
| 149 | boost::context::detail::prefetch_range( addr: victim, len: sizeof( context) ); |
| 150 | BOOST_ASSERT( ! victim->is_context( type::pinned_context) ); |
| 151 | // move memory from remote NUMA-node to |
| 152 | // memory of local NUMA-node |
| 153 | context::active()->attach( victim); |
| 154 | } |
| 155 | } |
| 156 | } |
| 157 | return victim; |
| 158 | } |
| 159 | |
| 160 | void |
| 161 | work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept { |
| 162 | if ( suspend_) { |
| 163 | if ( (std::chrono::steady_clock::time_point::max)() == time_point) { |
| 164 | std::unique_lock< std::mutex > lk{ mtx_ }; |
| 165 | cnd_.wait( lock&: lk, p: [this](){ return flag_; }); |
| 166 | flag_ = false; |
| 167 | } else { |
| 168 | std::unique_lock< std::mutex > lk{ mtx_ }; |
| 169 | cnd_.wait_until( lock&: lk, atime: time_point, p: [this](){ return flag_; }); |
| 170 | flag_ = false; |
| 171 | } |
| 172 | } |
| 173 | } |
| 174 | |
| 175 | void |
| 176 | work_stealing::notify() noexcept { |
| 177 | if ( suspend_) { |
| 178 | std::unique_lock< std::mutex > lk{ mtx_ }; |
| 179 | flag_ = true; |
| 180 | lk.unlock(); |
| 181 | cnd_.notify_all(); |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | }}}} |
| 186 | |
| 187 | #ifdef BOOST_HAS_ABI_HEADERS |
| 188 | # include BOOST_ABI_SUFFIX |
| 189 | #endif |
| 190 | |