| 1 | /* |
| 2 | Copyright 2018 Google Inc. All Rights Reserved. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS-IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "utils/lockless_task_queue.h" |
| 18 | |
| 19 | #include <limits> |
| 20 | |
| 21 | #include "base/logging.h" |
| 22 | |
| 23 | namespace vraudio { |
| 24 | |
| 25 | namespace { |
| 26 | |
| 27 | // Reserved index representing an invalid list index. |
| 28 | constexpr uint64_t kInvalidIndex = std::numeric_limits<uint32_t>::max(); |
| 29 | |
| 30 | // Maximum number of producers. |
| 31 | constexpr uint64_t kMaxProducers = kInvalidIndex - 1; |
| 32 | |
| 33 | } // namespace |
| 34 | |
| 35 | LocklessTaskQueue::LocklessTaskQueue(size_t max_tasks) { |
| 36 | CHECK_GT(max_tasks, 0U); |
| 37 | CHECK_LE(max_tasks, kMaxProducers); |
| 38 | Init(num_nodes: max_tasks); |
| 39 | } |
| 40 | |
| 41 | LocklessTaskQueue::~LocklessTaskQueue() { Clear(); } |
| 42 | |
| 43 | void LocklessTaskQueue::Post(Task&& task) { |
| 44 | const TagAndIndex free_node_idx = PopNodeFromList(list_head: &free_list_head_idx_); |
| 45 | if (GetIndex(tag_and_index: free_node_idx) == kInvalidIndex) { |
| 46 | LOG(WARNING) << "Queue capacity reached - dropping task" ; |
| 47 | return; |
| 48 | } |
| 49 | nodes_[GetIndex(tag_and_index: free_node_idx)].task = std::move(task); |
| 50 | PushNodeToList(list_head: &task_list_head_idx_, node: free_node_idx); |
| 51 | } |
| 52 | |
| 53 | void LocklessTaskQueue::Execute() { |
| 54 | const TagAndIndex old_flag_with_invalid_index = |
| 55 | (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; |
| 56 | const TagAndIndex old_task_list_head_idx = |
| 57 | task_list_head_idx_.exchange(i: old_flag_with_invalid_index); |
| 58 | ProcessTaskList(list_head: old_task_list_head_idx, execute: true /*execute_tasks*/); |
| 59 | } |
| 60 | |
| 61 | void LocklessTaskQueue::Clear() { |
| 62 | const TagAndIndex old_flag_with_invalid_index = |
| 63 | (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; |
| 64 | const TagAndIndex old_task_list_head_idx = |
| 65 | task_list_head_idx_.exchange(i: old_flag_with_invalid_index); |
| 66 | ProcessTaskList(list_head: old_task_list_head_idx, execute: false /*execute_tasks*/); |
| 67 | } |
| 68 | |
| 69 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::IncreaseTag( |
| 70 | TagAndIndex tag_and_index) { |
| 71 | // The most significant 32 bits a reserved for tagging. Overflows are |
| 72 | // acceptable. |
| 73 | return tag_and_index + (static_cast<uint64_t>(1) << 32); |
| 74 | } |
| 75 | |
| 76 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetIndex( |
| 77 | TagAndIndex tag_and_index) { |
| 78 | // The least significant 32 bits a reserved for the index. |
| 79 | return tag_and_index & std::numeric_limits<uint32_t>::max(); |
| 80 | } |
| 81 | |
| 82 | // Extracts the flag in the most significant 32 bits from a TagAndIndex; |
| 83 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetFlag( |
| 84 | TagAndIndex tag_and_index) { |
| 85 | // The most significant 32 bits a reserved for the flag. |
| 86 | return tag_and_index >> 32; |
| 87 | } |
| 88 | |
| 89 | void LocklessTaskQueue::PushNodeToList( |
| 90 | std::atomic<TagAndIndex>* list_head_idx_ptr, TagAndIndex node_idx) { |
| 91 | DCHECK(list_head_idx_ptr); |
| 92 | TagAndIndex list_head_idx; |
| 93 | do { |
| 94 | list_head_idx = list_head_idx_ptr->load(); |
| 95 | nodes_[GetIndex(tag_and_index: node_idx)].next = list_head_idx; |
| 96 | } while (!std::atomic_compare_exchange_strong(a: list_head_idx_ptr, |
| 97 | i1: &list_head_idx, i2: node_idx)); |
| 98 | } |
| 99 | |
| 100 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::PopNodeFromList( |
| 101 | std::atomic<TagAndIndex>* list_head_idx_ptr) { |
| 102 | DCHECK(list_head_idx_ptr); |
| 103 | TagAndIndex list_head_idx; |
| 104 | TagAndIndex list_head_next; |
| 105 | do { |
| 106 | list_head_idx = list_head_idx_ptr->load(); |
| 107 | if (GetIndex(tag_and_index: list_head_idx) == kInvalidIndex) { |
| 108 | // End of list reached. |
| 109 | return kInvalidIndex; |
| 110 | } |
| 111 | list_head_next = nodes_[GetIndex(tag_and_index: list_head_idx)].next; |
| 112 | } while (!std::atomic_compare_exchange_strong( |
| 113 | a: list_head_idx_ptr, i1: &list_head_idx, i2: list_head_next)); |
| 114 | return IncreaseTag(tag_and_index: list_head_idx); |
| 115 | } |
| 116 | |
| 117 | void LocklessTaskQueue::ProcessTaskList(TagAndIndex list_head_idx, |
| 118 | bool execute) { |
| 119 | TagAndIndex node_itr = list_head_idx; |
| 120 | while (GetIndex(tag_and_index: node_itr) != kInvalidIndex) { |
| 121 | Node* node = &nodes_[GetIndex(tag_and_index: node_itr)]; |
| 122 | TagAndIndex next_node = node->next; |
| 123 | temp_tasks_.emplace_back(args: std::move(node->task)); |
| 124 | node->task = nullptr; |
| 125 | PushNodeToList(list_head_idx_ptr: &free_list_head_idx_, node_idx: node_itr); |
| 126 | node_itr = next_node; |
| 127 | } |
| 128 | |
| 129 | if (execute) { |
| 130 | // Execute tasks in reverse order. |
| 131 | for (std::vector<Task>::reverse_iterator task_itr = temp_tasks_.rbegin(); |
| 132 | task_itr != temp_tasks_.rend(); ++task_itr) { |
| 133 | if (*task_itr != nullptr) { |
| 134 | (*task_itr)(); |
| 135 | } |
| 136 | } |
| 137 | } |
| 138 | temp_tasks_.clear(); |
| 139 | } |
| 140 | |
| 141 | void LocklessTaskQueue::Init(size_t num_nodes) { |
| 142 | nodes_.resize(new_size: num_nodes); |
| 143 | temp_tasks_.reserve(n: num_nodes); |
| 144 | |
| 145 | // Initialize free list. |
| 146 | free_list_head_idx_ = 0; |
| 147 | for (size_t i = 0; i < num_nodes - 1; ++i) { |
| 148 | nodes_[i].next = i + 1; |
| 149 | } |
| 150 | nodes_[num_nodes - 1].next = kInvalidIndex; |
| 151 | |
| 152 | // Initialize task list. |
| 153 | task_list_head_idx_ = kInvalidIndex; |
| 154 | } |
| 155 | |
| 156 | } // namespace vraudio |
| 157 | |