1 | //! A bounded single-producer single-consumer pipe. |
2 | //! |
3 | //! This crate provides a ring buffer that can be asynchronously read from and written to. It is |
4 | //! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles. |
5 | //! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively. |
6 | //! |
7 | //! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut` |
8 | //! access to read or write to them. If multiple-producer/multiple-consumer handles are needed, |
9 | //! consider wrapping them in an `Arc<Mutex<...>>` or similar. |
10 | //! |
11 | //! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts |
12 | //! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes. |
13 | //! |
14 | //! When the receiver is dropped, the pipe is closed and no more bytes and be written into it. |
15 | //! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes. |
16 | //! |
17 | //! # Version 0.2.0 Notes |
18 | //! |
19 | //! Previously, this crate contained other synchronization primitives, such as bounded channels, locks, |
20 | //! and event listeners. These have been split out into their own crates: |
21 | //! |
22 | //! - [`async-channel`](https://docs.rs/async-channel) |
23 | //! - [`async-dup`](https://docs.rs/async-dup) |
24 | //! - [`async-lock`](https://docs.rs/async-lock) |
25 | //! - [`async-mutex`](https://docs.rs/async-mutex) |
26 | //! - [`event-listener`](https://docs.rs/event-listener) |
27 | //! |
28 | //! # Examples |
29 | //! |
30 | //! ## Asynchronous Tasks |
31 | //! |
32 | //! Communicate between asynchronous tasks, potentially on other threads. |
33 | //! |
34 | //! ``` |
35 | //! use async_channel::unbounded; |
36 | //! use async_executor::Executor; |
37 | //! use easy_parallel::Parallel; |
38 | //! use futures_lite::{future, prelude::*}; |
39 | //! use std::time::Duration; |
40 | //! |
41 | //! # if cfg!(miri) { return; } |
42 | //! |
43 | //! // Create a pair of handles. |
44 | //! let (mut reader, mut writer) = piper::pipe(1024); |
45 | //! |
46 | //! // Create the executor. |
47 | //! let ex = Executor::new(); |
48 | //! let (signal, shutdown) = unbounded::<()>(); |
49 | //! |
50 | //! // Spawn a detached task for random data to the pipe. |
51 | //! let writer = ex.spawn(async move { |
52 | //! for _ in 0..1_000 { |
53 | //! // Generate 8 random numnbers. |
54 | //! let random = fastrand::u64(..).to_le_bytes(); |
55 | //! |
56 | //! // Write them to the pipe. |
57 | //! writer.write_all(&random).await.unwrap(); |
58 | //! |
59 | //! // Wait a bit. |
60 | //! async_io::Timer::after(Duration::from_millis(5)).await; |
61 | //! } |
62 | //! |
63 | //! // Drop the writer to close the pipe. |
64 | //! drop(writer); |
65 | //! }); |
66 | //! |
67 | //! // Detach the task so that it runs in the background. |
68 | //! writer.detach(); |
69 | //! |
70 | //! // Spawn a task for reading from the pipe. |
71 | //! let reader = ex.spawn(async move { |
72 | //! let mut buf = vec![]; |
73 | //! |
74 | //! // Read all bytes from the pipe. |
75 | //! reader.read_to_end(&mut buf).await.unwrap(); |
76 | //! |
77 | //! println!("Random data: {:#?}" , buf); |
78 | //! }); |
79 | //! |
80 | //! Parallel::new() |
81 | //! // Run four executor threads. |
82 | //! .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
83 | //! // Run the main future on the current thread. |
84 | //! .finish(|| future::block_on(async { |
85 | //! // Wait for the reader to finish. |
86 | //! reader.await; |
87 | //! |
88 | //! // Signal the executor threads to shut down. |
89 | //! drop(signal); |
90 | //! })); |
91 | //! ``` |
92 | //! |
93 | //! ## Blocking I/O |
94 | //! |
95 | //! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example |
96 | //! spawns another thread for reading a file and writing it to a pipe. |
97 | //! |
98 | //! ```no_run |
99 | //! use futures_lite::{future, prelude::*}; |
100 | //! use std::fs::File; |
101 | //! use std::io::prelude::*; |
102 | //! use std::thread; |
103 | //! |
104 | //! // Create a pair of handles. |
105 | //! let (mut r, mut w) = piper::pipe(1024); |
106 | //! |
107 | //! // Spawn a thread for reading a file. |
108 | //! thread::spawn(move || { |
109 | //! let mut file = File::open("Cargo.toml" ).unwrap(); |
110 | //! |
111 | //! // Read the file into a buffer. |
112 | //! let mut buf = [0u8; 16384]; |
113 | //! future::block_on(async move { |
114 | //! loop { |
115 | //! // Read a chunk of bytes from the file. |
116 | //! // Blocking is okay here, since this is a separate thread. |
117 | //! let n = file.read(&mut buf).unwrap(); |
118 | //! if n == 0 { |
119 | //! break; |
120 | //! } |
121 | //! |
122 | //! // Write the chunk to the pipe. |
123 | //! w.write_all(&buf[..n]).await.unwrap(); |
124 | //! } |
125 | //! |
126 | //! // Close the pipe. |
127 | //! drop(w); |
128 | //! }); |
129 | //! }); |
130 | //! |
131 | //! # future::block_on(async move { |
132 | //! // Read bytes from the pipe. |
133 | //! let mut buf = vec![]; |
134 | //! r.read_to_end(&mut buf).await.unwrap(); |
135 | //! |
136 | //! println!("Read {} bytes" , buf.len()); |
137 | //! # }); |
138 | //! ``` |
139 | //! |
140 | //! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write` |
141 | //! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from |
142 | //! the file into the pipe. This approach should be preferred when possible, as it avoids an extra |
143 | //! copy. |
144 | //! |
145 | //! ```no_run |
146 | //! # use futures_lite::future; |
147 | //! # use std::fs::File; |
148 | //! # let mut file: File = unimplemented!(); |
149 | //! # let mut w: piper::Writer = unimplemented!(); |
150 | //! // In the `future::block_on` call above... |
151 | //! # future::block_on(async move { |
152 | //! loop { |
153 | //! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap(); |
154 | //! if n == 0 { |
155 | //! break; |
156 | //! } |
157 | //! } |
158 | //! # }); |
159 | //! ``` |
160 | //! |
161 | //! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for |
162 | //! thread management and pipes. |
163 | //! |
164 | //! [`poll_fill`]: struct.Writer.html#method.poll_fill |
165 | //! [`poll_drain`]: struct.Reader.html#method.poll_drain |
166 | //! [`blocking`]: https://docs.rs/blocking |
167 | |
168 | #![cfg_attr (not(feature = "std" ), no_std)] |
169 | #![forbid (missing_docs)] |
170 | #![doc ( |
171 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
172 | )] |
173 | #![doc ( |
174 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
175 | )] |
176 | |
177 | extern crate alloc; |
178 | |
179 | use core::convert::Infallible; |
180 | use core::mem; |
181 | use core::slice; |
182 | use core::task::{Context, Poll}; |
183 | |
184 | use alloc::vec::Vec; |
185 | |
186 | use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; |
187 | use sync::Arc; |
188 | |
189 | #[cfg (feature = "std" )] |
190 | use std::{ |
191 | io::{self, Read, Write}, |
192 | pin::Pin, |
193 | }; |
194 | |
195 | use atomic_waker::AtomicWaker; |
196 | |
197 | #[cfg (feature = "std" )] |
198 | use futures_io::{AsyncRead, AsyncWrite}; |
199 | |
200 | macro_rules! ready { |
201 | ($e:expr) => {{ |
202 | match $e { |
203 | Poll::Ready(t) => t, |
204 | Poll::Pending => return Poll::Pending, |
205 | } |
206 | }}; |
207 | } |
208 | |
209 | /// Creates a bounded single-producer single-consumer pipe. |
210 | /// |
211 | /// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to. |
212 | /// |
213 | /// See the [crate-level documentation](index.html) for more details. |
214 | /// |
215 | /// # Panics |
216 | /// |
217 | /// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`. |
218 | #[allow (clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280 |
219 | pub fn pipe(cap: usize) -> (Reader, Writer) { |
220 | assert!(cap > 0, "capacity must be positive" ); |
221 | assert!(cap.checked_mul(2).is_some(), "capacity is too large" ); |
222 | |
223 | // Allocate the ring buffer. |
224 | let mut v = Vec::with_capacity(cap); |
225 | let buffer = v.as_mut_ptr(); |
226 | mem::forget(v); |
227 | |
228 | let inner = Arc::new(Pipe { |
229 | head: AtomicUsize::new(0), |
230 | tail: AtomicUsize::new(0), |
231 | reader: AtomicWaker::new(), |
232 | writer: AtomicWaker::new(), |
233 | closed: AtomicBool::new(false), |
234 | buffer, |
235 | cap, |
236 | }); |
237 | |
238 | // Use a random number generator to randomize fair yielding behavior. |
239 | let mut rng = rng(); |
240 | |
241 | let r = Reader { |
242 | inner: inner.clone(), |
243 | head: 0, |
244 | tail: 0, |
245 | rng: rng.fork(), |
246 | }; |
247 | |
248 | let w = Writer { |
249 | inner, |
250 | head: 0, |
251 | tail: 0, |
252 | zeroed_until: 0, |
253 | rng, |
254 | }; |
255 | |
256 | (r, w) |
257 | } |
258 | |
259 | /// The reading side of a pipe. |
260 | /// |
261 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
262 | pub struct Reader { |
263 | /// The inner ring buffer. |
264 | inner: Arc<Pipe>, |
265 | |
266 | /// The head index, moved by the reader, in the range `0..2*cap`. |
267 | /// |
268 | /// This index always matches `inner.head`. |
269 | head: usize, |
270 | |
271 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
272 | /// |
273 | /// This index is a snapshot of `index.tail` that might become stale at any point. |
274 | tail: usize, |
275 | |
276 | /// Random number generator. |
277 | rng: fastrand::Rng, |
278 | } |
279 | |
280 | /// The writing side of a pipe. |
281 | /// |
282 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
283 | pub struct Writer { |
284 | /// The inner ring buffer. |
285 | inner: Arc<Pipe>, |
286 | |
287 | /// The head index, moved by the reader, in the range `0..2*cap`. |
288 | /// |
289 | /// This index is a snapshot of `index.head` that might become stale at any point. |
290 | head: usize, |
291 | |
292 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
293 | /// |
294 | /// This index always matches `inner.tail`. |
295 | tail: usize, |
296 | |
297 | /// How many bytes at the beginning of the buffer have been zeroed. |
298 | /// |
299 | /// The pipe allocates an uninitialized buffer, and we must be careful about passing |
300 | /// uninitialized data to user code. Zeroing the buffer right after allocation would be too |
301 | /// expensive, so we zero it in smaller chunks as the writer makes progress. |
302 | zeroed_until: usize, |
303 | |
304 | /// Random number generator. |
305 | rng: fastrand::Rng, |
306 | } |
307 | |
308 | /// The inner ring buffer. |
309 | /// |
310 | /// Head and tail indices are in the range `0..2*cap`, even though they really map onto the |
311 | /// `0..cap` range. The distance between head and tail indices is never more than `cap`. |
312 | /// |
313 | /// The reason why indices are not in the range `0..cap` is because we need to distinguish between |
314 | /// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail` |
315 | /// could mean the pipe is either empty or full, but we don't know which! |
316 | struct Pipe { |
317 | /// The head index, moved by the reader, in the range `0..2*cap`. |
318 | head: AtomicUsize, |
319 | |
320 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
321 | tail: AtomicUsize, |
322 | |
323 | /// A waker representing the blocked reader. |
324 | reader: AtomicWaker, |
325 | |
326 | /// A waker representing the blocked writer. |
327 | writer: AtomicWaker, |
328 | |
329 | /// Set to `true` if the reader or writer was dropped. |
330 | closed: AtomicBool, |
331 | |
332 | /// The byte buffer. |
333 | buffer: *mut u8, |
334 | |
335 | /// The buffer capacity. |
336 | cap: usize, |
337 | } |
338 | |
339 | unsafe impl Sync for Pipe {} |
340 | unsafe impl Send for Pipe {} |
341 | |
342 | impl Drop for Pipe { |
343 | fn drop(&mut self) { |
344 | // Deallocate the byte buffer. |
345 | unsafe { |
346 | Vec::from_raw_parts(self.buffer, length:0, self.cap); |
347 | } |
348 | } |
349 | } |
350 | |
351 | impl Drop for Reader { |
352 | fn drop(&mut self) { |
353 | // Dropping closes the pipe and then wakes the writer. |
354 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
355 | self.inner.writer.wake(); |
356 | } |
357 | } |
358 | |
359 | impl Drop for Writer { |
360 | fn drop(&mut self) { |
361 | // Dropping closes the pipe and then wakes the reader. |
362 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
363 | self.inner.reader.wake(); |
364 | } |
365 | } |
366 | |
367 | impl Pipe { |
368 | /// Get the length of the data in the pipe. |
369 | fn len(&self) -> usize { |
370 | let head: usize = self.head.load(order:Ordering::Acquire); |
371 | let tail: usize = self.tail.load(order:Ordering::Acquire); |
372 | |
373 | if head <= tail { |
374 | tail - head |
375 | } else { |
376 | (2 * self.cap) - (head - tail) |
377 | } |
378 | } |
379 | } |
380 | |
381 | impl Reader { |
382 | /// Gets the total length of the data in the pipe. |
383 | /// |
384 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
385 | /// read yet. |
386 | /// |
387 | /// # Examples |
388 | /// |
389 | /// ``` |
390 | /// let (mut reader, mut writer) = piper::pipe(10); |
391 | /// let _ = writer.try_fill(&[0u8; 5]); |
392 | /// assert_eq!(reader.len(), 5); |
393 | /// ``` |
394 | pub fn len(&self) -> usize { |
395 | self.inner.len() |
396 | } |
397 | |
398 | /// Tell whether or not the pipe is empty. |
399 | /// |
400 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
401 | /// |
402 | /// # Examples |
403 | /// |
404 | /// ``` |
405 | /// let (mut reader, mut writer) = piper::pipe(10); |
406 | /// assert!(reader.is_empty()); |
407 | /// let _ = writer.try_fill(&[0u8; 5]); |
408 | /// assert!(!reader.is_empty()); |
409 | /// ``` |
410 | pub fn is_empty(&self) -> bool { |
411 | self.inner.len() == 0 |
412 | } |
413 | |
414 | /// Gets the total capacity of the pipe. |
415 | /// |
416 | /// This method returns the number of bytes that the pipe can hold at a time. |
417 | /// |
418 | /// # Examples |
419 | /// |
420 | /// ``` |
421 | /// # futures_lite::future::block_on(async { |
422 | /// let (reader, _) = piper::pipe(10); |
423 | /// assert_eq!(reader.capacity(), 10); |
424 | /// # }); |
425 | /// ``` |
426 | pub fn capacity(&self) -> usize { |
427 | self.inner.cap |
428 | } |
429 | |
430 | /// Tell whether or not the pipe is full. |
431 | /// |
432 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
433 | /// writes will block until some data is read from the pipe. |
434 | /// |
435 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
436 | /// |
437 | /// # Examples |
438 | /// |
439 | /// ``` |
440 | /// let (mut reader, mut writer) = piper::pipe(10); |
441 | /// assert!(!reader.is_full()); |
442 | /// let _ = writer.try_fill(&[0u8; 10]); |
443 | /// assert!(reader.is_full()); |
444 | /// let _ = reader.try_drain(&mut [0u8; 5]); |
445 | /// assert!(!reader.is_full()); |
446 | /// ``` |
447 | pub fn is_full(&self) -> bool { |
448 | self.inner.len() == self.inner.cap |
449 | } |
450 | |
451 | /// Tell whether or not the pipe is closed. |
452 | /// |
453 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
454 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
455 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
456 | /// |
457 | /// # Examples |
458 | /// |
459 | /// ``` |
460 | /// # futures_lite::future::block_on(async { |
461 | /// let (mut reader, mut writer) = piper::pipe(10); |
462 | /// assert!(!reader.is_closed()); |
463 | /// drop(writer); |
464 | /// assert!(reader.is_closed()); |
465 | /// # }); |
466 | /// ``` |
467 | pub fn is_closed(&self) -> bool { |
468 | self.inner.closed.load(Ordering::SeqCst) |
469 | } |
470 | |
471 | /// Reads bytes from this reader and writes into blocking `dest`. |
472 | /// |
473 | /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy, |
474 | /// but it may block the thread if `dest` blocks. |
475 | /// |
476 | /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method |
477 | /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`. |
478 | /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written. |
479 | /// |
480 | /// This method is only available when the `std` feature is enabled. For `no_std` environments, |
481 | /// consider using [`poll_drain_bytes`] instead. |
482 | /// |
483 | /// [`poll_drain_bytes`]: #method.poll_drain_bytes |
484 | /// |
485 | /// # Examples |
486 | /// |
487 | /// ``` |
488 | /// use futures_lite::{future, prelude::*}; |
489 | /// # future::block_on(async { |
490 | /// |
491 | /// let (mut r, mut w) = piper::pipe(1024); |
492 | /// |
493 | /// // Write some data to the pipe. |
494 | /// w.write_all(b"hello world" ).await.unwrap(); |
495 | /// |
496 | /// // Try reading from the pipe. |
497 | /// let mut buf = [0; 1024]; |
498 | /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap(); |
499 | /// |
500 | /// // The data was written to the buffer. |
501 | /// assert_eq!(&buf[..n], b"hello world" ); |
502 | /// # }); |
503 | /// ``` |
504 | #[cfg (feature = "std" )] |
505 | pub fn poll_drain( |
506 | &mut self, |
507 | cx: &mut Context<'_>, |
508 | dest: impl Write, |
509 | ) -> Poll<io::Result<usize>> { |
510 | self.drain_inner(Some(cx), dest) |
511 | } |
512 | |
513 | /// Reads bytes from this reader. |
514 | /// |
515 | /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into. |
516 | /// Because of this, it is infallible and can be used in `no_std` environments. |
517 | /// |
518 | /// The same conditions that apply to [`poll_drain`] apply to this method. |
519 | /// |
520 | /// [`poll_drain`]: #method.poll_drain |
521 | /// |
522 | /// # Examples |
523 | /// |
524 | /// ``` |
525 | /// use futures_lite::{future, prelude::*}; |
526 | /// # future::block_on(async { |
527 | /// let (mut r, mut w) = piper::pipe(1024); |
528 | /// |
529 | /// // Write some data to the pipe. |
530 | /// w.write_all(b"hello world" ).await.unwrap(); |
531 | /// |
532 | /// // Try reading from the pipe. |
533 | /// let mut buf = [0; 1024]; |
534 | /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await; |
535 | /// |
536 | /// // The data was written to the buffer. |
537 | /// assert_eq!(&buf[..n], b"hello world" ); |
538 | /// # }); |
539 | /// ``` |
540 | pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> { |
541 | match self.drain_inner(Some(cx), WriteBytes(dest)) { |
542 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
543 | Poll::Ready(Err(e)) => match e {}, |
544 | Poll::Pending => Poll::Pending, |
545 | } |
546 | } |
547 | |
548 | /// Tries to read bytes from this reader. |
549 | /// |
550 | /// Returns the total number of bytes that were read from this reader. |
551 | /// |
552 | /// # Examples |
553 | /// |
554 | /// ``` |
555 | /// let (mut r, mut w) = piper::pipe(1024); |
556 | /// |
557 | /// // `try_drain()` returns 0 off the bat. |
558 | /// let mut buf = [0; 10]; |
559 | /// assert_eq!(r.try_drain(&mut buf), 0); |
560 | /// |
561 | /// // After a write it returns the data. |
562 | /// w.try_fill(&[0, 1, 2, 3, 4]); |
563 | /// assert_eq!(r.try_drain(&mut buf), 5); |
564 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
565 | /// ``` |
566 | pub fn try_drain(&mut self, dest: &mut [u8]) -> usize { |
567 | match self.drain_inner(None, WriteBytes(dest)) { |
568 | Poll::Ready(Ok(n)) => n, |
569 | Poll::Ready(Err(e)) => match e {}, |
570 | Poll::Pending => 0, |
571 | } |
572 | } |
573 | |
574 | /// Reads bytes from this reader and writes into blocking `dest`. |
575 | #[inline ] |
576 | fn drain_inner<W: WriteLike>( |
577 | &mut self, |
578 | mut cx: Option<&mut Context<'_>>, |
579 | mut dest: W, |
580 | ) -> Poll<Result<usize, W::Error>> { |
581 | let cap = self.inner.cap; |
582 | |
583 | // Calculates the distance between two indices. |
584 | let distance = |a: usize, b: usize| { |
585 | if a <= b { |
586 | b - a |
587 | } else { |
588 | 2 * cap - (a - b) |
589 | } |
590 | }; |
591 | |
592 | // If the pipe appears to be empty... |
593 | if distance(self.head, self.tail) == 0 { |
594 | // Reload the tail in case it's become stale. |
595 | self.tail = self.inner.tail.load(Ordering::Acquire); |
596 | |
597 | // If the pipe is now really empty... |
598 | if distance(self.head, self.tail) == 0 { |
599 | // Register the waker. |
600 | if let Some(cx) = cx.as_mut() { |
601 | self.inner.reader.register(cx.waker()); |
602 | } |
603 | atomic::fence(Ordering::SeqCst); |
604 | |
605 | // Reload the tail after registering the waker. |
606 | self.tail = self.inner.tail.load(Ordering::Acquire); |
607 | |
608 | // If the pipe is still empty... |
609 | if distance(self.head, self.tail) == 0 { |
610 | // Check whether the pipe is closed or just empty. |
611 | if self.inner.closed.load(Ordering::Relaxed) { |
612 | return Poll::Ready(Ok(0)); |
613 | } else { |
614 | return Poll::Pending; |
615 | } |
616 | } |
617 | } |
618 | } |
619 | |
620 | // The pipe is not empty so remove the waker. |
621 | self.inner.reader.take(); |
622 | |
623 | // Yield with some small probability - this improves fairness. |
624 | if let Some(cx) = cx { |
625 | ready!(maybe_yield(&mut self.rng, cx)); |
626 | } |
627 | |
628 | // Given an index in `0..2*cap`, returns the real index in `0..cap`. |
629 | let real_index = |i: usize| { |
630 | if i < cap { |
631 | i |
632 | } else { |
633 | i - cap |
634 | } |
635 | }; |
636 | |
637 | // Number of bytes read so far. |
638 | let mut count = 0; |
639 | |
640 | loop { |
641 | // Calculate how many bytes to read in this iteration. |
642 | let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon! |
643 | .min(distance(self.head, self.tail)) // No more than bytes in the pipe. |
644 | .min(cap - real_index(self.head)); // Don't go past the buffer boundary. |
645 | |
646 | // Create a slice of data in the pipe buffer. |
647 | let pipe_slice = |
648 | unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) }; |
649 | |
650 | // Copy bytes from the pipe buffer into `dest`. |
651 | let n = dest.write(pipe_slice)?; |
652 | count += n; |
653 | |
654 | // If pipe is empty or `dest` is full, return. |
655 | if n == 0 { |
656 | return Poll::Ready(Ok(count)); |
657 | } |
658 | |
659 | // Move the head forward. |
660 | if self.head + n < 2 * cap { |
661 | self.head += n; |
662 | } else { |
663 | self.head = 0; |
664 | } |
665 | |
666 | // Store the current head index. |
667 | self.inner.head.store(self.head, Ordering::Release); |
668 | |
669 | // Wake the writer because the pipe is not full. |
670 | self.inner.writer.wake(); |
671 | } |
672 | } |
673 | } |
674 | |
675 | #[cfg (feature = "std" )] |
676 | impl AsyncRead for Reader { |
677 | fn poll_read( |
678 | mut self: Pin<&mut Self>, |
679 | cx: &mut Context<'_>, |
680 | buf: &mut [u8], |
681 | ) -> Poll<io::Result<usize>> { |
682 | self.poll_drain_bytes(cx, dest:buf).map(Ok) |
683 | } |
684 | } |
685 | |
686 | impl Writer { |
687 | /// Gets the total length of the data in the pipe. |
688 | /// |
689 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
690 | /// read yet. |
691 | /// |
692 | /// # Examples |
693 | /// |
694 | /// ``` |
695 | /// let (_reader, mut writer) = piper::pipe(10); |
696 | /// let _ = writer.try_fill(&[0u8; 5]); |
697 | /// assert_eq!(writer.len(), 5); |
698 | /// ``` |
699 | pub fn len(&self) -> usize { |
700 | self.inner.len() |
701 | } |
702 | |
703 | /// Tell whether or not the pipe is empty. |
704 | /// |
705 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
706 | /// |
707 | /// # Examples |
708 | /// |
709 | /// ``` |
710 | /// let (_reader, mut writer) = piper::pipe(10); |
711 | /// assert!(writer.is_empty()); |
712 | /// let _ = writer.try_fill(&[0u8; 5]); |
713 | /// assert!(!writer.is_empty()); |
714 | /// ``` |
715 | pub fn is_empty(&self) -> bool { |
716 | self.inner.len() == 0 |
717 | } |
718 | |
719 | /// Gets the total capacity of the pipe. |
720 | /// |
721 | /// This method returns the number of bytes that the pipe can hold at a time. |
722 | /// |
723 | /// # Examples |
724 | /// |
725 | /// ``` |
726 | /// # futures_lite::future::block_on(async { |
727 | /// let (_, writer) = piper::pipe(10); |
728 | /// assert_eq!(writer.capacity(), 10); |
729 | /// # }); |
730 | /// ``` |
731 | pub fn capacity(&self) -> usize { |
732 | self.inner.cap |
733 | } |
734 | |
735 | /// Tell whether or not the pipe is full. |
736 | /// |
737 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
738 | /// writes will block until some data is read from the pipe. |
739 | /// |
740 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
741 | /// |
742 | /// # Examples |
743 | /// |
744 | /// ``` |
745 | /// let (mut reader, mut writer) = piper::pipe(10); |
746 | /// assert!(!writer.is_full()); |
747 | /// let _ = writer.try_fill(&[0u8; 10]); |
748 | /// assert!(writer.is_full()); |
749 | /// let _ = reader.try_drain(&mut [0u8; 5]); |
750 | /// assert!(!writer.is_full()); |
751 | /// ``` |
752 | pub fn is_full(&self) -> bool { |
753 | self.inner.len() == self.inner.cap |
754 | } |
755 | |
756 | /// Tell whether or not the pipe is closed. |
757 | /// |
758 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
759 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
760 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
761 | /// |
762 | /// # Examples |
763 | /// |
764 | /// ``` |
765 | /// # futures_lite::future::block_on(async { |
766 | /// let (reader, writer) = piper::pipe(10); |
767 | /// assert!(!writer.is_closed()); |
768 | /// drop(reader); |
769 | /// assert!(writer.is_closed()); |
770 | /// # }); |
771 | /// ``` |
772 | pub fn is_closed(&self) -> bool { |
773 | self.inner.closed.load(Ordering::SeqCst) |
774 | } |
775 | |
776 | /// Reads bytes from blocking `src` and writes into this writer. |
777 | /// |
778 | /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy, |
779 | /// but it may block the thread if `src` blocks. |
780 | /// |
781 | /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method |
782 | /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`. |
783 | /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read. |
784 | /// |
785 | /// This method is only available when the `std` feature is enabled. For `no_std` environments, |
786 | /// consider using [`poll_fill_bytes`] instead. |
787 | /// |
788 | /// [`poll_fill_bytes`]: #method.poll_fill_bytes |
789 | /// |
790 | /// # Examples |
791 | /// |
792 | /// ``` |
793 | /// use futures_lite::{future, prelude::*}; |
794 | /// # future::block_on(async { |
795 | /// |
796 | /// // Create a pipe. |
797 | /// let (mut reader, mut writer) = piper::pipe(1024); |
798 | /// |
799 | /// // Fill the pipe with some bytes. |
800 | /// let data = b"hello world" ; |
801 | /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap(); |
802 | /// assert_eq!(n, data.len()); |
803 | /// |
804 | /// // Read the bytes back. |
805 | /// let mut buf = [0; 1024]; |
806 | /// reader.read_exact(&mut buf[..data.len()]).await.unwrap(); |
807 | /// assert_eq!(&buf[..data.len()], data); |
808 | /// # }); |
809 | /// ``` |
810 | #[cfg (feature = "std" )] |
811 | pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> { |
812 | self.fill_inner(Some(cx), src) |
813 | } |
814 | |
815 | /// Writes bytes into this writer. |
816 | /// |
817 | /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from. |
818 | /// Because of this, it is infallible and can be used in `no_std` environments. |
819 | /// |
820 | /// The same conditions that apply to [`poll_fill`] apply to this method. |
821 | /// |
822 | /// [`poll_fill`]: #method.poll_fill |
823 | /// |
824 | /// # Examples |
825 | /// |
826 | /// ``` |
827 | /// use futures_lite::{future, prelude::*}; |
828 | /// # future::block_on(async { |
829 | /// |
830 | /// // Create a pipe. |
831 | /// let (mut reader, mut writer) = piper::pipe(1024); |
832 | /// |
833 | /// // Fill the pipe with some bytes. |
834 | /// let data = b"hello world" ; |
835 | /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await; |
836 | /// assert_eq!(n, data.len()); |
837 | /// |
838 | /// // Read the bytes back. |
839 | /// let mut buf = [0; 1024]; |
840 | /// reader.read_exact(&mut buf[..data.len()]).await.unwrap(); |
841 | /// assert_eq!(&buf[..data.len()], data); |
842 | /// # }); |
843 | /// ``` |
844 | pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> { |
845 | match self.fill_inner(Some(cx), ReadBytes(bytes)) { |
846 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
847 | Poll::Ready(Err(e)) => match e {}, |
848 | Poll::Pending => Poll::Pending, |
849 | } |
850 | } |
851 | |
852 | /// Tries to write bytes to this writer. |
853 | /// |
854 | /// Returns the total number of bytes that were read from this reader. |
855 | /// |
856 | /// # Examples |
857 | /// |
858 | /// ``` |
859 | /// let (mut r, mut w) = piper::pipe(1024); |
860 | /// |
861 | /// let mut buf = [0; 10]; |
862 | /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5); |
863 | /// assert_eq!(r.try_drain(&mut buf), 5); |
864 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
865 | /// ``` |
866 | pub fn try_fill(&mut self, dest: &[u8]) -> usize { |
867 | match self.fill_inner(None, ReadBytes(dest)) { |
868 | Poll::Ready(Ok(n)) => n, |
869 | Poll::Ready(Err(e)) => match e {}, |
870 | Poll::Pending => 0, |
871 | } |
872 | } |
873 | |
874 | /// Reads bytes from blocking `src` and writes into this writer. |
875 | #[inline ] |
876 | fn fill_inner<R: ReadLike>( |
877 | &mut self, |
878 | mut cx: Option<&mut Context<'_>>, |
879 | mut src: R, |
880 | ) -> Poll<Result<usize, R::Error>> { |
881 | // Just a quick check if the pipe is closed, which is why a relaxed load is okay. |
882 | if self.inner.closed.load(Ordering::Relaxed) { |
883 | return Poll::Ready(Ok(0)); |
884 | } |
885 | |
886 | // Calculates the distance between two indices. |
887 | let cap = self.inner.cap; |
888 | let distance = |a: usize, b: usize| { |
889 | if a <= b { |
890 | b - a |
891 | } else { |
892 | 2 * cap - (a - b) |
893 | } |
894 | }; |
895 | |
896 | // If the pipe appears to be full... |
897 | if distance(self.head, self.tail) == cap { |
898 | // Reload the head in case it's become stale. |
899 | self.head = self.inner.head.load(Ordering::Acquire); |
900 | |
901 | // If the pipe is now really empty... |
902 | if distance(self.head, self.tail) == cap { |
903 | // Register the waker. |
904 | if let Some(cx) = cx.as_mut() { |
905 | self.inner.writer.register(cx.waker()); |
906 | } |
907 | atomic::fence(Ordering::SeqCst); |
908 | |
909 | // Reload the head after registering the waker. |
910 | self.head = self.inner.head.load(Ordering::Acquire); |
911 | |
912 | // If the pipe is still full... |
913 | if distance(self.head, self.tail) == cap { |
914 | // Check whether the pipe is closed or just full. |
915 | if self.inner.closed.load(Ordering::Relaxed) { |
916 | return Poll::Ready(Ok(0)); |
917 | } else { |
918 | return Poll::Pending; |
919 | } |
920 | } |
921 | } |
922 | } |
923 | |
924 | // The pipe is not full so remove the waker. |
925 | self.inner.writer.take(); |
926 | |
927 | // Yield with some small probability - this improves fairness. |
928 | if let Some(cx) = cx { |
929 | ready!(maybe_yield(&mut self.rng, cx)); |
930 | } |
931 | |
932 | // Given an index in `0..2*cap`, returns the real index in `0..cap`. |
933 | let real_index = |i: usize| { |
934 | if i < cap { |
935 | i |
936 | } else { |
937 | i - cap |
938 | } |
939 | }; |
940 | |
941 | // Number of bytes written so far. |
942 | let mut count = 0; |
943 | |
944 | loop { |
945 | // Calculate how many bytes to write in this iteration. |
946 | let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon! |
947 | .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting. |
948 | .min(cap - distance(self.head, self.tail)) // No more than space in the pipe. |
949 | .min(cap - real_index(self.tail)); // Don't go past the buffer boundary. |
950 | |
951 | // Create a slice of available space in the pipe buffer. |
952 | let pipe_slice_mut = unsafe { |
953 | let from = real_index(self.tail); |
954 | let to = from + n; |
955 | |
956 | // Make sure all bytes in the slice are initialized. |
957 | if self.zeroed_until < to { |
958 | self.inner |
959 | .buffer |
960 | .add(self.zeroed_until) |
961 | .write_bytes(0u8, to - self.zeroed_until); |
962 | self.zeroed_until = to; |
963 | } |
964 | |
965 | slice::from_raw_parts_mut(self.inner.buffer.add(from), n) |
966 | }; |
967 | |
968 | // Copy bytes from `src` into the piper buffer. |
969 | let n = src.read(pipe_slice_mut)?; |
970 | count += n; |
971 | |
972 | // If the pipe is full or closed, or `src` is empty, return. |
973 | if n == 0 || self.inner.closed.load(Ordering::Relaxed) { |
974 | return Poll::Ready(Ok(count)); |
975 | } |
976 | |
977 | // Move the tail forward. |
978 | if self.tail + n < 2 * cap { |
979 | self.tail += n; |
980 | } else { |
981 | self.tail = 0; |
982 | } |
983 | |
984 | // Store the current tail index. |
985 | self.inner.tail.store(self.tail, Ordering::Release); |
986 | |
987 | // Wake the reader because the pipe is not empty. |
988 | self.inner.reader.wake(); |
989 | } |
990 | } |
991 | } |
992 | |
993 | #[cfg (feature = "std" )] |
994 | impl AsyncWrite for Writer { |
995 | fn poll_write( |
996 | mut self: Pin<&mut Self>, |
997 | cx: &mut Context<'_>, |
998 | buf: &[u8], |
999 | ) -> Poll<io::Result<usize>> { |
1000 | self.poll_fill_bytes(cx, buf).map(Ok) |
1001 | } |
1002 | |
1003 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
1004 | // Nothing to flush. |
1005 | Poll::Ready(Ok(())) |
1006 | } |
1007 | |
1008 | fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
1009 | // Set the closed flag. |
1010 | self.inner.closed.store(true, Ordering::Release); |
1011 | |
1012 | // Wake up any tasks that may be waiting on the pipe. |
1013 | self.inner.reader.wake(); |
1014 | self.inner.writer.wake(); |
1015 | |
1016 | // The pipe is now closed. |
1017 | Poll::Ready(Ok(())) |
1018 | } |
1019 | } |
1020 | |
1021 | /// A trait for reading bytes into a pipe. |
1022 | trait ReadLike { |
1023 | /// The error type. |
1024 | type Error; |
1025 | |
1026 | /// Reads bytes into the given buffer. |
1027 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>; |
1028 | } |
1029 | |
1030 | #[cfg (feature = "std" )] |
1031 | impl<R: Read> ReadLike for R { |
1032 | type Error = io::Error; |
1033 | |
1034 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
1035 | Read::read(self, buf) |
1036 | } |
1037 | } |
1038 | |
1039 | /// Implements `no_std` reading around a byte slice. |
1040 | struct ReadBytes<'a>(&'a [u8]); |
1041 | |
1042 | impl ReadLike for ReadBytes<'_> { |
1043 | type Error = Infallible; |
1044 | |
1045 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
1046 | let n: usize = self.0.len().min(buf.len()); |
1047 | buf[..n].copy_from_slice(&self.0[..n]); |
1048 | self.0 = &self.0[n..]; |
1049 | Ok(n) |
1050 | } |
1051 | } |
1052 | |
1053 | /// A trait for writing bytes from a pipe. |
1054 | trait WriteLike { |
1055 | /// The error type. |
1056 | type Error; |
1057 | |
1058 | /// Writes bytes from the given buffer. |
1059 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>; |
1060 | } |
1061 | |
1062 | #[cfg (feature = "std" )] |
1063 | impl<W: Write> WriteLike for W { |
1064 | type Error = io::Error; |
1065 | |
1066 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
1067 | Write::write(self, buf) |
1068 | } |
1069 | } |
1070 | |
1071 | /// Implements `no_std` writing around a byte slice. |
1072 | struct WriteBytes<'a>(&'a mut [u8]); |
1073 | |
1074 | impl WriteLike for WriteBytes<'_> { |
1075 | type Error = Infallible; |
1076 | |
1077 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
1078 | let n: usize = self.0.len().min(buf.len()); |
1079 | self.0[..n].copy_from_slice(&buf[..n]); |
1080 | |
1081 | // mem::take() is not available on 1.36 |
1082 | #[allow (clippy::mem_replace_with_default)] |
1083 | { |
1084 | let slice: &mut [u8] = mem::replace(&mut self.0, &mut []); |
1085 | self.0 = &mut slice[n..]; |
1086 | } |
1087 | |
1088 | Ok(n) |
1089 | } |
1090 | } |
1091 | |
1092 | /// Yield with some small probability. |
1093 | fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> { |
1094 | if rng.usize(..100) == 0 { |
1095 | cx.waker().wake_by_ref(); |
1096 | Poll::Pending |
1097 | } else { |
1098 | Poll::Ready(()) |
1099 | } |
1100 | } |
1101 | |
1102 | /// Get a random number generator. |
1103 | #[cfg (feature = "std" )] |
1104 | #[inline ] |
1105 | fn rng() -> fastrand::Rng { |
1106 | fastrand::Rng::new() |
1107 | } |
1108 | |
1109 | /// Get a random number generator. |
1110 | /// |
1111 | /// This uses a fixed seed due to the lack of a good RNG in `no_std` environments. |
1112 | #[cfg (not(feature = "std" ))] |
1113 | #[inline ] |
1114 | fn rng() -> fastrand::Rng { |
1115 | // Chosen by fair roll of the dice. |
1116 | fastrand::Rng::with_seed(0x7e9b496634c97ec6) |
1117 | } |
1118 | |
1119 | /// ``` |
1120 | /// use piper::{Reader, Writer}; |
1121 | /// fn _send_sync<T: Send + Sync>() {} |
1122 | /// _send_sync::<Reader>(); |
1123 | /// _send_sync::<Writer>(); |
1124 | /// ``` |
1125 | fn _assert_send_sync() {} |
1126 | |
1127 | mod sync { |
1128 | #[cfg (not(feature = "portable-atomic" ))] |
1129 | pub use core::sync::atomic; |
1130 | |
1131 | #[cfg (not(feature = "portable-atomic" ))] |
1132 | pub use alloc::sync::Arc; |
1133 | |
1134 | #[cfg (feature = "portable-atomic" )] |
1135 | pub use portable_atomic_crate as atomic; |
1136 | |
1137 | #[cfg (feature = "portable-atomic" )] |
1138 | pub use portable_atomic_util::Arc; |
1139 | } |
1140 | |