1use super::*;
2
3use indexmap::{self, IndexMap};
4
5use std::convert::Infallible;
6use std::fmt;
7use std::marker::PhantomData;
8use std::ops;
9
10/// Storage for streams
11#[derive(Debug)]
12pub(super) struct Store {
13 slab: slab::Slab<Stream>,
14 ids: IndexMap<StreamId, SlabIndex>,
15}
16
17/// "Pointer" to an entry in the store
18pub(super) struct Ptr<'a> {
19 key: Key,
20 store: &'a mut Store,
21}
22
23/// References an entry in the store.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub(crate) struct Key {
26 index: SlabIndex,
27 /// Keep the stream ID in the key as an ABA guard, since slab indices
28 /// could be re-used with a new stream.
29 stream_id: StreamId,
30}
31
// The store can never hold more than `StreamId::MAX` streams, so the slab
// position is kept as a `u32` rather than a full `usize` to save space.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SlabIndex(u32);
36
37#[derive(Debug)]
38pub(super) struct Queue<N> {
39 indices: Option<store::Indices>,
40 _p: PhantomData<N>,
41}
42
43pub(super) trait Next {
44 fn next(stream: &Stream) -> Option<Key>;
45
46 fn set_next(stream: &mut Stream, key: Option<Key>);
47
48 fn take_next(stream: &mut Stream) -> Option<Key>;
49
50 fn is_queued(stream: &Stream) -> bool;
51
52 fn set_queued(stream: &mut Stream, val: bool);
53}
54
55/// A linked list
56#[derive(Debug, Clone, Copy)]
57struct Indices {
58 pub head: Key,
59 pub tail: Key,
60}
61
62pub(super) enum Entry<'a> {
63 Occupied(OccupiedEntry<'a>),
64 Vacant(VacantEntry<'a>),
65}
66
67pub(super) struct OccupiedEntry<'a> {
68 ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
69}
70
71pub(super) struct VacantEntry<'a> {
72 ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
73 slab: &'a mut slab::Slab<Stream>,
74}
75
76pub(super) trait Resolve {
77 fn resolve(&mut self, key: Key) -> Ptr;
78}
79
80// ===== impl Store =====
81
82impl Store {
83 pub fn new() -> Self {
84 Store {
85 slab: slab::Slab::new(),
86 ids: IndexMap::new(),
87 }
88 }
89
90 pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
91 let index = match self.ids.get(id) {
92 Some(key) => *key,
93 None => return None,
94 };
95
96 Some(Ptr {
97 key: Key {
98 index,
99 stream_id: *id,
100 },
101 store: self,
102 })
103 }
104
105 pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
106 let index = SlabIndex(self.slab.insert(val) as u32);
107 assert!(self.ids.insert(id, index).is_none());
108
109 Ptr {
110 key: Key {
111 index,
112 stream_id: id,
113 },
114 store: self,
115 }
116 }
117
118 pub fn find_entry(&mut self, id: StreamId) -> Entry {
119 use self::indexmap::map::Entry::*;
120
121 match self.ids.entry(id) {
122 Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
123 Vacant(e) => Entry::Vacant(VacantEntry {
124 ids: e,
125 slab: &mut self.slab,
126 }),
127 }
128 }
129
130 pub(crate) fn for_each<F>(&mut self, mut f: F)
131 where
132 F: FnMut(Ptr),
133 {
134 match self.try_for_each(|ptr| {
135 f(ptr);
136 Ok::<_, Infallible>(())
137 }) {
138 Ok(()) => (),
139 Err(infallible) => match infallible {},
140 }
141 }
142
143 pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
144 where
145 F: FnMut(Ptr) -> Result<(), E>,
146 {
147 let mut len = self.ids.len();
148 let mut i = 0;
149
150 while i < len {
151 // Get the key by index, this makes the borrow checker happy
152 let (stream_id, index) = {
153 let entry = self.ids.get_index(i).unwrap();
154 (*entry.0, *entry.1)
155 };
156
157 f(Ptr {
158 key: Key { index, stream_id },
159 store: self,
160 })?;
161
162 // TODO: This logic probably could be better...
163 let new_len = self.ids.len();
164
165 if new_len < len {
166 debug_assert!(new_len == len - 1);
167 len -= 1;
168 } else {
169 i += 1;
170 }
171 }
172
173 Ok(())
174 }
175}
176
177impl Resolve for Store {
178 fn resolve(&mut self, key: Key) -> Ptr {
179 Ptr { key, store: self }
180 }
181}
182
183impl ops::Index<Key> for Store {
184 type Output = Stream;
185
186 fn index(&self, key: Key) -> &Self::Output {
187 self.slab
188 .get(key:key.index.0 as usize)
189 .filter(|s: &&Stream| s.id == key.stream_id)
190 .unwrap_or_else(|| {
191 panic!("dangling store key for stream_id={:?}", key.stream_id);
192 })
193 }
194}
195
196impl ops::IndexMut<Key> for Store {
197 fn index_mut(&mut self, key: Key) -> &mut Self::Output {
198 self.slab
199 .get_mut(key:key.index.0 as usize)
200 .filter(|s: &&mut Stream| s.id == key.stream_id)
201 .unwrap_or_else(|| {
202 panic!("dangling store key for stream_id={:?}", key.stream_id);
203 })
204 }
205}
206
207impl Store {
208 #[cfg(feature = "unstable")]
209 pub fn num_active_streams(&self) -> usize {
210 self.ids.len()
211 }
212
213 #[cfg(feature = "unstable")]
214 pub fn num_wired_streams(&self) -> usize {
215 self.slab.len()
216 }
217}
218
// This debug assertion is switched on while the h2 unit/integration tests
// run.
//
// Nothing in practice depends on it holding. The integration tests use it to
// confirm that cleanup happened wherever it could have (for example, that
// the runtime is not dropping the task unexpectedly).
#[cfg(feature = "unstable")]
impl Drop for Store {
    fn drop(&mut self) {
        // Skip the check when the thread is already panicking.
        if std::thread::panicking() {
            return;
        }

        debug_assert!(self.slab.is_empty());
    }
}
234
235// ===== impl Queue =====
236
237impl<N> Queue<N>
238where
239 N: Next,
240{
241 pub fn new() -> Self {
242 Queue {
243 indices: None,
244 _p: PhantomData,
245 }
246 }
247
248 pub fn take(&mut self) -> Self {
249 Queue {
250 indices: self.indices.take(),
251 _p: PhantomData,
252 }
253 }
254
255 /// Queue the stream.
256 ///
257 /// If the stream is already contained by the list, return `false`.
258 pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
259 tracing::trace!("Queue::push_back");
260
261 if N::is_queued(stream) {
262 tracing::trace!(" -> already queued");
263 return false;
264 }
265
266 N::set_queued(stream, true);
267
268 // The next pointer shouldn't be set
269 debug_assert!(N::next(stream).is_none());
270
271 // Queue the stream
272 match self.indices {
273 Some(ref mut idxs) => {
274 tracing::trace!(" -> existing entries");
275
276 // Update the current tail node to point to `stream`
277 let key = stream.key();
278 N::set_next(&mut stream.resolve(idxs.tail), Some(key));
279
280 // Update the tail pointer
281 idxs.tail = stream.key();
282 }
283 None => {
284 tracing::trace!(" -> first entry");
285 self.indices = Some(store::Indices {
286 head: stream.key(),
287 tail: stream.key(),
288 });
289 }
290 }
291
292 true
293 }
294
295 /// Queue the stream
296 ///
297 /// If the stream is already contained by the list, return `false`.
298 pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
299 tracing::trace!("Queue::push_front");
300
301 if N::is_queued(stream) {
302 tracing::trace!(" -> already queued");
303 return false;
304 }
305
306 N::set_queued(stream, true);
307
308 // The next pointer shouldn't be set
309 debug_assert!(N::next(stream).is_none());
310
311 // Queue the stream
312 match self.indices {
313 Some(ref mut idxs) => {
314 tracing::trace!(" -> existing entries");
315
316 // Update the provided stream to point to the head node
317 let head_key = stream.resolve(idxs.head).key();
318 N::set_next(stream, Some(head_key));
319
320 // Update the head pointer
321 idxs.head = stream.key();
322 }
323 None => {
324 tracing::trace!(" -> first entry");
325 self.indices = Some(store::Indices {
326 head: stream.key(),
327 tail: stream.key(),
328 });
329 }
330 }
331
332 true
333 }
334
335 pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
336 where
337 R: Resolve,
338 {
339 if let Some(mut idxs) = self.indices {
340 let mut stream = store.resolve(idxs.head);
341
342 if idxs.head == idxs.tail {
343 assert!(N::next(&stream).is_none());
344 self.indices = None;
345 } else {
346 idxs.head = N::take_next(&mut stream).unwrap();
347 self.indices = Some(idxs);
348 }
349
350 debug_assert!(N::is_queued(&stream));
351 N::set_queued(&mut stream, false);
352
353 return Some(stream);
354 }
355
356 None
357 }
358
359 pub fn is_empty(&self) -> bool {
360 self.indices.is_none()
361 }
362
363 pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
364 where
365 R: Resolve,
366 F: Fn(&Stream) -> bool,
367 {
368 if let Some(idxs) = self.indices {
369 let should_pop = f(&store.resolve(idxs.head));
370 if should_pop {
371 return self.pop(store);
372 }
373 }
374
375 None
376 }
377}
378
379// ===== impl Ptr =====
380
381impl<'a> Ptr<'a> {
382 /// Returns the Key associated with the stream
383 pub fn key(&self) -> Key {
384 self.key
385 }
386
387 pub fn store_mut(&mut self) -> &mut Store {
388 self.store
389 }
390
391 /// Remove the stream from the store
392 pub fn remove(self) -> StreamId {
393 // The stream must have been unlinked before this point
394 debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
395
396 // Remove the stream state
397 let stream = self.store.slab.remove(self.key.index.0 as usize);
398 assert_eq!(stream.id, self.key.stream_id);
399 stream.id
400 }
401
402 /// Remove the StreamId -> stream state association.
403 ///
404 /// This will effectively remove the stream as far as the H2 protocol is
405 /// concerned.
406 pub fn unlink(&mut self) {
407 let id = self.key.stream_id;
408 self.store.ids.swap_remove(&id);
409 }
410}
411
412impl<'a> Resolve for Ptr<'a> {
413 fn resolve(&mut self, key: Key) -> Ptr {
414 Ptr {
415 key,
416 store: &mut *self.store,
417 }
418 }
419}
420
421impl<'a> ops::Deref for Ptr<'a> {
422 type Target = Stream;
423
424 fn deref(&self) -> &Stream {
425 &self.store[self.key]
426 }
427}
428
429impl<'a> ops::DerefMut for Ptr<'a> {
430 fn deref_mut(&mut self) -> &mut Stream {
431 &mut self.store[self.key]
432 }
433}
434
435impl<'a> fmt::Debug for Ptr<'a> {
436 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
437 (**self).fmt(fmt)
438 }
439}
440
441// ===== impl OccupiedEntry =====
442
443impl<'a> OccupiedEntry<'a> {
444 pub fn key(&self) -> Key {
445 let stream_id: StreamId = *self.ids.key();
446 let index: SlabIndex = *self.ids.get();
447 Key { index, stream_id }
448 }
449}
450
451// ===== impl VacantEntry =====
452
453impl<'a> VacantEntry<'a> {
454 pub fn insert(self, value: Stream) -> Key {
455 // Insert the value in the slab
456 let stream_id: StreamId = value.id;
457 let index: SlabIndex = SlabIndex(self.slab.insert(val:value) as u32);
458
459 // Insert the handle in the ID map
460 self.ids.insert(index);
461
462 Key { index, stream_id }
463 }
464}
465