| 1 | /* | 
| 2 | Copyright 2018 Google Inc. All Rights Reserved. | 
| 3 |  | 
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); | 
| 5 | you may not use this file except in compliance with the License. | 
| 6 | You may obtain a copy of the License at | 
| 7 |  | 
| 8 |     http://www.apache.org/licenses/LICENSE-2.0 | 
| 9 |  | 
| 10 | Unless required by applicable law or agreed to in writing, software | 
| 11 | distributed under the License is distributed on an "AS-IS" BASIS, | 
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 13 | See the License for the specific language governing permissions and | 
| 14 | limitations under the License. | 
| 15 | */ | 
| 16 |  | 
| 17 | #include "utils/lockless_task_queue.h" | 
| 18 |  | 
| 19 | #include <limits> | 
| 20 |  | 
| 21 | #include "base/logging.h" | 
| 22 |  | 
| 23 | namespace vraudio { | 
| 24 |  | 
namespace {

// Sentinel placed in the 32-bit index field of a TagAndIndex to mean "no
// node". It marks both an empty list head and the end of a list.
constexpr uint64_t kInvalidIndex{std::numeric_limits<uint32_t>::max()};

// Maximum number of producers. The constructor uses this as the upper bound
// for |max_tasks|, which keeps every valid node index below kInvalidIndex.
constexpr uint64_t kMaxProducers{kInvalidIndex - 1};

}  // namespace
| 34 |  | 
| 35 | LocklessTaskQueue::LocklessTaskQueue(size_t max_tasks) { | 
| 36 |   CHECK_GT(max_tasks, 0U); | 
| 37 |   CHECK_LE(max_tasks, kMaxProducers); | 
| 38 |   Init(num_nodes: max_tasks); | 
| 39 | } | 
| 40 |  | 
| 41 | LocklessTaskQueue::~LocklessTaskQueue() { Clear(); } | 
| 42 |  | 
| 43 | void LocklessTaskQueue::Post(Task&& task) { | 
| 44 |   const TagAndIndex free_node_idx = PopNodeFromList(list_head: &free_list_head_idx_); | 
| 45 |   if (GetIndex(tag_and_index: free_node_idx) == kInvalidIndex) { | 
| 46 |     LOG(WARNING) << "Queue capacity reached - dropping task" ; | 
| 47 |     return; | 
| 48 |   } | 
| 49 |   nodes_[GetIndex(tag_and_index: free_node_idx)].task = std::move(task); | 
| 50 |   PushNodeToList(list_head: &task_list_head_idx_, node: free_node_idx); | 
| 51 | } | 
| 52 |  | 
| 53 | void LocklessTaskQueue::Execute() { | 
| 54 |   const TagAndIndex old_flag_with_invalid_index = | 
| 55 |       (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; | 
| 56 |   const TagAndIndex old_task_list_head_idx = | 
| 57 |       task_list_head_idx_.exchange(i: old_flag_with_invalid_index); | 
| 58 |   ProcessTaskList(list_head: old_task_list_head_idx, execute: true /*execute_tasks*/); | 
| 59 | } | 
| 60 |  | 
| 61 | void LocklessTaskQueue::Clear() { | 
| 62 |   const TagAndIndex old_flag_with_invalid_index = | 
| 63 |       (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; | 
| 64 |   const TagAndIndex old_task_list_head_idx = | 
| 65 |       task_list_head_idx_.exchange(i: old_flag_with_invalid_index); | 
| 66 |   ProcessTaskList(list_head: old_task_list_head_idx, execute: false /*execute_tasks*/); | 
| 67 | } | 
| 68 |  | 
| 69 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::IncreaseTag( | 
| 70 |     TagAndIndex tag_and_index) { | 
| 71 |   // The most significant 32 bits a reserved for tagging. Overflows are | 
| 72 |   // acceptable. | 
| 73 |   return tag_and_index + (static_cast<uint64_t>(1) << 32); | 
| 74 | } | 
| 75 |  | 
| 76 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetIndex( | 
| 77 |     TagAndIndex tag_and_index) { | 
| 78 |   // The least significant 32 bits a reserved for the index. | 
| 79 |   return tag_and_index & std::numeric_limits<uint32_t>::max(); | 
| 80 | } | 
| 81 |  | 
| 82 | // Extracts the flag in the most significant 32 bits from a TagAndIndex; | 
| 83 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetFlag( | 
| 84 |     TagAndIndex tag_and_index) { | 
| 85 |   // The most significant 32 bits a reserved for the flag. | 
| 86 |   return tag_and_index >> 32; | 
| 87 | } | 
| 88 |  | 
| 89 | void LocklessTaskQueue::PushNodeToList( | 
| 90 |     std::atomic<TagAndIndex>* list_head_idx_ptr, TagAndIndex node_idx) { | 
| 91 |   DCHECK(list_head_idx_ptr); | 
| 92 |   TagAndIndex list_head_idx; | 
| 93 |   do { | 
| 94 |     list_head_idx = list_head_idx_ptr->load(); | 
| 95 |     nodes_[GetIndex(tag_and_index: node_idx)].next = list_head_idx; | 
| 96 |   } while (!std::atomic_compare_exchange_strong(a: list_head_idx_ptr, | 
| 97 |                                                 i1: &list_head_idx, i2: node_idx)); | 
| 98 | } | 
| 99 |  | 
| 100 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::PopNodeFromList( | 
| 101 |     std::atomic<TagAndIndex>* list_head_idx_ptr) { | 
| 102 |   DCHECK(list_head_idx_ptr); | 
| 103 |   TagAndIndex list_head_idx; | 
| 104 |   TagAndIndex list_head_next; | 
| 105 |   do { | 
| 106 |     list_head_idx = list_head_idx_ptr->load(); | 
| 107 |     if (GetIndex(tag_and_index: list_head_idx) == kInvalidIndex) { | 
| 108 |       // End of list reached. | 
| 109 |       return kInvalidIndex; | 
| 110 |     } | 
| 111 |     list_head_next = nodes_[GetIndex(tag_and_index: list_head_idx)].next; | 
| 112 |   } while (!std::atomic_compare_exchange_strong( | 
| 113 |       a: list_head_idx_ptr, i1: &list_head_idx, i2: list_head_next)); | 
| 114 |   return IncreaseTag(tag_and_index: list_head_idx); | 
| 115 | } | 
| 116 |  | 
| 117 | void LocklessTaskQueue::ProcessTaskList(TagAndIndex list_head_idx, | 
| 118 |                                         bool execute) { | 
| 119 |   TagAndIndex node_itr = list_head_idx; | 
| 120 |   while (GetIndex(tag_and_index: node_itr) != kInvalidIndex) { | 
| 121 |     Node* node = &nodes_[GetIndex(tag_and_index: node_itr)]; | 
| 122 |     TagAndIndex next_node = node->next; | 
| 123 |     temp_tasks_.emplace_back(args: std::move(node->task)); | 
| 124 |     node->task = nullptr; | 
| 125 |     PushNodeToList(list_head_idx_ptr: &free_list_head_idx_, node_idx: node_itr); | 
| 126 |     node_itr = next_node; | 
| 127 |   } | 
| 128 |  | 
| 129 |   if (execute) { | 
| 130 |     // Execute tasks in reverse order. | 
| 131 |     for (std::vector<Task>::reverse_iterator task_itr = temp_tasks_.rbegin(); | 
| 132 |          task_itr != temp_tasks_.rend(); ++task_itr) { | 
| 133 |       if (*task_itr != nullptr) { | 
| 134 |         (*task_itr)(); | 
| 135 |       } | 
| 136 |     } | 
| 137 |   } | 
| 138 |   temp_tasks_.clear(); | 
| 139 | } | 
| 140 |  | 
| 141 | void LocklessTaskQueue::Init(size_t num_nodes) { | 
| 142 |   nodes_.resize(new_size: num_nodes); | 
| 143 |   temp_tasks_.reserve(n: num_nodes); | 
| 144 |  | 
| 145 |   // Initialize free list. | 
| 146 |   free_list_head_idx_ = 0; | 
| 147 |   for (size_t i = 0; i < num_nodes - 1; ++i) { | 
| 148 |     nodes_[i].next = i + 1; | 
| 149 |   } | 
| 150 |   nodes_[num_nodes - 1].next = kInvalidIndex; | 
| 151 |  | 
| 152 |   // Initialize task list. | 
| 153 |   task_list_head_idx_ = kInvalidIndex; | 
| 154 | } | 
| 155 |  | 
| 156 | }  // namespace vraudio | 
| 157 |  |