1 | /* |
2 | Copyright 2018 Google Inc. All Rights Reserved. |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS-IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "utils/lockless_task_queue.h" |
18 | |
19 | #include <limits> |
20 | |
21 | #include "base/logging.h" |
22 | |
23 | namespace vraudio { |
24 | |
25 | namespace { |
26 | |
27 | // Reserved index representing an invalid list index. |
28 | constexpr uint64_t kInvalidIndex = std::numeric_limits<uint32_t>::max(); |
29 | |
30 | // Maximum number of producers. |
31 | constexpr uint64_t kMaxProducers = kInvalidIndex - 1; |
32 | |
33 | } // namespace |
34 | |
35 | LocklessTaskQueue::LocklessTaskQueue(size_t max_tasks) { |
36 | CHECK_GT(max_tasks, 0U); |
37 | CHECK_LE(max_tasks, kMaxProducers); |
38 | Init(num_nodes: max_tasks); |
39 | } |
40 | |
41 | LocklessTaskQueue::~LocklessTaskQueue() { Clear(); } |
42 | |
43 | void LocklessTaskQueue::Post(Task&& task) { |
44 | const TagAndIndex free_node_idx = PopNodeFromList(list_head: &free_list_head_idx_); |
45 | if (GetIndex(tag_and_index: free_node_idx) == kInvalidIndex) { |
46 | LOG(WARNING) << "Queue capacity reached - dropping task" ; |
47 | return; |
48 | } |
49 | nodes_[GetIndex(tag_and_index: free_node_idx)].task = std::move(task); |
50 | PushNodeToList(list_head: &task_list_head_idx_, node: free_node_idx); |
51 | } |
52 | |
53 | void LocklessTaskQueue::Execute() { |
54 | const TagAndIndex old_flag_with_invalid_index = |
55 | (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; |
56 | const TagAndIndex old_task_list_head_idx = |
57 | task_list_head_idx_.exchange(i: old_flag_with_invalid_index); |
58 | ProcessTaskList(list_head: old_task_list_head_idx, execute: true /*execute_tasks*/); |
59 | } |
60 | |
61 | void LocklessTaskQueue::Clear() { |
62 | const TagAndIndex old_flag_with_invalid_index = |
63 | (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex; |
64 | const TagAndIndex old_task_list_head_idx = |
65 | task_list_head_idx_.exchange(i: old_flag_with_invalid_index); |
66 | ProcessTaskList(list_head: old_task_list_head_idx, execute: false /*execute_tasks*/); |
67 | } |
68 | |
69 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::IncreaseTag( |
70 | TagAndIndex tag_and_index) { |
71 | // The most significant 32 bits a reserved for tagging. Overflows are |
72 | // acceptable. |
73 | return tag_and_index + (static_cast<uint64_t>(1) << 32); |
74 | } |
75 | |
76 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetIndex( |
77 | TagAndIndex tag_and_index) { |
78 | // The least significant 32 bits a reserved for the index. |
79 | return tag_and_index & std::numeric_limits<uint32_t>::max(); |
80 | } |
81 | |
82 | // Extracts the flag in the most significant 32 bits from a TagAndIndex; |
83 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetFlag( |
84 | TagAndIndex tag_and_index) { |
85 | // The most significant 32 bits a reserved for the flag. |
86 | return tag_and_index >> 32; |
87 | } |
88 | |
89 | void LocklessTaskQueue::PushNodeToList( |
90 | std::atomic<TagAndIndex>* list_head_idx_ptr, TagAndIndex node_idx) { |
91 | DCHECK(list_head_idx_ptr); |
92 | TagAndIndex list_head_idx; |
93 | do { |
94 | list_head_idx = list_head_idx_ptr->load(); |
95 | nodes_[GetIndex(tag_and_index: node_idx)].next = list_head_idx; |
96 | } while (!std::atomic_compare_exchange_strong(a: list_head_idx_ptr, |
97 | i1: &list_head_idx, i2: node_idx)); |
98 | } |
99 | |
100 | LocklessTaskQueue::TagAndIndex LocklessTaskQueue::PopNodeFromList( |
101 | std::atomic<TagAndIndex>* list_head_idx_ptr) { |
102 | DCHECK(list_head_idx_ptr); |
103 | TagAndIndex list_head_idx; |
104 | TagAndIndex list_head_next; |
105 | do { |
106 | list_head_idx = list_head_idx_ptr->load(); |
107 | if (GetIndex(tag_and_index: list_head_idx) == kInvalidIndex) { |
108 | // End of list reached. |
109 | return kInvalidIndex; |
110 | } |
111 | list_head_next = nodes_[GetIndex(tag_and_index: list_head_idx)].next; |
112 | } while (!std::atomic_compare_exchange_strong( |
113 | a: list_head_idx_ptr, i1: &list_head_idx, i2: list_head_next)); |
114 | return IncreaseTag(tag_and_index: list_head_idx); |
115 | } |
116 | |
117 | void LocklessTaskQueue::ProcessTaskList(TagAndIndex list_head_idx, |
118 | bool execute) { |
119 | TagAndIndex node_itr = list_head_idx; |
120 | while (GetIndex(tag_and_index: node_itr) != kInvalidIndex) { |
121 | Node* node = &nodes_[GetIndex(tag_and_index: node_itr)]; |
122 | TagAndIndex next_node = node->next; |
123 | temp_tasks_.emplace_back(args: std::move(node->task)); |
124 | node->task = nullptr; |
125 | PushNodeToList(list_head_idx_ptr: &free_list_head_idx_, node_idx: node_itr); |
126 | node_itr = next_node; |
127 | } |
128 | |
129 | if (execute) { |
130 | // Execute tasks in reverse order. |
131 | for (std::vector<Task>::reverse_iterator task_itr = temp_tasks_.rbegin(); |
132 | task_itr != temp_tasks_.rend(); ++task_itr) { |
133 | if (*task_itr != nullptr) { |
134 | (*task_itr)(); |
135 | } |
136 | } |
137 | } |
138 | temp_tasks_.clear(); |
139 | } |
140 | |
141 | void LocklessTaskQueue::Init(size_t num_nodes) { |
142 | nodes_.resize(new_size: num_nodes); |
143 | temp_tasks_.reserve(n: num_nodes); |
144 | |
145 | // Initialize free list. |
146 | free_list_head_idx_ = 0; |
147 | for (size_t i = 0; i < num_nodes - 1; ++i) { |
148 | nodes_[i].next = i + 1; |
149 | } |
150 | nodes_[num_nodes - 1].next = kInvalidIndex; |
151 | |
152 | // Initialize task list. |
153 | task_list_head_idx_ = kInvalidIndex; |
154 | } |
155 | |
156 | } // namespace vraudio |
157 | |