1 | use super::task::Task; |
2 | use super::FuturesUnordered; |
3 | use core::marker::PhantomData; |
4 | use core::pin::Pin; |
5 | use core::ptr; |
6 | use core::sync::atomic::Ordering::Relaxed; |
7 | |
/// Mutable iterator over all futures in the unordered set.
///
/// Each item is a `Pin<&'a mut Fut>`, so this iterator also works for futures
/// that are not `Unpin`.
#[derive(Debug)]
pub struct IterPinMut<'a, Fut> {
    // Cursor: the task whose future the next call to `next` will yield; a null
    // pointer means the iteration is finished.
    pub(super) task: *const Task<Fut>,
    // Number of futures that have not been yielded yet; decremented by `next`
    // and reported as the exact `size_hint`.
    pub(super) len: usize,
    // Makes the iterator behave, for borrow-checking purposes, like an
    // exclusive borrow of the set that lasts for `'a`.
    pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
}
15 | |
16 | /// Mutable iterator over all futures in the unordered set. |
17 | #[derive(Debug)] |
18 | pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); |
19 | |
/// Immutable iterator over all futures in the unordered set.
///
/// Each item is a `Pin<&'a Fut>`, so this iterator also works for futures that
/// are not `Unpin`.
#[derive(Debug)]
pub struct IterPinRef<'a, Fut> {
    // Cursor: the task whose future the next call to `next` will yield; a null
    // pointer means the iteration is finished.
    pub(super) task: *const Task<Fut>,
    // Number of futures that have not been yielded yet; decremented by `next`
    // and reported as the exact `size_hint`.
    pub(super) len: usize,
    // Forwarded unchanged to `Task::spin_next_all` on every step.
    // NOTE(review): presumably the sentinel value that marks a `next_all` link
    // which is still being published — confirm against `Task::spin_next_all`.
    pub(super) pending_next_all: *mut Task<Fut>,
    // Makes the iterator behave, for borrow-checking purposes, like a shared
    // borrow of the set that lasts for `'a`.
    pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}
28 | |
29 | /// Immutable iterator over all the futures in the unordered set. |
30 | #[derive(Debug)] |
31 | pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); |
32 | |
33 | /// Owned iterator over all futures in the unordered set. |
34 | #[derive(Debug)] |
35 | pub struct IntoIter<Fut: Unpin> { |
36 | pub(super) len: usize, |
37 | pub(super) inner: FuturesUnordered<Fut>, |
38 | } |
39 | |
40 | impl<Fut: Unpin> Iterator for IntoIter<Fut> { |
41 | type Item = Fut; |
42 | |
43 | fn next(&mut self) -> Option<Self::Item> { |
44 | // `head_all` can be accessed directly and we don't need to spin on |
45 | // `Task::next_all` since we have exclusive access to the set. |
46 | let task = self.inner.head_all.get_mut(); |
47 | |
48 | if (*task).is_null() { |
49 | return None; |
50 | } |
51 | |
52 | unsafe { |
53 | // Moving out of the future is safe because it is `Unpin` |
54 | let future = (*(**task).future.get()).take().unwrap(); |
55 | |
56 | // Mutable access to a previously shared `FuturesUnordered` implies |
57 | // that the other threads already released the object before the |
58 | // current thread acquired it, so relaxed ordering can be used and |
59 | // valid `next_all` checks can be skipped. |
60 | let next = (**task).next_all.load(Relaxed); |
61 | *task = next; |
62 | if !task.is_null() { |
63 | *(**task).prev_all.get() = ptr::null_mut(); |
64 | } |
65 | self.len -= 1; |
66 | Some(future) |
67 | } |
68 | } |
69 | |
70 | fn size_hint(&self) -> (usize, Option<usize>) { |
71 | (self.len, Some(self.len)) |
72 | } |
73 | } |
74 | |
75 | impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {} |
76 | |
77 | impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { |
78 | type Item = Pin<&'a mut Fut>; |
79 | |
80 | fn next(&mut self) -> Option<Self::Item> { |
81 | if self.task.is_null() { |
82 | return None; |
83 | } |
84 | |
85 | unsafe { |
86 | let future = (*(*self.task).future.get()).as_mut().unwrap(); |
87 | |
88 | // Mutable access to a previously shared `FuturesUnordered` implies |
89 | // that the other threads already released the object before the |
90 | // current thread acquired it, so relaxed ordering can be used and |
91 | // valid `next_all` checks can be skipped. |
92 | let next = (*self.task).next_all.load(Relaxed); |
93 | self.task = next; |
94 | self.len -= 1; |
95 | Some(Pin::new_unchecked(future)) |
96 | } |
97 | } |
98 | |
99 | fn size_hint(&self) -> (usize, Option<usize>) { |
100 | (self.len, Some(self.len)) |
101 | } |
102 | } |
103 | |
104 | impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {} |
105 | |
106 | impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { |
107 | type Item = &'a mut Fut; |
108 | |
109 | fn next(&mut self) -> Option<Self::Item> { |
110 | self.0.next().map(Pin::get_mut) |
111 | } |
112 | |
113 | fn size_hint(&self) -> (usize, Option<usize>) { |
114 | self.0.size_hint() |
115 | } |
116 | } |
117 | |
118 | impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {} |
119 | |
120 | impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { |
121 | type Item = Pin<&'a Fut>; |
122 | |
123 | fn next(&mut self) -> Option<Self::Item> { |
124 | if self.task.is_null() { |
125 | return None; |
126 | } |
127 | |
128 | unsafe { |
129 | let future = (*(*self.task).future.get()).as_ref().unwrap(); |
130 | |
131 | // Relaxed ordering can be used since acquire ordering when |
132 | // `head_all` was initially read for this iterator implies acquire |
133 | // ordering for all previously inserted nodes (and we don't need to |
134 | // read `len_all` again for any other nodes). |
135 | let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); |
136 | self.task = next; |
137 | self.len -= 1; |
138 | Some(Pin::new_unchecked(future)) |
139 | } |
140 | } |
141 | |
142 | fn size_hint(&self) -> (usize, Option<usize>) { |
143 | (self.len, Some(self.len)) |
144 | } |
145 | } |
146 | |
147 | impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {} |
148 | |
149 | impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { |
150 | type Item = &'a Fut; |
151 | |
152 | fn next(&mut self) -> Option<Self::Item> { |
153 | self.0.next().map(Pin::get_ref) |
154 | } |
155 | |
156 | fn size_hint(&self) -> (usize, Option<usize>) { |
157 | self.0.size_hint() |
158 | } |
159 | } |
160 | |
161 | impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {} |
162 | |
163 | // SAFETY: we do nothing thread-local and there is no interior mutability, |
164 | // so the usual structural `Send`/`Sync` apply. |
165 | unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {} |
166 | unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {} |
167 | |
168 | unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {} |
169 | unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {} |
170 | |
171 | unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {} |
172 | unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {} |
173 | |