1use super::task::Task;
2use super::FuturesUnordered;
3use core::marker::PhantomData;
4use core::pin::Pin;
5use core::ptr;
6use core::sync::atomic::Ordering::Relaxed;
7
8/// Mutable iterator over all futures in the unordered set.
9#[derive(Debug)]
10pub struct IterPinMut<'a, Fut> {
11 pub(super) task: *const Task<Fut>,
12 pub(super) len: usize,
13 pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
14}
15
16/// Mutable iterator over all futures in the unordered set.
17#[derive(Debug)]
18pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
19
20/// Immutable iterator over all futures in the unordered set.
21#[derive(Debug)]
22pub struct IterPinRef<'a, Fut> {
23 pub(super) task: *const Task<Fut>,
24 pub(super) len: usize,
25 pub(super) pending_next_all: *mut Task<Fut>,
26 pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
27}
28
29/// Immutable iterator over all the futures in the unordered set.
30#[derive(Debug)]
31pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
32
33/// Owned iterator over all futures in the unordered set.
34#[derive(Debug)]
35pub struct IntoIter<Fut: Unpin> {
36 pub(super) len: usize,
37 pub(super) inner: FuturesUnordered<Fut>,
38}
39
40impl<Fut: Unpin> Iterator for IntoIter<Fut> {
41 type Item = Fut;
42
43 fn next(&mut self) -> Option<Self::Item> {
44 // `head_all` can be accessed directly and we don't need to spin on
45 // `Task::next_all` since we have exclusive access to the set.
46 let task = self.inner.head_all.get_mut();
47
48 if (*task).is_null() {
49 return None;
50 }
51
52 unsafe {
53 // Moving out of the future is safe because it is `Unpin`
54 let future = (*(**task).future.get()).take().unwrap();
55
56 // Mutable access to a previously shared `FuturesUnordered` implies
57 // that the other threads already released the object before the
58 // current thread acquired it, so relaxed ordering can be used and
59 // valid `next_all` checks can be skipped.
60 let next = (**task).next_all.load(Relaxed);
61 *task = next;
62 if !task.is_null() {
63 *(**task).prev_all.get() = ptr::null_mut();
64 }
65 self.len -= 1;
66 Some(future)
67 }
68 }
69
70 fn size_hint(&self) -> (usize, Option<usize>) {
71 (self.len, Some(self.len))
72 }
73}
74
75impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}
76
77impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
78 type Item = Pin<&'a mut Fut>;
79
80 fn next(&mut self) -> Option<Self::Item> {
81 if self.task.is_null() {
82 return None;
83 }
84
85 unsafe {
86 let future = (*(*self.task).future.get()).as_mut().unwrap();
87
88 // Mutable access to a previously shared `FuturesUnordered` implies
89 // that the other threads already released the object before the
90 // current thread acquired it, so relaxed ordering can be used and
91 // valid `next_all` checks can be skipped.
92 let next = (*self.task).next_all.load(Relaxed);
93 self.task = next;
94 self.len -= 1;
95 Some(Pin::new_unchecked(future))
96 }
97 }
98
99 fn size_hint(&self) -> (usize, Option<usize>) {
100 (self.len, Some(self.len))
101 }
102}
103
104impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}
105
106impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
107 type Item = &'a mut Fut;
108
109 fn next(&mut self) -> Option<Self::Item> {
110 self.0.next().map(Pin::get_mut)
111 }
112
113 fn size_hint(&self) -> (usize, Option<usize>) {
114 self.0.size_hint()
115 }
116}
117
118impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}
119
120impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
121 type Item = Pin<&'a Fut>;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 if self.task.is_null() {
125 return None;
126 }
127
128 unsafe {
129 let future = (*(*self.task).future.get()).as_ref().unwrap();
130
131 // Relaxed ordering can be used since acquire ordering when
132 // `head_all` was initially read for this iterator implies acquire
133 // ordering for all previously inserted nodes (and we don't need to
134 // read `len_all` again for any other nodes).
135 let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
136 self.task = next;
137 self.len -= 1;
138 Some(Pin::new_unchecked(future))
139 }
140 }
141
142 fn size_hint(&self) -> (usize, Option<usize>) {
143 (self.len, Some(self.len))
144 }
145}
146
147impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}
148
149impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
150 type Item = &'a Fut;
151
152 fn next(&mut self) -> Option<Self::Item> {
153 self.0.next().map(Pin::get_ref)
154 }
155
156 fn size_hint(&self) -> (usize, Option<usize>) {
157 self.0.size_hint()
158 }
159}
160
161impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
162
163// SAFETY: we do nothing thread-local and there is no interior mutability,
164// so the usual structural `Send`/`Sync` apply.
165unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
166unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}
167
168unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
169unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}
170
171unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
172unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
173