| 1 | //! A bounded single-producer single-consumer pipe. |
| 2 | //! |
| 3 | //! This crate provides a ring buffer that can be asynchronously read from and written to. It is |
| 4 | //! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles. |
| 5 | //! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively. |
| 6 | //! |
| 7 | //! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut` |
| 8 | //! access to read or write to them. If multiple-producer/multiple-consumer handles are needed, |
| 9 | //! consider wrapping them in an `Arc<Mutex<...>>` or similar. |
| 10 | //! |
| 11 | //! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts |
| 12 | //! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes. |
| 13 | //! |
| 14 | //! When the receiver is dropped, the pipe is closed and no more bytes and be written into it. |
| 15 | //! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes. |
| 16 | //! |
| 17 | //! # Version 0.2.0 Notes |
| 18 | //! |
| 19 | //! Previously, this crate contained other synchronization primitives, such as bounded channels, locks, |
| 20 | //! and event listeners. These have been split out into their own crates: |
| 21 | //! |
| 22 | //! - [`async-channel`](https://docs.rs/async-channel) |
| 23 | //! - [`async-dup`](https://docs.rs/async-dup) |
| 24 | //! - [`async-lock`](https://docs.rs/async-lock) |
| 25 | //! - [`async-mutex`](https://docs.rs/async-mutex) |
| 26 | //! - [`event-listener`](https://docs.rs/event-listener) |
| 27 | //! |
| 28 | //! # Examples |
| 29 | //! |
| 30 | //! ## Asynchronous Tasks |
| 31 | //! |
| 32 | //! Communicate between asynchronous tasks, potentially on other threads. |
| 33 | //! |
| 34 | //! ``` |
| 35 | //! use async_channel::unbounded; |
| 36 | //! use async_executor::Executor; |
| 37 | //! use easy_parallel::Parallel; |
| 38 | //! use futures_lite::{future, prelude::*}; |
| 39 | //! use std::time::Duration; |
| 40 | //! |
| 41 | //! # if cfg!(miri) { return; } |
| 42 | //! |
| 43 | //! // Create a pair of handles. |
| 44 | //! let (mut reader, mut writer) = piper::pipe(1024); |
| 45 | //! |
| 46 | //! // Create the executor. |
| 47 | //! let ex = Executor::new(); |
| 48 | //! let (signal, shutdown) = unbounded::<()>(); |
| 49 | //! |
| 50 | //! // Spawn a detached task for random data to the pipe. |
| 51 | //! let writer = ex.spawn(async move { |
| 52 | //! for _ in 0..1_000 { |
| 53 | //! // Generate 8 random numnbers. |
| 54 | //! let random = fastrand::u64(..).to_le_bytes(); |
| 55 | //! |
| 56 | //! // Write them to the pipe. |
| 57 | //! writer.write_all(&random).await.unwrap(); |
| 58 | //! |
| 59 | //! // Wait a bit. |
| 60 | //! async_io::Timer::after(Duration::from_millis(5)).await; |
| 61 | //! } |
| 62 | //! |
| 63 | //! // Drop the writer to close the pipe. |
| 64 | //! drop(writer); |
| 65 | //! }); |
| 66 | //! |
| 67 | //! // Detach the task so that it runs in the background. |
| 68 | //! writer.detach(); |
| 69 | //! |
| 70 | //! // Spawn a task for reading from the pipe. |
| 71 | //! let reader = ex.spawn(async move { |
| 72 | //! let mut buf = vec![]; |
| 73 | //! |
| 74 | //! // Read all bytes from the pipe. |
| 75 | //! reader.read_to_end(&mut buf).await.unwrap(); |
| 76 | //! |
| 77 | //! println!("Random data: {:#?}" , buf); |
| 78 | //! }); |
| 79 | //! |
| 80 | //! Parallel::new() |
| 81 | //! // Run four executor threads. |
| 82 | //! .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
| 83 | //! // Run the main future on the current thread. |
| 84 | //! .finish(|| future::block_on(async { |
| 85 | //! // Wait for the reader to finish. |
| 86 | //! reader.await; |
| 87 | //! |
| 88 | //! // Signal the executor threads to shut down. |
| 89 | //! drop(signal); |
| 90 | //! })); |
| 91 | //! ``` |
| 92 | //! |
| 93 | //! ## Blocking I/O |
| 94 | //! |
| 95 | //! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example |
| 96 | //! spawns another thread for reading a file and writing it to a pipe. |
| 97 | //! |
| 98 | //! ```no_run |
| 99 | //! use futures_lite::{future, prelude::*}; |
| 100 | //! use std::fs::File; |
| 101 | //! use std::io::prelude::*; |
| 102 | //! use std::thread; |
| 103 | //! |
| 104 | //! // Create a pair of handles. |
| 105 | //! let (mut r, mut w) = piper::pipe(1024); |
| 106 | //! |
| 107 | //! // Spawn a thread for reading a file. |
| 108 | //! thread::spawn(move || { |
| 109 | //! let mut file = File::open("Cargo.toml" ).unwrap(); |
| 110 | //! |
| 111 | //! // Read the file into a buffer. |
| 112 | //! let mut buf = [0u8; 16384]; |
| 113 | //! future::block_on(async move { |
| 114 | //! loop { |
| 115 | //! // Read a chunk of bytes from the file. |
| 116 | //! // Blocking is okay here, since this is a separate thread. |
| 117 | //! let n = file.read(&mut buf).unwrap(); |
| 118 | //! if n == 0 { |
| 119 | //! break; |
| 120 | //! } |
| 121 | //! |
| 122 | //! // Write the chunk to the pipe. |
| 123 | //! w.write_all(&buf[..n]).await.unwrap(); |
| 124 | //! } |
| 125 | //! |
| 126 | //! // Close the pipe. |
| 127 | //! drop(w); |
| 128 | //! }); |
| 129 | //! }); |
| 130 | //! |
| 131 | //! # future::block_on(async move { |
| 132 | //! // Read bytes from the pipe. |
| 133 | //! let mut buf = vec![]; |
| 134 | //! r.read_to_end(&mut buf).await.unwrap(); |
| 135 | //! |
| 136 | //! println!("Read {} bytes" , buf.len()); |
| 137 | //! # }); |
| 138 | //! ``` |
| 139 | //! |
| 140 | //! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write` |
| 141 | //! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from |
| 142 | //! the file into the pipe. This approach should be preferred when possible, as it avoids an extra |
| 143 | //! copy. |
| 144 | //! |
| 145 | //! ```no_run |
| 146 | //! # use futures_lite::future; |
| 147 | //! # use std::fs::File; |
| 148 | //! # let mut file: File = unimplemented!(); |
| 149 | //! # let mut w: piper::Writer = unimplemented!(); |
| 150 | //! // In the `future::block_on` call above... |
| 151 | //! # future::block_on(async move { |
| 152 | //! loop { |
| 153 | //! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap(); |
| 154 | //! if n == 0 { |
| 155 | //! break; |
| 156 | //! } |
| 157 | //! } |
| 158 | //! # }); |
| 159 | //! ``` |
| 160 | //! |
| 161 | //! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for |
| 162 | //! thread management and pipes. |
| 163 | //! |
| 164 | //! [`poll_fill`]: struct.Writer.html#method.poll_fill |
| 165 | //! [`poll_drain`]: struct.Reader.html#method.poll_drain |
| 166 | //! [`blocking`]: https://docs.rs/blocking |
| 167 | |
| 168 | #![cfg_attr (not(feature = "std" ), no_std)] |
| 169 | #![forbid (missing_docs)] |
| 170 | #![doc ( |
| 171 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 172 | )] |
| 173 | #![doc ( |
| 174 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
| 175 | )] |
| 176 | |
| 177 | extern crate alloc; |
| 178 | |
| 179 | use core::convert::Infallible; |
| 180 | use core::mem; |
| 181 | use core::slice; |
| 182 | use core::task::{Context, Poll}; |
| 183 | |
| 184 | use alloc::vec::Vec; |
| 185 | |
| 186 | use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; |
| 187 | use sync::Arc; |
| 188 | |
| 189 | #[cfg (feature = "std" )] |
| 190 | use std::{ |
| 191 | io::{self, Read, Write}, |
| 192 | pin::Pin, |
| 193 | }; |
| 194 | |
| 195 | use atomic_waker::AtomicWaker; |
| 196 | |
| 197 | #[cfg (feature = "std" )] |
| 198 | use futures_io::{AsyncRead, AsyncWrite}; |
| 199 | |
| 200 | macro_rules! ready { |
| 201 | ($e:expr) => {{ |
| 202 | match $e { |
| 203 | Poll::Ready(t) => t, |
| 204 | Poll::Pending => return Poll::Pending, |
| 205 | } |
| 206 | }}; |
| 207 | } |
| 208 | |
| 209 | /// Creates a bounded single-producer single-consumer pipe. |
| 210 | /// |
| 211 | /// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to. |
| 212 | /// |
| 213 | /// See the [crate-level documentation](index.html) for more details. |
| 214 | /// |
| 215 | /// # Panics |
| 216 | /// |
| 217 | /// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`. |
| 218 | #[allow (clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280 |
| 219 | pub fn pipe(cap: usize) -> (Reader, Writer) { |
| 220 | assert!(cap > 0, "capacity must be positive" ); |
| 221 | assert!(cap.checked_mul(2).is_some(), "capacity is too large" ); |
| 222 | |
| 223 | // Allocate the ring buffer. |
| 224 | let mut v = Vec::with_capacity(cap); |
| 225 | let buffer = v.as_mut_ptr(); |
| 226 | mem::forget(v); |
| 227 | |
| 228 | let inner = Arc::new(Pipe { |
| 229 | head: AtomicUsize::new(0), |
| 230 | tail: AtomicUsize::new(0), |
| 231 | reader: AtomicWaker::new(), |
| 232 | writer: AtomicWaker::new(), |
| 233 | closed: AtomicBool::new(false), |
| 234 | buffer, |
| 235 | cap, |
| 236 | }); |
| 237 | |
| 238 | // Use a random number generator to randomize fair yielding behavior. |
| 239 | let mut rng = rng(); |
| 240 | |
| 241 | let r = Reader { |
| 242 | inner: inner.clone(), |
| 243 | head: 0, |
| 244 | tail: 0, |
| 245 | rng: rng.fork(), |
| 246 | }; |
| 247 | |
| 248 | let w = Writer { |
| 249 | inner, |
| 250 | head: 0, |
| 251 | tail: 0, |
| 252 | zeroed_until: 0, |
| 253 | rng, |
| 254 | }; |
| 255 | |
| 256 | (r, w) |
| 257 | } |
| 258 | |
| 259 | /// The reading side of a pipe. |
| 260 | /// |
| 261 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
| 262 | pub struct Reader { |
| 263 | /// The inner ring buffer. |
| 264 | inner: Arc<Pipe>, |
| 265 | |
| 266 | /// The head index, moved by the reader, in the range `0..2*cap`. |
| 267 | /// |
| 268 | /// This index always matches `inner.head`. |
| 269 | head: usize, |
| 270 | |
| 271 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
| 272 | /// |
| 273 | /// This index is a snapshot of `index.tail` that might become stale at any point. |
| 274 | tail: usize, |
| 275 | |
| 276 | /// Random number generator. |
| 277 | rng: fastrand::Rng, |
| 278 | } |
| 279 | |
| 280 | /// The writing side of a pipe. |
| 281 | /// |
| 282 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
| 283 | pub struct Writer { |
| 284 | /// The inner ring buffer. |
| 285 | inner: Arc<Pipe>, |
| 286 | |
| 287 | /// The head index, moved by the reader, in the range `0..2*cap`. |
| 288 | /// |
| 289 | /// This index is a snapshot of `index.head` that might become stale at any point. |
| 290 | head: usize, |
| 291 | |
| 292 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
| 293 | /// |
| 294 | /// This index always matches `inner.tail`. |
| 295 | tail: usize, |
| 296 | |
| 297 | /// How many bytes at the beginning of the buffer have been zeroed. |
| 298 | /// |
| 299 | /// The pipe allocates an uninitialized buffer, and we must be careful about passing |
| 300 | /// uninitialized data to user code. Zeroing the buffer right after allocation would be too |
| 301 | /// expensive, so we zero it in smaller chunks as the writer makes progress. |
| 302 | zeroed_until: usize, |
| 303 | |
| 304 | /// Random number generator. |
| 305 | rng: fastrand::Rng, |
| 306 | } |
| 307 | |
| 308 | /// The inner ring buffer. |
| 309 | /// |
| 310 | /// Head and tail indices are in the range `0..2*cap`, even though they really map onto the |
| 311 | /// `0..cap` range. The distance between head and tail indices is never more than `cap`. |
| 312 | /// |
| 313 | /// The reason why indices are not in the range `0..cap` is because we need to distinguish between |
| 314 | /// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail` |
| 315 | /// could mean the pipe is either empty or full, but we don't know which! |
| 316 | struct Pipe { |
| 317 | /// The head index, moved by the reader, in the range `0..2*cap`. |
| 318 | head: AtomicUsize, |
| 319 | |
| 320 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
| 321 | tail: AtomicUsize, |
| 322 | |
| 323 | /// A waker representing the blocked reader. |
| 324 | reader: AtomicWaker, |
| 325 | |
| 326 | /// A waker representing the blocked writer. |
| 327 | writer: AtomicWaker, |
| 328 | |
| 329 | /// Set to `true` if the reader or writer was dropped. |
| 330 | closed: AtomicBool, |
| 331 | |
| 332 | /// The byte buffer. |
| 333 | buffer: *mut u8, |
| 334 | |
| 335 | /// The buffer capacity. |
| 336 | cap: usize, |
| 337 | } |
| 338 | |
| 339 | unsafe impl Sync for Pipe {} |
| 340 | unsafe impl Send for Pipe {} |
| 341 | |
| 342 | impl Drop for Pipe { |
| 343 | fn drop(&mut self) { |
| 344 | // Deallocate the byte buffer. |
| 345 | unsafe { |
| 346 | Vec::from_raw_parts(self.buffer, length:0, self.cap); |
| 347 | } |
| 348 | } |
| 349 | } |
| 350 | |
| 351 | impl Drop for Reader { |
| 352 | fn drop(&mut self) { |
| 353 | // Dropping closes the pipe and then wakes the writer. |
| 354 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
| 355 | self.inner.writer.wake(); |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | impl Drop for Writer { |
| 360 | fn drop(&mut self) { |
| 361 | // Dropping closes the pipe and then wakes the reader. |
| 362 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
| 363 | self.inner.reader.wake(); |
| 364 | } |
| 365 | } |
| 366 | |
| 367 | impl Pipe { |
| 368 | /// Get the length of the data in the pipe. |
| 369 | fn len(&self) -> usize { |
| 370 | let head: usize = self.head.load(order:Ordering::Acquire); |
| 371 | let tail: usize = self.tail.load(order:Ordering::Acquire); |
| 372 | |
| 373 | if head <= tail { |
| 374 | tail - head |
| 375 | } else { |
| 376 | (2 * self.cap) - (head - tail) |
| 377 | } |
| 378 | } |
| 379 | } |
| 380 | |
| 381 | impl Reader { |
| 382 | /// Gets the total length of the data in the pipe. |
| 383 | /// |
| 384 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
| 385 | /// read yet. |
| 386 | /// |
| 387 | /// # Examples |
| 388 | /// |
| 389 | /// ``` |
| 390 | /// let (mut reader, mut writer) = piper::pipe(10); |
| 391 | /// let _ = writer.try_fill(&[0u8; 5]); |
| 392 | /// assert_eq!(reader.len(), 5); |
| 393 | /// ``` |
| 394 | pub fn len(&self) -> usize { |
| 395 | self.inner.len() |
| 396 | } |
| 397 | |
| 398 | /// Tell whether or not the pipe is empty. |
| 399 | /// |
| 400 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
| 401 | /// |
| 402 | /// # Examples |
| 403 | /// |
| 404 | /// ``` |
| 405 | /// let (mut reader, mut writer) = piper::pipe(10); |
| 406 | /// assert!(reader.is_empty()); |
| 407 | /// let _ = writer.try_fill(&[0u8; 5]); |
| 408 | /// assert!(!reader.is_empty()); |
| 409 | /// ``` |
| 410 | pub fn is_empty(&self) -> bool { |
| 411 | self.inner.len() == 0 |
| 412 | } |
| 413 | |
| 414 | /// Gets the total capacity of the pipe. |
| 415 | /// |
| 416 | /// This method returns the number of bytes that the pipe can hold at a time. |
| 417 | /// |
| 418 | /// # Examples |
| 419 | /// |
| 420 | /// ``` |
| 421 | /// # futures_lite::future::block_on(async { |
| 422 | /// let (reader, _) = piper::pipe(10); |
| 423 | /// assert_eq!(reader.capacity(), 10); |
| 424 | /// # }); |
| 425 | /// ``` |
| 426 | pub fn capacity(&self) -> usize { |
| 427 | self.inner.cap |
| 428 | } |
| 429 | |
| 430 | /// Tell whether or not the pipe is full. |
| 431 | /// |
| 432 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
| 433 | /// writes will block until some data is read from the pipe. |
| 434 | /// |
| 435 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
| 436 | /// |
| 437 | /// # Examples |
| 438 | /// |
| 439 | /// ``` |
| 440 | /// let (mut reader, mut writer) = piper::pipe(10); |
| 441 | /// assert!(!reader.is_full()); |
| 442 | /// let _ = writer.try_fill(&[0u8; 10]); |
| 443 | /// assert!(reader.is_full()); |
| 444 | /// let _ = reader.try_drain(&mut [0u8; 5]); |
| 445 | /// assert!(!reader.is_full()); |
| 446 | /// ``` |
| 447 | pub fn is_full(&self) -> bool { |
| 448 | self.inner.len() == self.inner.cap |
| 449 | } |
| 450 | |
| 451 | /// Tell whether or not the pipe is closed. |
| 452 | /// |
| 453 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
| 454 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
| 455 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
| 456 | /// |
| 457 | /// # Examples |
| 458 | /// |
| 459 | /// ``` |
| 460 | /// # futures_lite::future::block_on(async { |
| 461 | /// let (mut reader, mut writer) = piper::pipe(10); |
| 462 | /// assert!(!reader.is_closed()); |
| 463 | /// drop(writer); |
| 464 | /// assert!(reader.is_closed()); |
| 465 | /// # }); |
| 466 | /// ``` |
| 467 | pub fn is_closed(&self) -> bool { |
| 468 | self.inner.closed.load(Ordering::SeqCst) |
| 469 | } |
| 470 | |
| 471 | /// Reads bytes from this reader and writes into blocking `dest`. |
| 472 | /// |
| 473 | /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy, |
| 474 | /// but it may block the thread if `dest` blocks. |
| 475 | /// |
| 476 | /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method |
| 477 | /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`. |
| 478 | /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written. |
| 479 | /// |
| 480 | /// This method is only available when the `std` feature is enabled. For `no_std` environments, |
| 481 | /// consider using [`poll_drain_bytes`] instead. |
| 482 | /// |
| 483 | /// [`poll_drain_bytes`]: #method.poll_drain_bytes |
| 484 | /// |
| 485 | /// # Examples |
| 486 | /// |
| 487 | /// ``` |
| 488 | /// use futures_lite::{future, prelude::*}; |
| 489 | /// # future::block_on(async { |
| 490 | /// |
| 491 | /// let (mut r, mut w) = piper::pipe(1024); |
| 492 | /// |
| 493 | /// // Write some data to the pipe. |
| 494 | /// w.write_all(b"hello world" ).await.unwrap(); |
| 495 | /// |
| 496 | /// // Try reading from the pipe. |
| 497 | /// let mut buf = [0; 1024]; |
| 498 | /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap(); |
| 499 | /// |
| 500 | /// // The data was written to the buffer. |
| 501 | /// assert_eq!(&buf[..n], b"hello world" ); |
| 502 | /// # }); |
| 503 | /// ``` |
| 504 | #[cfg (feature = "std" )] |
| 505 | pub fn poll_drain( |
| 506 | &mut self, |
| 507 | cx: &mut Context<'_>, |
| 508 | dest: impl Write, |
| 509 | ) -> Poll<io::Result<usize>> { |
| 510 | self.drain_inner(Some(cx), dest) |
| 511 | } |
| 512 | |
| 513 | /// Reads bytes from this reader. |
| 514 | /// |
| 515 | /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into. |
| 516 | /// Because of this, it is infallible and can be used in `no_std` environments. |
| 517 | /// |
| 518 | /// The same conditions that apply to [`poll_drain`] apply to this method. |
| 519 | /// |
| 520 | /// [`poll_drain`]: #method.poll_drain |
| 521 | /// |
| 522 | /// # Examples |
| 523 | /// |
| 524 | /// ``` |
| 525 | /// use futures_lite::{future, prelude::*}; |
| 526 | /// # future::block_on(async { |
| 527 | /// let (mut r, mut w) = piper::pipe(1024); |
| 528 | /// |
| 529 | /// // Write some data to the pipe. |
| 530 | /// w.write_all(b"hello world" ).await.unwrap(); |
| 531 | /// |
| 532 | /// // Try reading from the pipe. |
| 533 | /// let mut buf = [0; 1024]; |
| 534 | /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await; |
| 535 | /// |
| 536 | /// // The data was written to the buffer. |
| 537 | /// assert_eq!(&buf[..n], b"hello world" ); |
| 538 | /// # }); |
| 539 | /// ``` |
| 540 | pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> { |
| 541 | match self.drain_inner(Some(cx), WriteBytes(dest)) { |
| 542 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
| 543 | Poll::Ready(Err(e)) => match e {}, |
| 544 | Poll::Pending => Poll::Pending, |
| 545 | } |
| 546 | } |
| 547 | |
| 548 | /// Tries to read bytes from this reader. |
| 549 | /// |
| 550 | /// Returns the total number of bytes that were read from this reader. |
| 551 | /// |
| 552 | /// # Examples |
| 553 | /// |
| 554 | /// ``` |
| 555 | /// let (mut r, mut w) = piper::pipe(1024); |
| 556 | /// |
| 557 | /// // `try_drain()` returns 0 off the bat. |
| 558 | /// let mut buf = [0; 10]; |
| 559 | /// assert_eq!(r.try_drain(&mut buf), 0); |
| 560 | /// |
| 561 | /// // After a write it returns the data. |
| 562 | /// w.try_fill(&[0, 1, 2, 3, 4]); |
| 563 | /// assert_eq!(r.try_drain(&mut buf), 5); |
| 564 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
| 565 | /// ``` |
| 566 | pub fn try_drain(&mut self, dest: &mut [u8]) -> usize { |
| 567 | match self.drain_inner(None, WriteBytes(dest)) { |
| 568 | Poll::Ready(Ok(n)) => n, |
| 569 | Poll::Ready(Err(e)) => match e {}, |
| 570 | Poll::Pending => 0, |
| 571 | } |
| 572 | } |
| 573 | |
| 574 | /// Reads bytes from this reader and writes into blocking `dest`. |
| 575 | #[inline ] |
| 576 | fn drain_inner<W: WriteLike>( |
| 577 | &mut self, |
| 578 | mut cx: Option<&mut Context<'_>>, |
| 579 | mut dest: W, |
| 580 | ) -> Poll<Result<usize, W::Error>> { |
| 581 | let cap = self.inner.cap; |
| 582 | |
| 583 | // Calculates the distance between two indices. |
| 584 | let distance = |a: usize, b: usize| { |
| 585 | if a <= b { |
| 586 | b - a |
| 587 | } else { |
| 588 | 2 * cap - (a - b) |
| 589 | } |
| 590 | }; |
| 591 | |
| 592 | // If the pipe appears to be empty... |
| 593 | if distance(self.head, self.tail) == 0 { |
| 594 | // Reload the tail in case it's become stale. |
| 595 | self.tail = self.inner.tail.load(Ordering::Acquire); |
| 596 | |
| 597 | // If the pipe is now really empty... |
| 598 | if distance(self.head, self.tail) == 0 { |
| 599 | // Register the waker. |
| 600 | if let Some(cx) = cx.as_mut() { |
| 601 | self.inner.reader.register(cx.waker()); |
| 602 | } |
| 603 | atomic::fence(Ordering::SeqCst); |
| 604 | |
| 605 | // Reload the tail after registering the waker. |
| 606 | self.tail = self.inner.tail.load(Ordering::Acquire); |
| 607 | |
| 608 | // If the pipe is still empty... |
| 609 | if distance(self.head, self.tail) == 0 { |
| 610 | // Check whether the pipe is closed or just empty. |
| 611 | if self.inner.closed.load(Ordering::Relaxed) { |
| 612 | return Poll::Ready(Ok(0)); |
| 613 | } else { |
| 614 | return Poll::Pending; |
| 615 | } |
| 616 | } |
| 617 | } |
| 618 | } |
| 619 | |
| 620 | // The pipe is not empty so remove the waker. |
| 621 | self.inner.reader.take(); |
| 622 | |
| 623 | // Yield with some small probability - this improves fairness. |
| 624 | if let Some(cx) = cx { |
| 625 | ready!(maybe_yield(&mut self.rng, cx)); |
| 626 | } |
| 627 | |
| 628 | // Given an index in `0..2*cap`, returns the real index in `0..cap`. |
| 629 | let real_index = |i: usize| { |
| 630 | if i < cap { |
| 631 | i |
| 632 | } else { |
| 633 | i - cap |
| 634 | } |
| 635 | }; |
| 636 | |
| 637 | // Number of bytes read so far. |
| 638 | let mut count = 0; |
| 639 | |
| 640 | loop { |
| 641 | // Calculate how many bytes to read in this iteration. |
| 642 | let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon! |
| 643 | .min(distance(self.head, self.tail)) // No more than bytes in the pipe. |
| 644 | .min(cap - real_index(self.head)); // Don't go past the buffer boundary. |
| 645 | |
| 646 | // Create a slice of data in the pipe buffer. |
| 647 | let pipe_slice = |
| 648 | unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) }; |
| 649 | |
| 650 | // Copy bytes from the pipe buffer into `dest`. |
| 651 | let n = dest.write(pipe_slice)?; |
| 652 | count += n; |
| 653 | |
| 654 | // If pipe is empty or `dest` is full, return. |
| 655 | if n == 0 { |
| 656 | return Poll::Ready(Ok(count)); |
| 657 | } |
| 658 | |
| 659 | // Move the head forward. |
| 660 | if self.head + n < 2 * cap { |
| 661 | self.head += n; |
| 662 | } else { |
| 663 | self.head = 0; |
| 664 | } |
| 665 | |
| 666 | // Store the current head index. |
| 667 | self.inner.head.store(self.head, Ordering::Release); |
| 668 | |
| 669 | // Wake the writer because the pipe is not full. |
| 670 | self.inner.writer.wake(); |
| 671 | } |
| 672 | } |
| 673 | } |
| 674 | |
| 675 | #[cfg (feature = "std" )] |
| 676 | impl AsyncRead for Reader { |
| 677 | fn poll_read( |
| 678 | mut self: Pin<&mut Self>, |
| 679 | cx: &mut Context<'_>, |
| 680 | buf: &mut [u8], |
| 681 | ) -> Poll<io::Result<usize>> { |
| 682 | self.poll_drain_bytes(cx, dest:buf).map(Ok) |
| 683 | } |
| 684 | } |
| 685 | |
| 686 | impl Writer { |
| 687 | /// Gets the total length of the data in the pipe. |
| 688 | /// |
| 689 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
| 690 | /// read yet. |
| 691 | /// |
| 692 | /// # Examples |
| 693 | /// |
| 694 | /// ``` |
| 695 | /// let (_reader, mut writer) = piper::pipe(10); |
| 696 | /// let _ = writer.try_fill(&[0u8; 5]); |
| 697 | /// assert_eq!(writer.len(), 5); |
| 698 | /// ``` |
| 699 | pub fn len(&self) -> usize { |
| 700 | self.inner.len() |
| 701 | } |
| 702 | |
| 703 | /// Tell whether or not the pipe is empty. |
| 704 | /// |
| 705 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
| 706 | /// |
| 707 | /// # Examples |
| 708 | /// |
| 709 | /// ``` |
| 710 | /// let (_reader, mut writer) = piper::pipe(10); |
| 711 | /// assert!(writer.is_empty()); |
| 712 | /// let _ = writer.try_fill(&[0u8; 5]); |
| 713 | /// assert!(!writer.is_empty()); |
| 714 | /// ``` |
| 715 | pub fn is_empty(&self) -> bool { |
| 716 | self.inner.len() == 0 |
| 717 | } |
| 718 | |
| 719 | /// Gets the total capacity of the pipe. |
| 720 | /// |
| 721 | /// This method returns the number of bytes that the pipe can hold at a time. |
| 722 | /// |
| 723 | /// # Examples |
| 724 | /// |
| 725 | /// ``` |
| 726 | /// # futures_lite::future::block_on(async { |
| 727 | /// let (_, writer) = piper::pipe(10); |
| 728 | /// assert_eq!(writer.capacity(), 10); |
| 729 | /// # }); |
| 730 | /// ``` |
| 731 | pub fn capacity(&self) -> usize { |
| 732 | self.inner.cap |
| 733 | } |
| 734 | |
| 735 | /// Tell whether or not the pipe is full. |
| 736 | /// |
| 737 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
| 738 | /// writes will block until some data is read from the pipe. |
| 739 | /// |
| 740 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
| 741 | /// |
| 742 | /// # Examples |
| 743 | /// |
| 744 | /// ``` |
| 745 | /// let (mut reader, mut writer) = piper::pipe(10); |
| 746 | /// assert!(!writer.is_full()); |
| 747 | /// let _ = writer.try_fill(&[0u8; 10]); |
| 748 | /// assert!(writer.is_full()); |
| 749 | /// let _ = reader.try_drain(&mut [0u8; 5]); |
| 750 | /// assert!(!writer.is_full()); |
| 751 | /// ``` |
| 752 | pub fn is_full(&self) -> bool { |
| 753 | self.inner.len() == self.inner.cap |
| 754 | } |
| 755 | |
| 756 | /// Tell whether or not the pipe is closed. |
| 757 | /// |
| 758 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
| 759 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
| 760 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
| 761 | /// |
| 762 | /// # Examples |
| 763 | /// |
| 764 | /// ``` |
| 765 | /// # futures_lite::future::block_on(async { |
| 766 | /// let (reader, writer) = piper::pipe(10); |
| 767 | /// assert!(!writer.is_closed()); |
| 768 | /// drop(reader); |
| 769 | /// assert!(writer.is_closed()); |
| 770 | /// # }); |
| 771 | /// ``` |
| 772 | pub fn is_closed(&self) -> bool { |
| 773 | self.inner.closed.load(Ordering::SeqCst) |
| 774 | } |
| 775 | |
| 776 | /// Reads bytes from blocking `src` and writes into this writer. |
| 777 | /// |
| 778 | /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy, |
| 779 | /// but it may block the thread if `src` blocks. |
| 780 | /// |
| 781 | /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method |
| 782 | /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`. |
| 783 | /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read. |
| 784 | /// |
| 785 | /// This method is only available when the `std` feature is enabled. For `no_std` environments, |
| 786 | /// consider using [`poll_fill_bytes`] instead. |
| 787 | /// |
| 788 | /// [`poll_fill_bytes`]: #method.poll_fill_bytes |
| 789 | /// |
| 790 | /// # Examples |
| 791 | /// |
| 792 | /// ``` |
| 793 | /// use futures_lite::{future, prelude::*}; |
| 794 | /// # future::block_on(async { |
| 795 | /// |
| 796 | /// // Create a pipe. |
| 797 | /// let (mut reader, mut writer) = piper::pipe(1024); |
| 798 | /// |
| 799 | /// // Fill the pipe with some bytes. |
| 800 | /// let data = b"hello world" ; |
| 801 | /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap(); |
| 802 | /// assert_eq!(n, data.len()); |
| 803 | /// |
| 804 | /// // Read the bytes back. |
| 805 | /// let mut buf = [0; 1024]; |
| 806 | /// reader.read_exact(&mut buf[..data.len()]).await.unwrap(); |
| 807 | /// assert_eq!(&buf[..data.len()], data); |
| 808 | /// # }); |
| 809 | /// ``` |
| 810 | #[cfg (feature = "std" )] |
| 811 | pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> { |
| 812 | self.fill_inner(Some(cx), src) |
| 813 | } |
| 814 | |
| 815 | /// Writes bytes into this writer. |
| 816 | /// |
| 817 | /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from. |
| 818 | /// Because of this, it is infallible and can be used in `no_std` environments. |
| 819 | /// |
| 820 | /// The same conditions that apply to [`poll_fill`] apply to this method. |
| 821 | /// |
| 822 | /// [`poll_fill`]: #method.poll_fill |
| 823 | /// |
| 824 | /// # Examples |
| 825 | /// |
| 826 | /// ``` |
| 827 | /// use futures_lite::{future, prelude::*}; |
| 828 | /// # future::block_on(async { |
| 829 | /// |
| 830 | /// // Create a pipe. |
| 831 | /// let (mut reader, mut writer) = piper::pipe(1024); |
| 832 | /// |
| 833 | /// // Fill the pipe with some bytes. |
| 834 | /// let data = b"hello world" ; |
| 835 | /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await; |
| 836 | /// assert_eq!(n, data.len()); |
| 837 | /// |
| 838 | /// // Read the bytes back. |
| 839 | /// let mut buf = [0; 1024]; |
| 840 | /// reader.read_exact(&mut buf[..data.len()]).await.unwrap(); |
| 841 | /// assert_eq!(&buf[..data.len()], data); |
| 842 | /// # }); |
| 843 | /// ``` |
| 844 | pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> { |
| 845 | match self.fill_inner(Some(cx), ReadBytes(bytes)) { |
| 846 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
| 847 | Poll::Ready(Err(e)) => match e {}, |
| 848 | Poll::Pending => Poll::Pending, |
| 849 | } |
| 850 | } |
| 851 | |
| 852 | /// Tries to write bytes to this writer. |
| 853 | /// |
| 854 | /// Returns the total number of bytes that were read from this reader. |
| 855 | /// |
| 856 | /// # Examples |
| 857 | /// |
| 858 | /// ``` |
| 859 | /// let (mut r, mut w) = piper::pipe(1024); |
| 860 | /// |
| 861 | /// let mut buf = [0; 10]; |
| 862 | /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5); |
| 863 | /// assert_eq!(r.try_drain(&mut buf), 5); |
| 864 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
| 865 | /// ``` |
| 866 | pub fn try_fill(&mut self, dest: &[u8]) -> usize { |
| 867 | match self.fill_inner(None, ReadBytes(dest)) { |
| 868 | Poll::Ready(Ok(n)) => n, |
| 869 | Poll::Ready(Err(e)) => match e {}, |
| 870 | Poll::Pending => 0, |
| 871 | } |
| 872 | } |
| 873 | |
| 874 | /// Reads bytes from blocking `src` and writes into this writer. |
| 875 | #[inline ] |
| 876 | fn fill_inner<R: ReadLike>( |
| 877 | &mut self, |
| 878 | mut cx: Option<&mut Context<'_>>, |
| 879 | mut src: R, |
| 880 | ) -> Poll<Result<usize, R::Error>> { |
| 881 | // Just a quick check if the pipe is closed, which is why a relaxed load is okay. |
| 882 | if self.inner.closed.load(Ordering::Relaxed) { |
| 883 | return Poll::Ready(Ok(0)); |
| 884 | } |
| 885 | |
| 886 | // Calculates the distance between two indices. |
| 887 | let cap = self.inner.cap; |
| 888 | let distance = |a: usize, b: usize| { |
| 889 | if a <= b { |
| 890 | b - a |
| 891 | } else { |
| 892 | 2 * cap - (a - b) |
| 893 | } |
| 894 | }; |
| 895 | |
| 896 | // If the pipe appears to be full... |
| 897 | if distance(self.head, self.tail) == cap { |
| 898 | // Reload the head in case it's become stale. |
| 899 | self.head = self.inner.head.load(Ordering::Acquire); |
| 900 | |
| 901 | // If the pipe is now really empty... |
| 902 | if distance(self.head, self.tail) == cap { |
| 903 | // Register the waker. |
| 904 | if let Some(cx) = cx.as_mut() { |
| 905 | self.inner.writer.register(cx.waker()); |
| 906 | } |
| 907 | atomic::fence(Ordering::SeqCst); |
| 908 | |
| 909 | // Reload the head after registering the waker. |
| 910 | self.head = self.inner.head.load(Ordering::Acquire); |
| 911 | |
| 912 | // If the pipe is still full... |
| 913 | if distance(self.head, self.tail) == cap { |
| 914 | // Check whether the pipe is closed or just full. |
| 915 | if self.inner.closed.load(Ordering::Relaxed) { |
| 916 | return Poll::Ready(Ok(0)); |
| 917 | } else { |
| 918 | return Poll::Pending; |
| 919 | } |
| 920 | } |
| 921 | } |
| 922 | } |
| 923 | |
| 924 | // The pipe is not full so remove the waker. |
| 925 | self.inner.writer.take(); |
| 926 | |
| 927 | // Yield with some small probability - this improves fairness. |
| 928 | if let Some(cx) = cx { |
| 929 | ready!(maybe_yield(&mut self.rng, cx)); |
| 930 | } |
| 931 | |
| 932 | // Given an index in `0..2*cap`, returns the real index in `0..cap`. |
| 933 | let real_index = |i: usize| { |
| 934 | if i < cap { |
| 935 | i |
| 936 | } else { |
| 937 | i - cap |
| 938 | } |
| 939 | }; |
| 940 | |
| 941 | // Number of bytes written so far. |
| 942 | let mut count = 0; |
| 943 | |
| 944 | loop { |
| 945 | // Calculate how many bytes to write in this iteration. |
| 946 | let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon! |
| 947 | .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting. |
| 948 | .min(cap - distance(self.head, self.tail)) // No more than space in the pipe. |
| 949 | .min(cap - real_index(self.tail)); // Don't go past the buffer boundary. |
| 950 | |
| 951 | // Create a slice of available space in the pipe buffer. |
| 952 | let pipe_slice_mut = unsafe { |
| 953 | let from = real_index(self.tail); |
| 954 | let to = from + n; |
| 955 | |
| 956 | // Make sure all bytes in the slice are initialized. |
| 957 | if self.zeroed_until < to { |
| 958 | self.inner |
| 959 | .buffer |
| 960 | .add(self.zeroed_until) |
| 961 | .write_bytes(0u8, to - self.zeroed_until); |
| 962 | self.zeroed_until = to; |
| 963 | } |
| 964 | |
| 965 | slice::from_raw_parts_mut(self.inner.buffer.add(from), n) |
| 966 | }; |
| 967 | |
| 968 | // Copy bytes from `src` into the piper buffer. |
| 969 | let n = src.read(pipe_slice_mut)?; |
| 970 | count += n; |
| 971 | |
| 972 | // If the pipe is full or closed, or `src` is empty, return. |
| 973 | if n == 0 || self.inner.closed.load(Ordering::Relaxed) { |
| 974 | return Poll::Ready(Ok(count)); |
| 975 | } |
| 976 | |
| 977 | // Move the tail forward. |
| 978 | if self.tail + n < 2 * cap { |
| 979 | self.tail += n; |
| 980 | } else { |
| 981 | self.tail = 0; |
| 982 | } |
| 983 | |
| 984 | // Store the current tail index. |
| 985 | self.inner.tail.store(self.tail, Ordering::Release); |
| 986 | |
| 987 | // Wake the reader because the pipe is not empty. |
| 988 | self.inner.reader.wake(); |
| 989 | } |
| 990 | } |
| 991 | } |
| 992 | |
| 993 | #[cfg (feature = "std" )] |
| 994 | impl AsyncWrite for Writer { |
| 995 | fn poll_write( |
| 996 | mut self: Pin<&mut Self>, |
| 997 | cx: &mut Context<'_>, |
| 998 | buf: &[u8], |
| 999 | ) -> Poll<io::Result<usize>> { |
| 1000 | self.poll_fill_bytes(cx, buf).map(Ok) |
| 1001 | } |
| 1002 | |
| 1003 | fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 1004 | // Nothing to flush. |
| 1005 | Poll::Ready(Ok(())) |
| 1006 | } |
| 1007 | |
| 1008 | fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| 1009 | // Set the closed flag. |
| 1010 | self.inner.closed.store(true, Ordering::Release); |
| 1011 | |
| 1012 | // Wake up any tasks that may be waiting on the pipe. |
| 1013 | self.inner.reader.wake(); |
| 1014 | self.inner.writer.wake(); |
| 1015 | |
| 1016 | // The pipe is now closed. |
| 1017 | Poll::Ready(Ok(())) |
| 1018 | } |
| 1019 | } |
| 1020 | |
| 1021 | /// A trait for reading bytes into a pipe. |
| 1022 | trait ReadLike { |
| 1023 | /// The error type. |
| 1024 | type Error; |
| 1025 | |
| 1026 | /// Reads bytes into the given buffer. |
| 1027 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>; |
| 1028 | } |
| 1029 | |
| 1030 | #[cfg (feature = "std" )] |
| 1031 | impl<R: Read> ReadLike for R { |
| 1032 | type Error = io::Error; |
| 1033 | |
| 1034 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 1035 | Read::read(self, buf) |
| 1036 | } |
| 1037 | } |
| 1038 | |
| 1039 | /// Implements `no_std` reading around a byte slice. |
| 1040 | struct ReadBytes<'a>(&'a [u8]); |
| 1041 | |
| 1042 | impl ReadLike for ReadBytes<'_> { |
| 1043 | type Error = Infallible; |
| 1044 | |
| 1045 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
| 1046 | let n: usize = self.0.len().min(buf.len()); |
| 1047 | buf[..n].copy_from_slice(&self.0[..n]); |
| 1048 | self.0 = &self.0[n..]; |
| 1049 | Ok(n) |
| 1050 | } |
| 1051 | } |
| 1052 | |
| 1053 | /// A trait for writing bytes from a pipe. |
| 1054 | trait WriteLike { |
| 1055 | /// The error type. |
| 1056 | type Error; |
| 1057 | |
| 1058 | /// Writes bytes from the given buffer. |
| 1059 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>; |
| 1060 | } |
| 1061 | |
| 1062 | #[cfg (feature = "std" )] |
| 1063 | impl<W: Write> WriteLike for W { |
| 1064 | type Error = io::Error; |
| 1065 | |
| 1066 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 1067 | Write::write(self, buf) |
| 1068 | } |
| 1069 | } |
| 1070 | |
| 1071 | /// Implements `no_std` writing around a byte slice. |
| 1072 | struct WriteBytes<'a>(&'a mut [u8]); |
| 1073 | |
| 1074 | impl WriteLike for WriteBytes<'_> { |
| 1075 | type Error = Infallible; |
| 1076 | |
| 1077 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
| 1078 | let n: usize = self.0.len().min(buf.len()); |
| 1079 | self.0[..n].copy_from_slice(&buf[..n]); |
| 1080 | |
| 1081 | // mem::take() is not available on 1.36 |
| 1082 | #[allow (clippy::mem_replace_with_default)] |
| 1083 | { |
| 1084 | let slice: &mut [u8] = mem::replace(&mut self.0, &mut []); |
| 1085 | self.0 = &mut slice[n..]; |
| 1086 | } |
| 1087 | |
| 1088 | Ok(n) |
| 1089 | } |
| 1090 | } |
| 1091 | |
| 1092 | /// Yield with some small probability. |
| 1093 | fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> { |
| 1094 | if rng.usize(..100) == 0 { |
| 1095 | cx.waker().wake_by_ref(); |
| 1096 | Poll::Pending |
| 1097 | } else { |
| 1098 | Poll::Ready(()) |
| 1099 | } |
| 1100 | } |
| 1101 | |
| 1102 | /// Get a random number generator. |
| 1103 | #[cfg (feature = "std" )] |
| 1104 | #[inline ] |
| 1105 | fn rng() -> fastrand::Rng { |
| 1106 | fastrand::Rng::new() |
| 1107 | } |
| 1108 | |
| 1109 | /// Get a random number generator. |
| 1110 | /// |
| 1111 | /// This uses a fixed seed due to the lack of a good RNG in `no_std` environments. |
| 1112 | #[cfg (not(feature = "std" ))] |
| 1113 | #[inline ] |
| 1114 | fn rng() -> fastrand::Rng { |
| 1115 | // Chosen by fair roll of the dice. |
| 1116 | fastrand::Rng::with_seed(0x7e9b496634c97ec6) |
| 1117 | } |
| 1118 | |
| 1119 | /// ``` |
| 1120 | /// use piper::{Reader, Writer}; |
| 1121 | /// fn _send_sync<T: Send + Sync>() {} |
| 1122 | /// _send_sync::<Reader>(); |
| 1123 | /// _send_sync::<Writer>(); |
| 1124 | /// ``` |
| 1125 | fn _assert_send_sync() {} |
| 1126 | |
| 1127 | mod sync { |
| 1128 | #[cfg (not(feature = "portable-atomic" ))] |
| 1129 | pub use core::sync::atomic; |
| 1130 | |
| 1131 | #[cfg (not(feature = "portable-atomic" ))] |
| 1132 | pub use alloc::sync::Arc; |
| 1133 | |
| 1134 | #[cfg (feature = "portable-atomic" )] |
| 1135 | pub use portable_atomic_crate as atomic; |
| 1136 | |
| 1137 | #[cfg (feature = "portable-atomic" )] |
| 1138 | pub use portable_atomic_util::Arc; |
| 1139 | } |
| 1140 | |