1use super::*;
2
3use indexmap::{self, IndexMap};
4
5use std::convert::Infallible;
6use std::fmt;
7use std::marker::PhantomData;
8use std::ops;
9
10/// Storage for streams
11#[derive(Debug)]
12pub(super) struct Store {
13 slab: slab::Slab<Stream>,
14 ids: IndexMap<StreamId, SlabIndex>,
15}
16
17/// "Pointer" to an entry in the store
18pub(super) struct Ptr<'a> {
19 key: Key,
20 store: &'a mut Store,
21}
22
23/// References an entry in the store.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub(crate) struct Key {
26 index: SlabIndex,
27 /// Keep the stream ID in the key as an ABA guard, since slab indices
28 /// could be re-used with a new stream.
29 stream_id: StreamId,
30}
31
32// We can never have more than `StreamId::MAX` streams in the store,
33// so we can save a smaller index (u32 vs usize).
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35struct SlabIndex(u32);
36
37#[derive(Debug)]
38pub(super) struct Queue<N> {
39 indices: Option<store::Indices>,
40 _p: PhantomData<N>,
41}
42
43pub(super) trait Next {
44 fn next(stream: &Stream) -> Option<Key>;
45
46 fn set_next(stream: &mut Stream, key: Option<Key>);
47
48 fn take_next(stream: &mut Stream) -> Option<Key>;
49
50 fn is_queued(stream: &Stream) -> bool;
51
52 fn set_queued(stream: &mut Stream, val: bool);
53}
54
55/// A linked list
56#[derive(Debug, Clone, Copy)]
57struct Indices {
58 pub head: Key,
59 pub tail: Key,
60}
61
62pub(super) enum Entry<'a> {
63 Occupied(OccupiedEntry<'a>),
64 Vacant(VacantEntry<'a>),
65}
66
67pub(super) struct OccupiedEntry<'a> {
68 ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
69}
70
71pub(super) struct VacantEntry<'a> {
72 ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
73 slab: &'a mut slab::Slab<Stream>,
74}
75
76pub(super) trait Resolve {
77 fn resolve(&mut self, key: Key) -> Ptr;
78}
79
80// ===== impl Store =====
81
82impl Store {
83 pub fn new() -> Self {
84 Store {
85 slab: slab::Slab::new(),
86 ids: IndexMap::new(),
87 }
88 }
89
90 pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91 let index = match self.ids.get(id) {
92 Some(key) => *key,
93 None => return None,
94 };
95
96 Some(Ptr {
97 key: Key {
98 index,
99 stream_id: *id,
100 },
101 store: self,
102 })
103 }
104
105 pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106 let index = SlabIndex(self.slab.insert(val) as u32);
107 assert!(self.ids.insert(id, index).is_none());
108
109 Ptr {
110 key: Key {
111 index,
112 stream_id: id,
113 },
114 store: self,
115 }
116 }
117
118 pub fn find_entry(&mut self, id: StreamId) -> Entry {
119 use self::indexmap::map::Entry::*;
120
121 match self.ids.entry(id) {
122 Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123 Vacant(e) => Entry::Vacant(VacantEntry {
124 ids: e,
125 slab: &mut self.slab,
126 }),
127 }
128 }
129
130 #[allow(clippy::blocks_in_conditions)]
131 pub(crate) fn for_each<F>(&mut self, mut f: F)
132 where
133 F: FnMut(Ptr),
134 {
135 match self.try_for_each(|ptr| {
136 f(ptr);
137 Ok::<_, Infallible>(())
138 }) {
139 Ok(()) => (),
140 #[allow(unused)]
141 Err(infallible) => match infallible {},
142 }
143 }
144
145 pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
146 where
147 F: FnMut(Ptr) -> Result<(), E>,
148 {
149 let mut len = self.ids.len();
150 let mut i = 0;
151
152 while i < len {
153 // Get the key by index, this makes the borrow checker happy
154 let (stream_id, index) = {
155 let entry = self.ids.get_index(i).unwrap();
156 (*entry.0, *entry.1)
157 };
158
159 f(Ptr {
160 key: Key { index, stream_id },
161 store: self,
162 })?;
163
164 // TODO: This logic probably could be better...
165 let new_len = self.ids.len();
166
167 if new_len < len {
168 debug_assert!(new_len == len - 1);
169 len -= 1;
170 } else {
171 i += 1;
172 }
173 }
174
175 Ok(())
176 }
177}
178
179impl Resolve for Store {
180 fn resolve(&mut self, key: Key) -> Ptr {
181 Ptr { key, store: self }
182 }
183}
184
185impl ops::Index<Key> for Store {
186 type Output = Stream;
187
188 fn index(&self, key: Key) -> &Self::Output {
189 self.slab
190 .get(key.index.0 as usize)
191 .filter(|s: &&Stream| s.id == key.stream_id)
192 .unwrap_or_else(|| {
193 panic!("dangling store key for stream_id={:?}", key.stream_id);
194 })
195 }
196}
197
198impl ops::IndexMut<Key> for Store {
199 fn index_mut(&mut self, key: Key) -> &mut Self::Output {
200 self.slab
201 .get_mut(key.index.0 as usize)
202 .filter(|s: &&mut Stream| s.id == key.stream_id)
203 .unwrap_or_else(|| {
204 panic!("dangling store key for stream_id={:?}", key.stream_id);
205 })
206 }
207}
208
209impl Store {
210 #[cfg(feature = "unstable")]
211 pub fn num_active_streams(&self) -> usize {
212 self.ids.len()
213 }
214
215 #[cfg(feature = "unstable")]
216 pub fn num_wired_streams(&self) -> usize {
217 self.slab.len()
218 }
219}
220
221// While running h2 unit/integration tests, enable this debug assertion.
222//
223// In practice, we don't need to ensure this. But the integration tests
224// help to make sure we've cleaned up in cases where we could (like, the
225// runtime isn't suddenly dropping the task for unknown reasons).
226#[cfg(feature = "unstable")]
227impl Drop for Store {
228 fn drop(&mut self) {
229 use std::thread;
230
231 if !thread::panicking() {
232 debug_assert!(self.slab.is_empty());
233 }
234 }
235}
236
237// ===== impl Queue =====
238
239impl<N> Queue<N>
240where
241 N: Next,
242{
243 pub fn new() -> Self {
244 Queue {
245 indices: None,
246 _p: PhantomData,
247 }
248 }
249
250 pub fn take(&mut self) -> Self {
251 Queue {
252 indices: self.indices.take(),
253 _p: PhantomData,
254 }
255 }
256
257 /// Queue the stream.
258 ///
259 /// If the stream is already contained by the list, return `false`.
260 pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
261 tracing::trace!("Queue::push_back");
262
263 if N::is_queued(stream) {
264 tracing::trace!(" -> already queued");
265 return false;
266 }
267
268 N::set_queued(stream, true);
269
270 // The next pointer shouldn't be set
271 debug_assert!(N::next(stream).is_none());
272
273 // Queue the stream
274 match self.indices {
275 Some(ref mut idxs) => {
276 tracing::trace!(" -> existing entries");
277
278 // Update the current tail node to point to `stream`
279 let key = stream.key();
280 N::set_next(&mut stream.resolve(idxs.tail), Some(key));
281
282 // Update the tail pointer
283 idxs.tail = stream.key();
284 }
285 None => {
286 tracing::trace!(" -> first entry");
287 self.indices = Some(store::Indices {
288 head: stream.key(),
289 tail: stream.key(),
290 });
291 }
292 }
293
294 true
295 }
296
297 /// Queue the stream
298 ///
299 /// If the stream is already contained by the list, return `false`.
300 pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
301 tracing::trace!("Queue::push_front");
302
303 if N::is_queued(stream) {
304 tracing::trace!(" -> already queued");
305 return false;
306 }
307
308 N::set_queued(stream, true);
309
310 // The next pointer shouldn't be set
311 debug_assert!(N::next(stream).is_none());
312
313 // Queue the stream
314 match self.indices {
315 Some(ref mut idxs) => {
316 tracing::trace!(" -> existing entries");
317
318 // Update the provided stream to point to the head node
319 let head_key = stream.resolve(idxs.head).key();
320 N::set_next(stream, Some(head_key));
321
322 // Update the head pointer
323 idxs.head = stream.key();
324 }
325 None => {
326 tracing::trace!(" -> first entry");
327 self.indices = Some(store::Indices {
328 head: stream.key(),
329 tail: stream.key(),
330 });
331 }
332 }
333
334 true
335 }
336
337 pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
338 where
339 R: Resolve,
340 {
341 if let Some(mut idxs) = self.indices {
342 let mut stream = store.resolve(idxs.head);
343
344 if idxs.head == idxs.tail {
345 assert!(N::next(&stream).is_none());
346 self.indices = None;
347 } else {
348 idxs.head = N::take_next(&mut stream).unwrap();
349 self.indices = Some(idxs);
350 }
351
352 debug_assert!(N::is_queued(&stream));
353 N::set_queued(&mut stream, false);
354
355 return Some(stream);
356 }
357
358 None
359 }
360
361 pub fn is_empty(&self) -> bool {
362 self.indices.is_none()
363 }
364
365 pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
366 where
367 R: Resolve,
368 F: Fn(&Stream) -> bool,
369 {
370 if let Some(idxs) = self.indices {
371 let should_pop = f(&store.resolve(idxs.head));
372 if should_pop {
373 return self.pop(store);
374 }
375 }
376
377 None
378 }
379}
380
381// ===== impl Ptr =====
382
383impl<'a> Ptr<'a> {
384 /// Returns the Key associated with the stream
385 pub fn key(&self) -> Key {
386 self.key
387 }
388
389 pub fn store_mut(&mut self) -> &mut Store {
390 self.store
391 }
392
393 /// Remove the stream from the store
394 pub fn remove(self) -> StreamId {
395 // The stream must have been unlinked before this point
396 debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
397
398 // Remove the stream state
399 let stream = self.store.slab.remove(self.key.index.0 as usize);
400 assert_eq!(stream.id, self.key.stream_id);
401 stream.id
402 }
403
404 /// Remove the StreamId -> stream state association.
405 ///
406 /// This will effectively remove the stream as far as the H2 protocol is
407 /// concerned.
408 pub fn unlink(&mut self) {
409 let id = self.key.stream_id;
410 self.store.ids.swap_remove(&id);
411 }
412}
413
414impl<'a> Resolve for Ptr<'a> {
415 fn resolve(&mut self, key: Key) -> Ptr {
416 Ptr {
417 key,
418 store: &mut *self.store,
419 }
420 }
421}
422
423impl<'a> ops::Deref for Ptr<'a> {
424 type Target = Stream;
425
426 fn deref(&self) -> &Stream {
427 &self.store[self.key]
428 }
429}
430
431impl<'a> ops::DerefMut for Ptr<'a> {
432 fn deref_mut(&mut self) -> &mut Stream {
433 &mut self.store[self.key]
434 }
435}
436
437impl<'a> fmt::Debug for Ptr<'a> {
438 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
439 (**self).fmt(fmt)
440 }
441}
442
443// ===== impl OccupiedEntry =====
444
445impl<'a> OccupiedEntry<'a> {
446 pub fn key(&self) -> Key {
447 let stream_id: StreamId = *self.ids.key();
448 let index: SlabIndex = *self.ids.get();
449 Key { index, stream_id }
450 }
451}
452
453// ===== impl VacantEntry =====
454
455impl<'a> VacantEntry<'a> {
456 pub fn insert(self, value: Stream) -> Key {
457 // Insert the value in the slab
458 let stream_id: StreamId = value.id;
459 let index: SlabIndex = SlabIndex(self.slab.insert(val:value) as u32);
460
461 // Insert the handle in the ID map
462 self.ids.insert(index);
463
464 Key { index, stream_id }
465 }
466}
467