1//! A bounded single-producer single-consumer pipe.
2//!
3//! This crate provides a ring buffer that can be asynchronously read from and written to. It is
4//! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles.
5//! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively.
6//!
7//! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut`
8//! access to read or write to them. If multiple-producer/multiple-consumer handles are needed,
9//! consider wrapping them in an `Arc<Mutex<...>>` or similar.
10//!
11//! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
12//! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
13//!
14//! When the receiver is dropped, the pipe is closed and no more bytes and be written into it.
15//! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
16//!
17//! # Version 0.2.0 Notes
18//!
19//! Previously, this crate contained other synchronization primitives, such as bounded channels, locks,
20//! and event listeners. These have been split out into their own crates:
21//!
22//! - [`async-channel`](https://docs.rs/async-channel)
23//! - [`async-dup`](https://docs.rs/async-dup)
24//! - [`async-lock`](https://docs.rs/async-lock)
25//! - [`async-mutex`](https://docs.rs/async-mutex)
26//! - [`event-listener`](https://docs.rs/event-listener)
27//!
28//! # Examples
29//!
30//! ## Asynchronous Tasks
31//!
32//! Communicate between asynchronous tasks, potentially on other threads.
33//!
34//! ```
35//! use async_channel::unbounded;
36//! use async_executor::Executor;
37//! use easy_parallel::Parallel;
38//! use futures_lite::{future, prelude::*};
39//! use std::time::Duration;
40//!
41//! # if cfg!(miri) { return; }
42//!
43//! // Create a pair of handles.
44//! let (mut reader, mut writer) = piper::pipe(1024);
45//!
46//! // Create the executor.
47//! let ex = Executor::new();
48//! let (signal, shutdown) = unbounded::<()>();
49//!
50//! // Spawn a detached task for random data to the pipe.
51//! let writer = ex.spawn(async move {
52//! for _ in 0..1_000 {
53//! // Generate 8 random numnbers.
54//! let random = fastrand::u64(..).to_le_bytes();
55//!
56//! // Write them to the pipe.
57//! writer.write_all(&random).await.unwrap();
58//!
59//! // Wait a bit.
60//! async_io::Timer::after(Duration::from_millis(5)).await;
61//! }
62//!
63//! // Drop the writer to close the pipe.
64//! drop(writer);
65//! });
66//!
67//! // Detach the task so that it runs in the background.
68//! writer.detach();
69//!
70//! // Spawn a task for reading from the pipe.
71//! let reader = ex.spawn(async move {
72//! let mut buf = vec![];
73//!
74//! // Read all bytes from the pipe.
75//! reader.read_to_end(&mut buf).await.unwrap();
76//!
77//! println!("Random data: {:#?}", buf);
78//! });
79//!
80//! Parallel::new()
81//! // Run four executor threads.
82//! .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
83//! // Run the main future on the current thread.
84//! .finish(|| future::block_on(async {
85//! // Wait for the reader to finish.
86//! reader.await;
87//!
88//! // Signal the executor threads to shut down.
89//! drop(signal);
90//! }));
91//! ```
92//!
93//! ## Blocking I/O
94//!
95//! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example
96//! spawns another thread for reading a file and writing it to a pipe.
97//!
98//! ```no_run
99//! use futures_lite::{future, prelude::*};
100//! use std::fs::File;
101//! use std::io::prelude::*;
102//! use std::thread;
103//!
104//! // Create a pair of handles.
105//! let (mut r, mut w) = piper::pipe(1024);
106//!
107//! // Spawn a thread for reading a file.
108//! thread::spawn(move || {
109//! let mut file = File::open("Cargo.toml").unwrap();
110//!
111//! // Read the file into a buffer.
112//! let mut buf = [0u8; 16384];
113//! future::block_on(async move {
114//! loop {
115//! // Read a chunk of bytes from the file.
116//! // Blocking is okay here, since this is a separate thread.
117//! let n = file.read(&mut buf).unwrap();
118//! if n == 0 {
119//! break;
120//! }
121//!
122//! // Write the chunk to the pipe.
123//! w.write_all(&buf[..n]).await.unwrap();
124//! }
125//!
126//! // Close the pipe.
127//! drop(w);
128//! });
129//! });
130//!
131//! # future::block_on(async move {
132//! // Read bytes from the pipe.
133//! let mut buf = vec![];
134//! r.read_to_end(&mut buf).await.unwrap();
135//!
136//! println!("Read {} bytes", buf.len());
137//! # });
138//! ```
139//!
140//! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write`
141//! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from
142//! the file into the pipe. This approach should be preferred when possible, as it avoids an extra
143//! copy.
144//!
145//! ```no_run
146//! # use futures_lite::future;
147//! # use std::fs::File;
148//! # let mut file: File = unimplemented!();
149//! # let mut w: piper::Writer = unimplemented!();
150//! // In the `future::block_on` call above...
151//! # future::block_on(async move {
152//! loop {
153//! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
154//! if n == 0 {
155//! break;
156//! }
157//! }
158//! # });
159//! ```
160//!
161//! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for
162//! thread management and pipes.
163//!
164//! [`poll_fill`]: struct.Writer.html#method.poll_fill
165//! [`poll_drain`]: struct.Reader.html#method.poll_drain
166//! [`blocking`]: https://docs.rs/blocking
167
168#![cfg_attr(not(feature = "std"), no_std)]
169#![forbid(missing_docs)]
170#![doc(
171 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
172)]
173#![doc(
174 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
175)]
176
177extern crate alloc;
178
179use core::convert::Infallible;
180use core::mem;
181use core::slice;
182use core::task::{Context, Poll};
183
184use alloc::vec::Vec;
185
186use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
187use sync::Arc;
188
189#[cfg(feature = "std")]
190use std::{
191 io::{self, Read, Write},
192 pin::Pin,
193};
194
195use atomic_waker::AtomicWaker;
196
197#[cfg(feature = "std")]
198use futures_io::{AsyncRead, AsyncWrite};
199
200macro_rules! ready {
201 ($e:expr) => {{
202 match $e {
203 Poll::Ready(t) => t,
204 Poll::Pending => return Poll::Pending,
205 }
206 }};
207}
208
209/// Creates a bounded single-producer single-consumer pipe.
210///
211/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
212///
213/// See the [crate-level documentation](index.html) for more details.
214///
215/// # Panics
216///
217/// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`.
218pub fn pipe(cap: usize) -> (Reader, Writer) {
219 assert!(cap > 0, "capacity must be positive");
220 assert!(cap.checked_mul(2).is_some(), "capacity is too large");
221
222 // Allocate the ring buffer.
223 let mut v = Vec::with_capacity(cap);
224 let buffer = v.as_mut_ptr();
225 mem::forget(v);
226
227 let inner = Arc::new(Pipe {
228 head: AtomicUsize::new(0),
229 tail: AtomicUsize::new(0),
230 reader: AtomicWaker::new(),
231 writer: AtomicWaker::new(),
232 closed: AtomicBool::new(false),
233 buffer,
234 cap,
235 });
236
237 // Use a random number generator to randomize fair yielding behavior.
238 let mut rng = rng();
239
240 let r = Reader {
241 inner: inner.clone(),
242 head: 0,
243 tail: 0,
244 rng: rng.fork(),
245 };
246
247 let w = Writer {
248 inner,
249 head: 0,
250 tail: 0,
251 zeroed_until: 0,
252 rng,
253 };
254
255 (r, w)
256}
257
258/// The reading side of a pipe.
259///
260/// This type is created by the [`pipe`] function. See its documentation for more details.
261pub struct Reader {
262 /// The inner ring buffer.
263 inner: Arc<Pipe>,
264
265 /// The head index, moved by the reader, in the range `0..2*cap`.
266 ///
267 /// This index always matches `inner.head`.
268 head: usize,
269
270 /// The tail index, moved by the writer, in the range `0..2*cap`.
271 ///
272 /// This index is a snapshot of `index.tail` that might become stale at any point.
273 tail: usize,
274
275 /// Random number generator.
276 rng: fastrand::Rng,
277}
278
279/// The writing side of a pipe.
280///
281/// This type is created by the [`pipe`] function. See its documentation for more details.
282pub struct Writer {
283 /// The inner ring buffer.
284 inner: Arc<Pipe>,
285
286 /// The head index, moved by the reader, in the range `0..2*cap`.
287 ///
288 /// This index is a snapshot of `index.head` that might become stale at any point.
289 head: usize,
290
291 /// The tail index, moved by the writer, in the range `0..2*cap`.
292 ///
293 /// This index always matches `inner.tail`.
294 tail: usize,
295
296 /// How many bytes at the beginning of the buffer have been zeroed.
297 ///
298 /// The pipe allocates an uninitialized buffer, and we must be careful about passing
299 /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
300 /// expensive, so we zero it in smaller chunks as the writer makes progress.
301 zeroed_until: usize,
302
303 /// Random number generator.
304 rng: fastrand::Rng,
305}
306
307/// The inner ring buffer.
308///
309/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
310/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
311///
312/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
313/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
314/// could mean the pipe is either empty or full, but we don't know which!
315struct Pipe {
316 /// The head index, moved by the reader, in the range `0..2*cap`.
317 head: AtomicUsize,
318
319 /// The tail index, moved by the writer, in the range `0..2*cap`.
320 tail: AtomicUsize,
321
322 /// A waker representing the blocked reader.
323 reader: AtomicWaker,
324
325 /// A waker representing the blocked writer.
326 writer: AtomicWaker,
327
328 /// Set to `true` if the reader or writer was dropped.
329 closed: AtomicBool,
330
331 /// The byte buffer.
332 buffer: *mut u8,
333
334 /// The buffer capacity.
335 cap: usize,
336}
337
338unsafe impl Sync for Pipe {}
339unsafe impl Send for Pipe {}
340
341impl Drop for Pipe {
342 fn drop(&mut self) {
343 // Deallocate the byte buffer.
344 unsafe {
345 Vec::from_raw_parts(self.buffer, length:0, self.cap);
346 }
347 }
348}
349
350impl Drop for Reader {
351 fn drop(&mut self) {
352 // Dropping closes the pipe and then wakes the writer.
353 self.inner.closed.store(val:true, order:Ordering::SeqCst);
354 self.inner.writer.wake();
355 }
356}
357
358impl Drop for Writer {
359 fn drop(&mut self) {
360 // Dropping closes the pipe and then wakes the reader.
361 self.inner.closed.store(val:true, order:Ordering::SeqCst);
362 self.inner.reader.wake();
363 }
364}
365
366impl Pipe {
367 /// Get the length of the data in the pipe.
368 fn len(&self) -> usize {
369 let head: usize = self.head.load(order:Ordering::Acquire);
370 let tail: usize = self.tail.load(order:Ordering::Acquire);
371
372 if head <= tail {
373 tail - head
374 } else {
375 (2 * self.cap) - (head - tail)
376 }
377 }
378}
379
380impl Reader {
381 /// Gets the total length of the data in the pipe.
382 ///
383 /// This method returns the number of bytes that have been written into the pipe but haven't been
384 /// read yet.
385 ///
386 /// # Examples
387 ///
388 /// ```
389 /// # futures_lite::future::block_on(async {
390 /// # futures_lite::future::poll_fn(|cx| {
391 /// let (mut reader, mut writer) = piper::pipe(10);
392 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]);
393 /// assert_eq!(reader.len(), 5);
394 /// # std::task::Poll::Ready(()) }).await;
395 /// # });
396 /// ```
397 pub fn len(&self) -> usize {
398 self.inner.len()
399 }
400
401 /// Tell whether or not the pipe is empty.
402 ///
403 /// This method returns `true` if the pipe is empty, and `false` otherwise.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// # futures_lite::future::block_on(async {
409 /// # futures_lite::future::poll_fn(|cx| {
410 /// let (mut reader, mut writer) = piper::pipe(10);
411 /// assert!(reader.is_empty());
412 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]);
413 /// assert!(!reader.is_empty());
414 /// # std::task::Poll::Ready(()) }).await;
415 /// # });
416 /// ```
417 pub fn is_empty(&self) -> bool {
418 self.inner.len() == 0
419 }
420
421 /// Gets the total capacity of the pipe.
422 ///
423 /// This method returns the number of bytes that the pipe can hold at a time.
424 ///
425 /// # Examples
426 ///
427 /// ```
428 /// # futures_lite::future::block_on(async {
429 /// let (reader, _) = piper::pipe(10);
430 /// assert_eq!(reader.capacity(), 10);
431 /// # });
432 /// ```
433 pub fn capacity(&self) -> usize {
434 self.inner.cap
435 }
436
437 /// Tell whether or not the pipe is full.
438 ///
439 /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
440 /// writes will block until some data is read from the pipe.
441 ///
442 /// This method returns `true` if the pipe is full, and `false` otherwise.
443 ///
444 /// # Examples
445 ///
446 /// ```
447 /// # futures_lite::future::block_on(async {
448 /// # futures_lite::future::poll_fn(|cx| {
449 /// let (mut reader, mut writer) = piper::pipe(10);
450 /// assert!(!reader.is_full());
451 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 10]);
452 /// assert!(reader.is_full());
453 /// let _ = reader.poll_drain_bytes(cx, &mut [0u8; 5]);
454 /// assert!(!reader.is_full());
455 /// # std::task::Poll::Ready(()) }).await;
456 /// # });
457 /// ```
458 pub fn is_full(&self) -> bool {
459 self.inner.len() == self.inner.cap
460 }
461
462 /// Tell whether or not the pipe is closed.
463 ///
464 /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
465 /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
466 /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
467 ///
468 /// # Examples
469 ///
470 /// ```
471 /// # futures_lite::future::block_on(async {
472 /// let (mut reader, mut writer) = piper::pipe(10);
473 /// assert!(!reader.is_closed());
474 /// drop(writer);
475 /// assert!(reader.is_closed());
476 /// # });
477 /// ```
478 pub fn is_closed(&self) -> bool {
479 self.inner.closed.load(Ordering::SeqCst)
480 }
481
482 /// Reads bytes from this reader and writes into blocking `dest`.
483 ///
484 /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy,
485 /// but it may block the thread if `dest` blocks.
486 ///
487 /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method
488 /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`.
489 /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written.
490 ///
491 /// This method is only available when the `std` feature is enabled. For `no_std` environments,
492 /// consider using [`poll_drain_bytes`] instead.
493 ///
494 /// [`poll_drain_bytes`]: #method.poll_drain_bytes
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// use futures_lite::{future, prelude::*};
500 /// # future::block_on(async {
501 ///
502 /// let (mut r, mut w) = piper::pipe(1024);
503 ///
504 /// // Write some data to the pipe.
505 /// w.write_all(b"hello world").await.unwrap();
506 ///
507 /// // Try reading from the pipe.
508 /// let mut buf = [0; 1024];
509 /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap();
510 ///
511 /// // The data was written to the buffer.
512 /// assert_eq!(&buf[..n], b"hello world");
513 /// # });
514 /// ```
515 #[cfg(feature = "std")]
516 pub fn poll_drain(
517 &mut self,
518 cx: &mut Context<'_>,
519 dest: impl Write,
520 ) -> Poll<io::Result<usize>> {
521 self.drain_inner(Some(cx), dest)
522 }
523
524 /// Reads bytes from this reader.
525 ///
526 /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into.
527 /// Because of this, it is infallible and can be used in `no_std` environments.
528 ///
529 /// The same conditions that apply to [`poll_drain`] apply to this method.
530 ///
531 /// [`poll_drain`]: #method.poll_drain
532 ///
533 /// # Examples
534 ///
535 /// ```
536 /// use futures_lite::{future, prelude::*};
537 /// # future::block_on(async {
538 /// let (mut r, mut w) = piper::pipe(1024);
539 ///
540 /// // Write some data to the pipe.
541 /// w.write_all(b"hello world").await.unwrap();
542 ///
543 /// // Try reading from the pipe.
544 /// let mut buf = [0; 1024];
545 /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await;
546 ///
547 /// // The data was written to the buffer.
548 /// assert_eq!(&buf[..n], b"hello world");
549 /// # });
550 /// ```
551 pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
552 match self.drain_inner(Some(cx), WriteBytes(dest)) {
553 Poll::Ready(Ok(n)) => Poll::Ready(n),
554 Poll::Ready(Err(e)) => match e {},
555 Poll::Pending => Poll::Pending,
556 }
557 }
558
559 /// Tries to read bytes from this reader.
560 ///
561 /// Returns the total number of bytes that were read from this reader.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// let (mut r, mut w) = piper::pipe(1024);
567 ///
568 /// // `try_drain()` returns 0 off the bat.
569 /// let mut buf = [0; 10];
570 /// assert_eq!(r.try_drain(&mut buf), 0);
571 ///
572 /// // After a write it returns the data.
573 /// w.try_fill(&[0, 1, 2, 3, 4]);
574 /// assert_eq!(r.try_drain(&mut buf), 5);
575 /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
576 /// ```
577 pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
578 match self.drain_inner(None, WriteBytes(dest)) {
579 Poll::Ready(Ok(n)) => n,
580 Poll::Ready(Err(e)) => match e {},
581 Poll::Pending => 0,
582 }
583 }
584
585 /// Reads bytes from this reader and writes into blocking `dest`.
586 #[inline]
587 fn drain_inner<W: WriteLike>(
588 &mut self,
589 mut cx: Option<&mut Context<'_>>,
590 mut dest: W,
591 ) -> Poll<Result<usize, W::Error>> {
592 let cap = self.inner.cap;
593
594 // Calculates the distance between two indices.
595 let distance = |a: usize, b: usize| {
596 if a <= b {
597 b - a
598 } else {
599 2 * cap - (a - b)
600 }
601 };
602
603 // If the pipe appears to be empty...
604 if distance(self.head, self.tail) == 0 {
605 // Reload the tail in case it's become stale.
606 self.tail = self.inner.tail.load(Ordering::Acquire);
607
608 // If the pipe is now really empty...
609 if distance(self.head, self.tail) == 0 {
610 // Register the waker.
611 if let Some(cx) = cx.as_mut() {
612 self.inner.reader.register(cx.waker());
613 }
614 atomic::fence(Ordering::SeqCst);
615
616 // Reload the tail after registering the waker.
617 self.tail = self.inner.tail.load(Ordering::Acquire);
618
619 // If the pipe is still empty...
620 if distance(self.head, self.tail) == 0 {
621 // Check whether the pipe is closed or just empty.
622 if self.inner.closed.load(Ordering::Relaxed) {
623 return Poll::Ready(Ok(0));
624 } else {
625 return Poll::Pending;
626 }
627 }
628 }
629 }
630
631 // The pipe is not empty so remove the waker.
632 self.inner.reader.take();
633
634 // Yield with some small probability - this improves fairness.
635 if let Some(cx) = cx {
636 ready!(maybe_yield(&mut self.rng, cx));
637 }
638
639 // Given an index in `0..2*cap`, returns the real index in `0..cap`.
640 let real_index = |i: usize| {
641 if i < cap {
642 i
643 } else {
644 i - cap
645 }
646 };
647
648 // Number of bytes read so far.
649 let mut count = 0;
650
651 loop {
652 // Calculate how many bytes to read in this iteration.
653 let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
654 .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
655 .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
656
657 // Create a slice of data in the pipe buffer.
658 let pipe_slice =
659 unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
660
661 // Copy bytes from the pipe buffer into `dest`.
662 let n = dest.write(pipe_slice)?;
663 count += n;
664
665 // If pipe is empty or `dest` is full, return.
666 if n == 0 {
667 return Poll::Ready(Ok(count));
668 }
669
670 // Move the head forward.
671 if self.head + n < 2 * cap {
672 self.head += n;
673 } else {
674 self.head = 0;
675 }
676
677 // Store the current head index.
678 self.inner.head.store(self.head, Ordering::Release);
679
680 // Wake the writer because the pipe is not full.
681 self.inner.writer.wake();
682 }
683 }
684}
685
686#[cfg(feature = "std")]
687impl AsyncRead for Reader {
688 fn poll_read(
689 mut self: Pin<&mut Self>,
690 cx: &mut Context<'_>,
691 buf: &mut [u8],
692 ) -> Poll<io::Result<usize>> {
693 self.poll_drain_bytes(cx, dest:buf).map(Ok)
694 }
695}
696
697impl Writer {
698 /// Gets the total length of the data in the pipe.
699 ///
700 /// This method returns the number of bytes that have been written into the pipe but haven't been
701 /// read yet.
702 ///
703 /// # Examples
704 ///
705 /// ```
706 /// # futures_lite::future::block_on(async {
707 /// # futures_lite::future::poll_fn(|cx| {
708 /// let (_reader, mut writer) = piper::pipe(10);
709 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]);
710 /// assert_eq!(writer.len(), 5);
711 /// # std::task::Poll::Ready(()) }).await;
712 /// # });
713 /// ```
714 pub fn len(&self) -> usize {
715 self.inner.len()
716 }
717
718 /// Tell whether or not the pipe is empty.
719 ///
720 /// This method returns `true` if the pipe is empty, and `false` otherwise.
721 ///
722 /// # Examples
723 ///
724 /// ```
725 /// # futures_lite::future::block_on(async {
726 /// # futures_lite::future::poll_fn(|cx| {
727 /// let (_reader, mut writer) = piper::pipe(10);
728 /// assert!(writer.is_empty());
729 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]);
730 /// assert!(!writer.is_empty());
731 /// # std::task::Poll::Ready(()) }).await;
732 /// # });
733 /// ```
734 pub fn is_empty(&self) -> bool {
735 self.inner.len() == 0
736 }
737
738 /// Gets the total capacity of the pipe.
739 ///
740 /// This method returns the number of bytes that the pipe can hold at a time.
741 ///
742 /// # Examples
743 ///
744 /// ```
745 /// # futures_lite::future::block_on(async {
746 /// let (_, writer) = piper::pipe(10);
747 /// assert_eq!(writer.capacity(), 10);
748 /// # });
749 /// ```
750 pub fn capacity(&self) -> usize {
751 self.inner.cap
752 }
753
754 /// Tell whether or not the pipe is full.
755 ///
756 /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
757 /// writes will block until some data is read from the pipe.
758 ///
759 /// This method returns `true` if the pipe is full, and `false` otherwise.
760 ///
761 /// # Examples
762 ///
763 /// ```
764 /// # futures_lite::future::block_on(async {
765 /// # futures_lite::future::poll_fn(|cx| {
766 /// let (mut reader, mut writer) = piper::pipe(10);
767 /// assert!(!writer.is_full());
768 /// let _ = writer.poll_fill_bytes(cx, &[0u8; 10]);
769 /// assert!(writer.is_full());
770 /// let _ = reader.poll_drain_bytes(cx, &mut [0u8; 5]);
771 /// assert!(!writer.is_full());
772 /// # std::task::Poll::Ready(()) }).await;
773 /// # });
774 /// ```
775 pub fn is_full(&self) -> bool {
776 self.inner.len() == self.inner.cap
777 }
778
779 /// Tell whether or not the pipe is closed.
780 ///
781 /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
782 /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
783 /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// # futures_lite::future::block_on(async {
789 /// let (reader, writer) = piper::pipe(10);
790 /// assert!(!writer.is_closed());
791 /// drop(reader);
792 /// assert!(writer.is_closed());
793 /// # });
794 /// ```
795 pub fn is_closed(&self) -> bool {
796 self.inner.closed.load(Ordering::SeqCst)
797 }
798
799 /// Reads bytes from blocking `src` and writes into this writer.
800 ///
801 /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy,
802 /// but it may block the thread if `src` blocks.
803 ///
804 /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method
805 /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`.
806 /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read.
807 ///
808 /// This method is only available when the `std` feature is enabled. For `no_std` environments,
809 /// consider using [`poll_fill_bytes`] instead.
810 ///
811 /// [`poll_fill_bytes`]: #method.poll_fill_bytes
812 ///
813 /// # Examples
814 ///
815 /// ```
816 /// use futures_lite::{future, prelude::*};
817 /// # future::block_on(async {
818 ///
819 /// // Create a pipe.
820 /// let (mut reader, mut writer) = piper::pipe(1024);
821 ///
822 /// // Fill the pipe with some bytes.
823 /// let data = b"hello world";
824 /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
825 /// assert_eq!(n, data.len());
826 ///
827 /// // Read the bytes back.
828 /// let mut buf = [0; 1024];
829 /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
830 /// assert_eq!(&buf[..data.len()], data);
831 /// # });
832 /// ```
833 #[cfg(feature = "std")]
834 pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
835 self.fill_inner(Some(cx), src)
836 }
837
838 /// Writes bytes into this writer.
839 ///
840 /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from.
841 /// Because of this, it is infallible and can be used in `no_std` environments.
842 ///
843 /// The same conditions that apply to [`poll_fill`] apply to this method.
844 ///
845 /// [`poll_fill`]: #method.poll_fill
846 ///
847 /// # Examples
848 ///
849 /// ```
850 /// use futures_lite::{future, prelude::*};
851 /// # future::block_on(async {
852 ///
853 /// // Create a pipe.
854 /// let (mut reader, mut writer) = piper::pipe(1024);
855 ///
856 /// // Fill the pipe with some bytes.
857 /// let data = b"hello world";
858 /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await;
859 /// assert_eq!(n, data.len());
860 ///
861 /// // Read the bytes back.
862 /// let mut buf = [0; 1024];
863 /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
864 /// assert_eq!(&buf[..data.len()], data);
865 /// # });
866 /// ```
867 pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
868 match self.fill_inner(Some(cx), ReadBytes(bytes)) {
869 Poll::Ready(Ok(n)) => Poll::Ready(n),
870 Poll::Ready(Err(e)) => match e {},
871 Poll::Pending => Poll::Pending,
872 }
873 }
874
875 /// Tries to write bytes to this writer.
876 ///
877 /// Returns the total number of bytes that were read from this reader.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// let (mut r, mut w) = piper::pipe(1024);
883 ///
884 /// let mut buf = [0; 10];
885 /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5);
886 /// assert_eq!(r.try_drain(&mut buf), 5);
887 /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
888 /// ```
889 pub fn try_fill(&mut self, dest: &[u8]) -> usize {
890 match self.fill_inner(None, ReadBytes(dest)) {
891 Poll::Ready(Ok(n)) => n,
892 Poll::Ready(Err(e)) => match e {},
893 Poll::Pending => 0,
894 }
895 }
896
897 /// Reads bytes from blocking `src` and writes into this writer.
898 #[inline]
899 fn fill_inner<R: ReadLike>(
900 &mut self,
901 mut cx: Option<&mut Context<'_>>,
902 mut src: R,
903 ) -> Poll<Result<usize, R::Error>> {
904 // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
905 if self.inner.closed.load(Ordering::Relaxed) {
906 return Poll::Ready(Ok(0));
907 }
908
909 // Calculates the distance between two indices.
910 let cap = self.inner.cap;
911 let distance = |a: usize, b: usize| {
912 if a <= b {
913 b - a
914 } else {
915 2 * cap - (a - b)
916 }
917 };
918
919 // If the pipe appears to be full...
920 if distance(self.head, self.tail) == cap {
921 // Reload the head in case it's become stale.
922 self.head = self.inner.head.load(Ordering::Acquire);
923
924 // If the pipe is now really empty...
925 if distance(self.head, self.tail) == cap {
926 // Register the waker.
927 if let Some(cx) = cx.as_mut() {
928 self.inner.writer.register(cx.waker());
929 }
930 atomic::fence(Ordering::SeqCst);
931
932 // Reload the head after registering the waker.
933 self.head = self.inner.head.load(Ordering::Acquire);
934
935 // If the pipe is still full...
936 if distance(self.head, self.tail) == cap {
937 // Check whether the pipe is closed or just full.
938 if self.inner.closed.load(Ordering::Relaxed) {
939 return Poll::Ready(Ok(0));
940 } else {
941 return Poll::Pending;
942 }
943 }
944 }
945 }
946
947 // The pipe is not full so remove the waker.
948 self.inner.writer.take();
949
950 // Yield with some small probability - this improves fairness.
951 if let Some(cx) = cx {
952 ready!(maybe_yield(&mut self.rng, cx));
953 }
954
955 // Given an index in `0..2*cap`, returns the real index in `0..cap`.
956 let real_index = |i: usize| {
957 if i < cap {
958 i
959 } else {
960 i - cap
961 }
962 };
963
964 // Number of bytes written so far.
965 let mut count = 0;
966
967 loop {
968 // Calculate how many bytes to write in this iteration.
969 let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
970 .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
971 .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
972 .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.
973
974 // Create a slice of available space in the pipe buffer.
975 let pipe_slice_mut = unsafe {
976 let from = real_index(self.tail);
977 let to = from + n;
978
979 // Make sure all bytes in the slice are initialized.
980 if self.zeroed_until < to {
981 self.inner
982 .buffer
983 .add(self.zeroed_until)
984 .write_bytes(0u8, to - self.zeroed_until);
985 self.zeroed_until = to;
986 }
987
988 slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
989 };
990
991 // Copy bytes from `src` into the piper buffer.
992 let n = src.read(pipe_slice_mut)?;
993 count += n;
994
995 // If the pipe is full or closed, or `src` is empty, return.
996 if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
997 return Poll::Ready(Ok(count));
998 }
999
1000 // Move the tail forward.
1001 if self.tail + n < 2 * cap {
1002 self.tail += n;
1003 } else {
1004 self.tail = 0;
1005 }
1006
1007 // Store the current tail index.
1008 self.inner.tail.store(self.tail, Ordering::Release);
1009
1010 // Wake the reader because the pipe is not empty.
1011 self.inner.reader.wake();
1012 }
1013 }
1014}
1015
1016#[cfg(feature = "std")]
1017impl AsyncWrite for Writer {
1018 fn poll_write(
1019 mut self: Pin<&mut Self>,
1020 cx: &mut Context<'_>,
1021 buf: &[u8],
1022 ) -> Poll<io::Result<usize>> {
1023 self.poll_fill_bytes(cx, buf).map(Ok)
1024 }
1025
1026 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1027 // Nothing to flush.
1028 Poll::Ready(Ok(()))
1029 }
1030
1031 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1032 // Set the closed flag.
1033 self.inner.closed.store(true, Ordering::Release);
1034
1035 // Wake up any tasks that may be waiting on the pipe.
1036 self.inner.reader.wake();
1037 self.inner.writer.wake();
1038
1039 // The pipe is now closed.
1040 Poll::Ready(Ok(()))
1041 }
1042}
1043
1044/// A trait for reading bytes into a pipe.
1045trait ReadLike {
1046 /// The error type.
1047 type Error;
1048
1049 /// Reads bytes into the given buffer.
1050 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
1051}
1052
1053#[cfg(feature = "std")]
1054impl<R: Read> ReadLike for R {
1055 type Error = io::Error;
1056
1057 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1058 Read::read(self, buf)
1059 }
1060}
1061
1062/// Implements `no_std` reading around a byte slice.
1063struct ReadBytes<'a>(&'a [u8]);
1064
1065impl ReadLike for ReadBytes<'_> {
1066 type Error = Infallible;
1067
1068 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1069 let n: usize = self.0.len().min(buf.len());
1070 buf[..n].copy_from_slice(&self.0[..n]);
1071 self.0 = &self.0[n..];
1072 Ok(n)
1073 }
1074}
1075
1076/// A trait for writing bytes from a pipe.
1077trait WriteLike {
1078 /// The error type.
1079 type Error;
1080
1081 /// Writes bytes from the given buffer.
1082 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
1083}
1084
1085#[cfg(feature = "std")]
1086impl<W: Write> WriteLike for W {
1087 type Error = io::Error;
1088
1089 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1090 Write::write(self, buf)
1091 }
1092}
1093
1094/// Implements `no_std` writing around a byte slice.
1095struct WriteBytes<'a>(&'a mut [u8]);
1096
1097impl WriteLike for WriteBytes<'_> {
1098 type Error = Infallible;
1099
1100 fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1101 let n: usize = self.0.len().min(buf.len());
1102 self.0[..n].copy_from_slice(&buf[..n]);
1103
1104 // mem::take() is not available on 1.36
1105 #[allow(clippy::mem_replace_with_default)]
1106 {
1107 let slice: &mut [u8] = mem::replace(&mut self.0, &mut []);
1108 self.0 = &mut slice[n..];
1109 }
1110
1111 Ok(n)
1112 }
1113}
1114
1115/// Yield with some small probability.
1116fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
1117 if rng.usize(..100) == 0 {
1118 cx.waker().wake_by_ref();
1119 Poll::Pending
1120 } else {
1121 Poll::Ready(())
1122 }
1123}
1124
1125/// Get a random number generator.
1126#[cfg(feature = "std")]
1127#[inline]
1128fn rng() -> fastrand::Rng {
1129 fastrand::Rng::new()
1130}
1131
1132/// Get a random number generator.
1133///
1134/// This uses a fixed seed due to the lack of a good RNG in `no_std` environments.
1135#[cfg(not(feature = "std"))]
1136#[inline]
1137fn rng() -> fastrand::Rng {
1138 // Chosen by fair roll of the dice.
1139 fastrand::Rng::with_seed(0x7e9b496634c97ec6)
1140}
1141
1142/// ```
1143/// use piper::{Reader, Writer};
1144/// fn _send_sync<T: Send + Sync>() {}
1145/// _send_sync::<Reader>();
1146/// _send_sync::<Writer>();
1147/// ```
1148fn _assert_send_sync() {}
1149
1150mod sync {
1151 #[cfg(not(feature = "portable-atomic"))]
1152 pub use core::sync::atomic;
1153
1154 #[cfg(not(feature = "portable-atomic"))]
1155 pub use alloc::sync::Arc;
1156
1157 #[cfg(feature = "portable-atomic")]
1158 pub use portable_atomic_crate as atomic;
1159
1160 #[cfg(feature = "portable-atomic")]
1161 pub use portable_atomic_util::Arc;
1162}
1163