1 | //! A bounded single-producer single-consumer pipe. |
2 | //! |
3 | //! This crate provides a ring buffer that can be asynchronously read from and written to. It is |
4 | //! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles. |
5 | //! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively. |
6 | //! |
7 | //! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut` |
8 | //! access to read or write to them. If multiple-producer/multiple-consumer handles are needed, |
9 | //! consider wrapping them in an `Arc<Mutex<...>>` or similar. |
10 | //! |
11 | //! When the writer is dropped, remaining bytes in the pipe can still be read. After that, attempts |
12 | //! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes. |
13 | //! |
14 | //! When the reader is dropped, the pipe is closed and no more bytes can be written into it. |
15 | //! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes. |
16 | //! |
17 | //! # Version 0.2.0 Notes |
18 | //! |
19 | //! Previously, this crate contained other synchronization primitives, such as bounded channels, locks, |
20 | //! and event listeners. These have been split out into their own crates: |
21 | //! |
22 | //! - [`async-channel`](https://docs.rs/async-channel) |
23 | //! - [`async-dup`](https://docs.rs/async-dup) |
24 | //! - [`async-lock`](https://docs.rs/async-lock) |
25 | //! - [`async-mutex`](https://docs.rs/async-mutex) |
26 | //! - [`event-listener`](https://docs.rs/event-listener) |
27 | //! |
28 | //! # Examples |
29 | //! |
30 | //! ## Asynchronous Tasks |
31 | //! |
32 | //! Communicate between asynchronous tasks, potentially on other threads. |
33 | //! |
34 | //! ``` |
35 | //! use async_channel::unbounded; |
36 | //! use async_executor::Executor; |
37 | //! use easy_parallel::Parallel; |
38 | //! use futures_lite::{future, prelude::*}; |
39 | //! use std::time::Duration; |
40 | //! |
41 | //! # if cfg!(miri) { return; } |
42 | //! |
43 | //! // Create a pair of handles. |
44 | //! let (mut reader, mut writer) = piper::pipe(1024); |
45 | //! |
46 | //! // Create the executor. |
47 | //! let ex = Executor::new(); |
48 | //! let (signal, shutdown) = unbounded::<()>(); |
49 | //! |
50 | //! // Spawn a task that writes random data to the pipe. |
51 | //! let writer = ex.spawn(async move { |
52 | //! for _ in 0..1_000 { |
53 | //! // Generate 8 random bytes. |
54 | //! let random = fastrand::u64(..).to_le_bytes(); |
55 | //! |
56 | //! // Write them to the pipe. |
57 | //! writer.write_all(&random).await.unwrap(); |
58 | //! |
59 | //! // Wait a bit. |
60 | //! async_io::Timer::after(Duration::from_millis(5)).await; |
61 | //! } |
62 | //! |
63 | //! // Drop the writer to close the pipe. |
64 | //! drop(writer); |
65 | //! }); |
66 | //! |
67 | //! // Detach the task so that it runs in the background. |
68 | //! writer.detach(); |
69 | //! |
70 | //! // Spawn a task for reading from the pipe. |
71 | //! let reader = ex.spawn(async move { |
72 | //! let mut buf = vec![]; |
73 | //! |
74 | //! // Read all bytes from the pipe. |
75 | //! reader.read_to_end(&mut buf).await.unwrap(); |
76 | //! |
77 | //! println!("Random data: {:#?}" , buf); |
78 | //! }); |
79 | //! |
80 | //! Parallel::new() |
81 | //! // Run four executor threads. |
82 | //! .each(0..4, |_| future::block_on(ex.run(shutdown.recv()))) |
83 | //! // Run the main future on the current thread. |
84 | //! .finish(|| future::block_on(async { |
85 | //! // Wait for the reader to finish. |
86 | //! reader.await; |
87 | //! |
88 | //! // Signal the executor threads to shut down. |
89 | //! drop(signal); |
90 | //! })); |
91 | //! ``` |
92 | //! |
93 | //! ## Blocking I/O |
94 | //! |
95 | //! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example |
96 | //! spawns another thread for reading a file and writing it to a pipe. |
97 | //! |
98 | //! ```no_run |
99 | //! use futures_lite::{future, prelude::*}; |
100 | //! use std::fs::File; |
101 | //! use std::io::prelude::*; |
102 | //! use std::thread; |
103 | //! |
104 | //! // Create a pair of handles. |
105 | //! let (mut r, mut w) = piper::pipe(1024); |
106 | //! |
107 | //! // Spawn a thread for reading a file. |
108 | //! thread::spawn(move || { |
109 | //! let mut file = File::open("Cargo.toml" ).unwrap(); |
110 | //! |
111 | //! // Read the file into a buffer. |
112 | //! let mut buf = [0u8; 16384]; |
113 | //! future::block_on(async move { |
114 | //! loop { |
115 | //! // Read a chunk of bytes from the file. |
116 | //! // Blocking is okay here, since this is a separate thread. |
117 | //! let n = file.read(&mut buf).unwrap(); |
118 | //! if n == 0 { |
119 | //! break; |
120 | //! } |
121 | //! |
122 | //! // Write the chunk to the pipe. |
123 | //! w.write_all(&buf[..n]).await.unwrap(); |
124 | //! } |
125 | //! |
126 | //! // Close the pipe. |
127 | //! drop(w); |
128 | //! }); |
129 | //! }); |
130 | //! |
131 | //! # future::block_on(async move { |
132 | //! // Read bytes from the pipe. |
133 | //! let mut buf = vec![]; |
134 | //! r.read_to_end(&mut buf).await.unwrap(); |
135 | //! |
136 | //! println!("Read {} bytes" , buf.len()); |
137 | //! # }); |
138 | //! ``` |
139 | //! |
140 | //! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write` |
141 | //! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from |
142 | //! the file into the pipe. This approach should be preferred when possible, as it avoids an extra |
143 | //! copy. |
144 | //! |
145 | //! ```no_run |
146 | //! # use futures_lite::future; |
147 | //! # use std::fs::File; |
148 | //! # let mut file: File = unimplemented!(); |
149 | //! # let mut w: piper::Writer = unimplemented!(); |
150 | //! // In the `future::block_on` call above... |
151 | //! # future::block_on(async move { |
152 | //! loop { |
153 | //! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap(); |
154 | //! if n == 0 { |
155 | //! break; |
156 | //! } |
157 | //! } |
158 | //! # }); |
159 | //! ``` |
160 | //! |
161 | //! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for |
162 | //! thread management and pipes. |
163 | //! |
164 | //! [`poll_fill`]: struct.Writer.html#method.poll_fill |
165 | //! [`poll_drain`]: struct.Reader.html#method.poll_drain |
166 | //! [`blocking`]: https://docs.rs/blocking |
167 | |
168 | #![cfg_attr (not(feature = "std" ), no_std)] |
169 | #![forbid (missing_docs)] |
170 | #![doc ( |
171 | html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
172 | )] |
173 | #![doc ( |
174 | html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" |
175 | )] |
176 | |
177 | extern crate alloc; |
178 | |
179 | use core::convert::Infallible; |
180 | use core::mem; |
181 | use core::slice; |
182 | use core::task::{Context, Poll}; |
183 | |
184 | use alloc::vec::Vec; |
185 | |
186 | use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; |
187 | use sync::Arc; |
188 | |
189 | #[cfg (feature = "std" )] |
190 | use std::{ |
191 | io::{self, Read, Write}, |
192 | pin::Pin, |
193 | }; |
194 | |
195 | use atomic_waker::AtomicWaker; |
196 | |
197 | #[cfg (feature = "std" )] |
198 | use futures_io::{AsyncRead, AsyncWrite}; |
199 | |
// Evaluates to the value inside `Poll::Ready`; on `Poll::Pending` it returns `Poll::Pending`
// from the function the macro is used in.
macro_rules! ready {
    ($e:expr) => {{
        match $e {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(value) => value,
        }
    }};
}
208 | |
209 | /// Creates a bounded single-producer single-consumer pipe. |
210 | /// |
211 | /// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to. |
212 | /// |
213 | /// See the [crate-level documentation](index.html) for more details. |
214 | /// |
215 | /// # Panics |
216 | /// |
217 | /// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`. |
218 | pub fn pipe(cap: usize) -> (Reader, Writer) { |
219 | assert!(cap > 0, "capacity must be positive" ); |
220 | assert!(cap.checked_mul(2).is_some(), "capacity is too large" ); |
221 | |
222 | // Allocate the ring buffer. |
223 | let mut v = Vec::with_capacity(cap); |
224 | let buffer = v.as_mut_ptr(); |
225 | mem::forget(v); |
226 | |
227 | let inner = Arc::new(Pipe { |
228 | head: AtomicUsize::new(0), |
229 | tail: AtomicUsize::new(0), |
230 | reader: AtomicWaker::new(), |
231 | writer: AtomicWaker::new(), |
232 | closed: AtomicBool::new(false), |
233 | buffer, |
234 | cap, |
235 | }); |
236 | |
237 | // Use a random number generator to randomize fair yielding behavior. |
238 | let mut rng = rng(); |
239 | |
240 | let r = Reader { |
241 | inner: inner.clone(), |
242 | head: 0, |
243 | tail: 0, |
244 | rng: rng.fork(), |
245 | }; |
246 | |
247 | let w = Writer { |
248 | inner, |
249 | head: 0, |
250 | tail: 0, |
251 | zeroed_until: 0, |
252 | rng, |
253 | }; |
254 | |
255 | (r, w) |
256 | } |
257 | |
258 | /// The reading side of a pipe. |
259 | /// |
260 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
261 | pub struct Reader { |
262 | /// The inner ring buffer. |
263 | inner: Arc<Pipe>, |
264 | |
265 | /// The head index, moved by the reader, in the range `0..2*cap`. |
266 | /// |
267 | /// This index always matches `inner.head`. |
268 | head: usize, |
269 | |
270 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
271 | /// |
272 | /// This index is a snapshot of `index.tail` that might become stale at any point. |
273 | tail: usize, |
274 | |
275 | /// Random number generator. |
276 | rng: fastrand::Rng, |
277 | } |
278 | |
279 | /// The writing side of a pipe. |
280 | /// |
281 | /// This type is created by the [`pipe`] function. See its documentation for more details. |
282 | pub struct Writer { |
283 | /// The inner ring buffer. |
284 | inner: Arc<Pipe>, |
285 | |
286 | /// The head index, moved by the reader, in the range `0..2*cap`. |
287 | /// |
288 | /// This index is a snapshot of `index.head` that might become stale at any point. |
289 | head: usize, |
290 | |
291 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
292 | /// |
293 | /// This index always matches `inner.tail`. |
294 | tail: usize, |
295 | |
296 | /// How many bytes at the beginning of the buffer have been zeroed. |
297 | /// |
298 | /// The pipe allocates an uninitialized buffer, and we must be careful about passing |
299 | /// uninitialized data to user code. Zeroing the buffer right after allocation would be too |
300 | /// expensive, so we zero it in smaller chunks as the writer makes progress. |
301 | zeroed_until: usize, |
302 | |
303 | /// Random number generator. |
304 | rng: fastrand::Rng, |
305 | } |
306 | |
307 | /// The inner ring buffer. |
308 | /// |
309 | /// Head and tail indices are in the range `0..2*cap`, even though they really map onto the |
310 | /// `0..cap` range. The distance between head and tail indices is never more than `cap`. |
311 | /// |
312 | /// The reason why indices are not in the range `0..cap` is because we need to distinguish between |
313 | /// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail` |
314 | /// could mean the pipe is either empty or full, but we don't know which! |
315 | struct Pipe { |
316 | /// The head index, moved by the reader, in the range `0..2*cap`. |
317 | head: AtomicUsize, |
318 | |
319 | /// The tail index, moved by the writer, in the range `0..2*cap`. |
320 | tail: AtomicUsize, |
321 | |
322 | /// A waker representing the blocked reader. |
323 | reader: AtomicWaker, |
324 | |
325 | /// A waker representing the blocked writer. |
326 | writer: AtomicWaker, |
327 | |
328 | /// Set to `true` if the reader or writer was dropped. |
329 | closed: AtomicBool, |
330 | |
331 | /// The byte buffer. |
332 | buffer: *mut u8, |
333 | |
334 | /// The buffer capacity. |
335 | cap: usize, |
336 | } |
337 | |
338 | unsafe impl Sync for Pipe {} |
339 | unsafe impl Send for Pipe {} |
340 | |
341 | impl Drop for Pipe { |
342 | fn drop(&mut self) { |
343 | // Deallocate the byte buffer. |
344 | unsafe { |
345 | Vec::from_raw_parts(self.buffer, length:0, self.cap); |
346 | } |
347 | } |
348 | } |
349 | |
350 | impl Drop for Reader { |
351 | fn drop(&mut self) { |
352 | // Dropping closes the pipe and then wakes the writer. |
353 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
354 | self.inner.writer.wake(); |
355 | } |
356 | } |
357 | |
358 | impl Drop for Writer { |
359 | fn drop(&mut self) { |
360 | // Dropping closes the pipe and then wakes the reader. |
361 | self.inner.closed.store(val:true, order:Ordering::SeqCst); |
362 | self.inner.reader.wake(); |
363 | } |
364 | } |
365 | |
366 | impl Pipe { |
367 | /// Get the length of the data in the pipe. |
368 | fn len(&self) -> usize { |
369 | let head: usize = self.head.load(order:Ordering::Acquire); |
370 | let tail: usize = self.tail.load(order:Ordering::Acquire); |
371 | |
372 | if head <= tail { |
373 | tail - head |
374 | } else { |
375 | (2 * self.cap) - (head - tail) |
376 | } |
377 | } |
378 | } |
379 | |
380 | impl Reader { |
381 | /// Gets the total length of the data in the pipe. |
382 | /// |
383 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
384 | /// read yet. |
385 | /// |
386 | /// # Examples |
387 | /// |
388 | /// ``` |
389 | /// # futures_lite::future::block_on(async { |
390 | /// # futures_lite::future::poll_fn(|cx| { |
391 | /// let (mut reader, mut writer) = piper::pipe(10); |
392 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]); |
393 | /// assert_eq!(reader.len(), 5); |
394 | /// # std::task::Poll::Ready(()) }).await; |
395 | /// # }); |
396 | /// ``` |
397 | pub fn len(&self) -> usize { |
398 | self.inner.len() |
399 | } |
400 | |
401 | /// Tell whether or not the pipe is empty. |
402 | /// |
403 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
404 | /// |
405 | /// # Examples |
406 | /// |
407 | /// ``` |
408 | /// # futures_lite::future::block_on(async { |
409 | /// # futures_lite::future::poll_fn(|cx| { |
410 | /// let (mut reader, mut writer) = piper::pipe(10); |
411 | /// assert!(reader.is_empty()); |
412 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]); |
413 | /// assert!(!reader.is_empty()); |
414 | /// # std::task::Poll::Ready(()) }).await; |
415 | /// # }); |
416 | /// ``` |
417 | pub fn is_empty(&self) -> bool { |
418 | self.inner.len() == 0 |
419 | } |
420 | |
421 | /// Gets the total capacity of the pipe. |
422 | /// |
423 | /// This method returns the number of bytes that the pipe can hold at a time. |
424 | /// |
425 | /// # Examples |
426 | /// |
427 | /// ``` |
428 | /// # futures_lite::future::block_on(async { |
429 | /// let (reader, _) = piper::pipe(10); |
430 | /// assert_eq!(reader.capacity(), 10); |
431 | /// # }); |
432 | /// ``` |
433 | pub fn capacity(&self) -> usize { |
434 | self.inner.cap |
435 | } |
436 | |
437 | /// Tell whether or not the pipe is full. |
438 | /// |
439 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
440 | /// writes will block until some data is read from the pipe. |
441 | /// |
442 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
443 | /// |
444 | /// # Examples |
445 | /// |
446 | /// ``` |
447 | /// # futures_lite::future::block_on(async { |
448 | /// # futures_lite::future::poll_fn(|cx| { |
449 | /// let (mut reader, mut writer) = piper::pipe(10); |
450 | /// assert!(!reader.is_full()); |
451 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 10]); |
452 | /// assert!(reader.is_full()); |
453 | /// let _ = reader.poll_drain_bytes(cx, &mut [0u8; 5]); |
454 | /// assert!(!reader.is_full()); |
455 | /// # std::task::Poll::Ready(()) }).await; |
456 | /// # }); |
457 | /// ``` |
458 | pub fn is_full(&self) -> bool { |
459 | self.inner.len() == self.inner.cap |
460 | } |
461 | |
462 | /// Tell whether or not the pipe is closed. |
463 | /// |
464 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
465 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
466 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
467 | /// |
468 | /// # Examples |
469 | /// |
470 | /// ``` |
471 | /// # futures_lite::future::block_on(async { |
472 | /// let (mut reader, mut writer) = piper::pipe(10); |
473 | /// assert!(!reader.is_closed()); |
474 | /// drop(writer); |
475 | /// assert!(reader.is_closed()); |
476 | /// # }); |
477 | /// ``` |
478 | pub fn is_closed(&self) -> bool { |
479 | self.inner.closed.load(Ordering::SeqCst) |
480 | } |
481 | |
482 | /// Reads bytes from this reader and writes into blocking `dest`. |
483 | /// |
484 | /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy, |
485 | /// but it may block the thread if `dest` blocks. |
486 | /// |
487 | /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method |
488 | /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`. |
489 | /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written. |
490 | /// |
491 | /// This method is only available when the `std` feature is enabled. For `no_std` environments, |
492 | /// consider using [`poll_drain_bytes`] instead. |
493 | /// |
494 | /// [`poll_drain_bytes`]: #method.poll_drain_bytes |
495 | /// |
496 | /// # Examples |
497 | /// |
498 | /// ``` |
499 | /// use futures_lite::{future, prelude::*}; |
500 | /// # future::block_on(async { |
501 | /// |
502 | /// let (mut r, mut w) = piper::pipe(1024); |
503 | /// |
504 | /// // Write some data to the pipe. |
505 | /// w.write_all(b"hello world" ).await.unwrap(); |
506 | /// |
507 | /// // Try reading from the pipe. |
508 | /// let mut buf = [0; 1024]; |
509 | /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap(); |
510 | /// |
511 | /// // The data was written to the buffer. |
512 | /// assert_eq!(&buf[..n], b"hello world" ); |
513 | /// # }); |
514 | /// ``` |
515 | #[cfg (feature = "std" )] |
516 | pub fn poll_drain( |
517 | &mut self, |
518 | cx: &mut Context<'_>, |
519 | dest: impl Write, |
520 | ) -> Poll<io::Result<usize>> { |
521 | self.drain_inner(Some(cx), dest) |
522 | } |
523 | |
524 | /// Reads bytes from this reader. |
525 | /// |
526 | /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into. |
527 | /// Because of this, it is infallible and can be used in `no_std` environments. |
528 | /// |
529 | /// The same conditions that apply to [`poll_drain`] apply to this method. |
530 | /// |
531 | /// [`poll_drain`]: #method.poll_drain |
532 | /// |
533 | /// # Examples |
534 | /// |
535 | /// ``` |
536 | /// use futures_lite::{future, prelude::*}; |
537 | /// # future::block_on(async { |
538 | /// let (mut r, mut w) = piper::pipe(1024); |
539 | /// |
540 | /// // Write some data to the pipe. |
541 | /// w.write_all(b"hello world" ).await.unwrap(); |
542 | /// |
543 | /// // Try reading from the pipe. |
544 | /// let mut buf = [0; 1024]; |
545 | /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await; |
546 | /// |
547 | /// // The data was written to the buffer. |
548 | /// assert_eq!(&buf[..n], b"hello world" ); |
549 | /// # }); |
550 | /// ``` |
551 | pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> { |
552 | match self.drain_inner(Some(cx), WriteBytes(dest)) { |
553 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
554 | Poll::Ready(Err(e)) => match e {}, |
555 | Poll::Pending => Poll::Pending, |
556 | } |
557 | } |
558 | |
559 | /// Tries to read bytes from this reader. |
560 | /// |
561 | /// Returns the total number of bytes that were read from this reader. |
562 | /// |
563 | /// # Examples |
564 | /// |
565 | /// ``` |
566 | /// let (mut r, mut w) = piper::pipe(1024); |
567 | /// |
568 | /// // `try_drain()` returns 0 off the bat. |
569 | /// let mut buf = [0; 10]; |
570 | /// assert_eq!(r.try_drain(&mut buf), 0); |
571 | /// |
572 | /// // After a write it returns the data. |
573 | /// w.try_fill(&[0, 1, 2, 3, 4]); |
574 | /// assert_eq!(r.try_drain(&mut buf), 5); |
575 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
576 | /// ``` |
577 | pub fn try_drain(&mut self, dest: &mut [u8]) -> usize { |
578 | match self.drain_inner(None, WriteBytes(dest)) { |
579 | Poll::Ready(Ok(n)) => n, |
580 | Poll::Ready(Err(e)) => match e {}, |
581 | Poll::Pending => 0, |
582 | } |
583 | } |
584 | |
585 | /// Reads bytes from this reader and writes into blocking `dest`. |
586 | #[inline ] |
587 | fn drain_inner<W: WriteLike>( |
588 | &mut self, |
589 | mut cx: Option<&mut Context<'_>>, |
590 | mut dest: W, |
591 | ) -> Poll<Result<usize, W::Error>> { |
592 | let cap = self.inner.cap; |
593 | |
594 | // Calculates the distance between two indices. |
595 | let distance = |a: usize, b: usize| { |
596 | if a <= b { |
597 | b - a |
598 | } else { |
599 | 2 * cap - (a - b) |
600 | } |
601 | }; |
602 | |
603 | // If the pipe appears to be empty... |
604 | if distance(self.head, self.tail) == 0 { |
605 | // Reload the tail in case it's become stale. |
606 | self.tail = self.inner.tail.load(Ordering::Acquire); |
607 | |
608 | // If the pipe is now really empty... |
609 | if distance(self.head, self.tail) == 0 { |
610 | // Register the waker. |
611 | if let Some(cx) = cx.as_mut() { |
612 | self.inner.reader.register(cx.waker()); |
613 | } |
614 | atomic::fence(Ordering::SeqCst); |
615 | |
616 | // Reload the tail after registering the waker. |
617 | self.tail = self.inner.tail.load(Ordering::Acquire); |
618 | |
619 | // If the pipe is still empty... |
620 | if distance(self.head, self.tail) == 0 { |
621 | // Check whether the pipe is closed or just empty. |
622 | if self.inner.closed.load(Ordering::Relaxed) { |
623 | return Poll::Ready(Ok(0)); |
624 | } else { |
625 | return Poll::Pending; |
626 | } |
627 | } |
628 | } |
629 | } |
630 | |
631 | // The pipe is not empty so remove the waker. |
632 | self.inner.reader.take(); |
633 | |
634 | // Yield with some small probability - this improves fairness. |
635 | if let Some(cx) = cx { |
636 | ready!(maybe_yield(&mut self.rng, cx)); |
637 | } |
638 | |
639 | // Given an index in `0..2*cap`, returns the real index in `0..cap`. |
640 | let real_index = |i: usize| { |
641 | if i < cap { |
642 | i |
643 | } else { |
644 | i - cap |
645 | } |
646 | }; |
647 | |
648 | // Number of bytes read so far. |
649 | let mut count = 0; |
650 | |
651 | loop { |
652 | // Calculate how many bytes to read in this iteration. |
653 | let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon! |
654 | .min(distance(self.head, self.tail)) // No more than bytes in the pipe. |
655 | .min(cap - real_index(self.head)); // Don't go past the buffer boundary. |
656 | |
657 | // Create a slice of data in the pipe buffer. |
658 | let pipe_slice = |
659 | unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) }; |
660 | |
661 | // Copy bytes from the pipe buffer into `dest`. |
662 | let n = dest.write(pipe_slice)?; |
663 | count += n; |
664 | |
665 | // If pipe is empty or `dest` is full, return. |
666 | if n == 0 { |
667 | return Poll::Ready(Ok(count)); |
668 | } |
669 | |
670 | // Move the head forward. |
671 | if self.head + n < 2 * cap { |
672 | self.head += n; |
673 | } else { |
674 | self.head = 0; |
675 | } |
676 | |
677 | // Store the current head index. |
678 | self.inner.head.store(self.head, Ordering::Release); |
679 | |
680 | // Wake the writer because the pipe is not full. |
681 | self.inner.writer.wake(); |
682 | } |
683 | } |
684 | } |
685 | |
#[cfg(feature = "std")]
impl AsyncRead for Reader {
    /// Reads available bytes into `buf` by delegating to `poll_drain_bytes`.
    ///
    /// Draining into a byte slice cannot fail, so a ready result is always wrapped in `Ok`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        self.poll_drain_bytes(cx, buf).map(Ok)
    }
}
696 | |
697 | impl Writer { |
698 | /// Gets the total length of the data in the pipe. |
699 | /// |
700 | /// This method returns the number of bytes that have been written into the pipe but haven't been |
701 | /// read yet. |
702 | /// |
703 | /// # Examples |
704 | /// |
705 | /// ``` |
706 | /// # futures_lite::future::block_on(async { |
707 | /// # futures_lite::future::poll_fn(|cx| { |
708 | /// let (_reader, mut writer) = piper::pipe(10); |
709 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]); |
710 | /// assert_eq!(writer.len(), 5); |
711 | /// # std::task::Poll::Ready(()) }).await; |
712 | /// # }); |
713 | /// ``` |
714 | pub fn len(&self) -> usize { |
715 | self.inner.len() |
716 | } |
717 | |
718 | /// Tell whether or not the pipe is empty. |
719 | /// |
720 | /// This method returns `true` if the pipe is empty, and `false` otherwise. |
721 | /// |
722 | /// # Examples |
723 | /// |
724 | /// ``` |
725 | /// # futures_lite::future::block_on(async { |
726 | /// # futures_lite::future::poll_fn(|cx| { |
727 | /// let (_reader, mut writer) = piper::pipe(10); |
728 | /// assert!(writer.is_empty()); |
729 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 5]); |
730 | /// assert!(!writer.is_empty()); |
731 | /// # std::task::Poll::Ready(()) }).await; |
732 | /// # }); |
733 | /// ``` |
734 | pub fn is_empty(&self) -> bool { |
735 | self.inner.len() == 0 |
736 | } |
737 | |
738 | /// Gets the total capacity of the pipe. |
739 | /// |
740 | /// This method returns the number of bytes that the pipe can hold at a time. |
741 | /// |
742 | /// # Examples |
743 | /// |
744 | /// ``` |
745 | /// # futures_lite::future::block_on(async { |
746 | /// let (_, writer) = piper::pipe(10); |
747 | /// assert_eq!(writer.capacity(), 10); |
748 | /// # }); |
749 | /// ``` |
750 | pub fn capacity(&self) -> usize { |
751 | self.inner.cap |
752 | } |
753 | |
754 | /// Tell whether or not the pipe is full. |
755 | /// |
756 | /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point, |
757 | /// writes will block until some data is read from the pipe. |
758 | /// |
759 | /// This method returns `true` if the pipe is full, and `false` otherwise. |
760 | /// |
761 | /// # Examples |
762 | /// |
763 | /// ``` |
764 | /// # futures_lite::future::block_on(async { |
765 | /// # futures_lite::future::poll_fn(|cx| { |
766 | /// let (mut reader, mut writer) = piper::pipe(10); |
767 | /// assert!(!writer.is_full()); |
768 | /// let _ = writer.poll_fill_bytes(cx, &[0u8; 10]); |
769 | /// assert!(writer.is_full()); |
770 | /// let _ = reader.poll_drain_bytes(cx, &mut [0u8; 5]); |
771 | /// assert!(!writer.is_full()); |
772 | /// # std::task::Poll::Ready(()) }).await; |
773 | /// # }); |
774 | /// ``` |
775 | pub fn is_full(&self) -> bool { |
776 | self.inner.len() == self.inner.cap |
777 | } |
778 | |
779 | /// Tell whether or not the pipe is closed. |
780 | /// |
781 | /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting |
782 | /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after |
783 | /// any previously written bytes are read will return `Poll::Ready(Ok(0))`. |
784 | /// |
785 | /// # Examples |
786 | /// |
787 | /// ``` |
788 | /// # futures_lite::future::block_on(async { |
789 | /// let (reader, writer) = piper::pipe(10); |
790 | /// assert!(!writer.is_closed()); |
791 | /// drop(reader); |
792 | /// assert!(writer.is_closed()); |
793 | /// # }); |
794 | /// ``` |
795 | pub fn is_closed(&self) -> bool { |
796 | self.inner.closed.load(Ordering::SeqCst) |
797 | } |
798 | |
/// Pulls bytes out of blocking `src` and stores them in this writer.
///
/// The bytes go straight from `src` into the pipe's internal buffer. This avoids an extra copy,
/// but it may block the thread if `src` blocks.
///
/// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method
/// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`.
/// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read.
///
/// This method is only available when the `std` feature is enabled. For `no_std` environments,
/// consider using [`poll_fill_bytes`] instead.
///
/// [`poll_fill_bytes`]: #method.poll_fill_bytes
///
/// # Examples
///
/// ```
/// use futures_lite::{future, prelude::*};
/// # future::block_on(async {
///
/// // Create a pipe.
/// let (mut reader, mut writer) = piper::pipe(1024);
///
/// // Fill the pipe with some bytes.
/// let data = b"hello world";
/// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
/// assert_eq!(n, data.len());
///
/// // Read the bytes back.
/// let mut buf = [0; 1024];
/// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
/// assert_eq!(&buf[..data.len()], data);
/// # });
/// ```
#[cfg(feature = "std")]
pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
    let context = Some(cx);
    self.fill_inner(context, src)
}
837 | |
838 | /// Writes bytes into this writer. |
839 | /// |
840 | /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from. |
841 | /// Because of this, it is infallible and can be used in `no_std` environments. |
842 | /// |
843 | /// The same conditions that apply to [`poll_fill`] apply to this method. |
844 | /// |
845 | /// [`poll_fill`]: #method.poll_fill |
846 | /// |
847 | /// # Examples |
848 | /// |
849 | /// ``` |
850 | /// use futures_lite::{future, prelude::*}; |
851 | /// # future::block_on(async { |
852 | /// |
853 | /// // Create a pipe. |
854 | /// let (mut reader, mut writer) = piper::pipe(1024); |
855 | /// |
856 | /// // Fill the pipe with some bytes. |
857 | /// let data = b"hello world" ; |
858 | /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await; |
859 | /// assert_eq!(n, data.len()); |
860 | /// |
861 | /// // Read the bytes back. |
862 | /// let mut buf = [0; 1024]; |
863 | /// reader.read_exact(&mut buf[..data.len()]).await.unwrap(); |
864 | /// assert_eq!(&buf[..data.len()], data); |
865 | /// # }); |
866 | /// ``` |
867 | pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> { |
868 | match self.fill_inner(Some(cx), ReadBytes(bytes)) { |
869 | Poll::Ready(Ok(n)) => Poll::Ready(n), |
870 | Poll::Ready(Err(e)) => match e {}, |
871 | Poll::Pending => Poll::Pending, |
872 | } |
873 | } |
874 | |
875 | /// Tries to write bytes to this writer. |
876 | /// |
877 | /// Returns the total number of bytes that were read from this reader. |
878 | /// |
879 | /// # Examples |
880 | /// |
881 | /// ``` |
882 | /// let (mut r, mut w) = piper::pipe(1024); |
883 | /// |
884 | /// let mut buf = [0; 10]; |
885 | /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5); |
886 | /// assert_eq!(r.try_drain(&mut buf), 5); |
887 | /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]); |
888 | /// ``` |
889 | pub fn try_fill(&mut self, dest: &[u8]) -> usize { |
890 | match self.fill_inner(None, ReadBytes(dest)) { |
891 | Poll::Ready(Ok(n)) => n, |
892 | Poll::Ready(Err(e)) => match e {}, |
893 | Poll::Pending => 0, |
894 | } |
895 | } |
896 | |
/// Reads bytes from blocking `src` and writes into this writer.
///
/// This is the shared implementation behind `poll_fill_bytes` and `try_fill`.
///
/// * `cx` - when `Some`, its waker is registered before a full pipe is reported, and the call
///   may occasionally yield via `maybe_yield`. When `None`, no waker is registered and no
///   yield is attempted.
/// * `src` - the byte source. It is read from repeatedly until a read returns `0`, the pipe
///   is observed to be closed, or a read fails.
///
/// Returns:
///
/// * `Poll::Ready(Ok(0))` if the pipe is already closed,
/// * `Poll::Pending` if the pipe is full and not closed, or if the fairness yield triggers,
/// * `Poll::Ready(Ok(count))` with the number of bytes obtained from `src` otherwise,
/// * `Poll::Ready(Err(_))` as soon as `src.read` fails; the count accumulated by earlier
///   iterations is not reported in that case.
#[inline]
fn fill_inner<R: ReadLike>(
    &mut self,
    mut cx: Option<&mut Context<'_>>,
    mut src: R,
) -> Poll<Result<usize, R::Error>> {
    // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
    if self.inner.closed.load(Ordering::Relaxed) {
        return Poll::Ready(Ok(0));
    }

    // Calculates the distance between two indices.
    // Indices live in `0..2*cap`, so the `else` branch handles the case where `b` has wrapped
    // around past `a`.
    let cap = self.inner.cap;
    let distance = |a: usize, b: usize| {
        if a <= b {
            b - a
        } else {
            2 * cap - (a - b)
        }
    };

    // If the pipe appears to be full...
    if distance(self.head, self.tail) == cap {
        // Reload the head in case it's become stale.
        self.head = self.inner.head.load(Ordering::Acquire);

        // If the pipe is now really full...
        if distance(self.head, self.tail) == cap {
            // Register the waker.
            if let Some(cx) = cx.as_mut() {
                self.inner.writer.register(cx.waker());
            }
            // Full fence between registering the waker and re-reading the head.
            // NOTE(review): presumably paired with a fence on the reader side so that a
            // concurrent drain is not missed - confirm against the reader implementation.
            atomic::fence(Ordering::SeqCst);

            // Reload the head after registering the waker.
            self.head = self.inner.head.load(Ordering::Acquire);

            // If the pipe is still full...
            if distance(self.head, self.tail) == cap {
                // Check whether the pipe is closed or just full.
                if self.inner.closed.load(Ordering::Relaxed) {
                    return Poll::Ready(Ok(0));
                } else {
                    return Poll::Pending;
                }
            }
        }
    }

    // The pipe is not full so remove the waker.
    self.inner.writer.take();

    // Yield with some small probability - this improves fairness.
    // Only possible when a `Context` was supplied; `ready!` returns `Poll::Pending` from this
    // function when `maybe_yield` decides to yield.
    if let Some(cx) = cx {
        ready!(maybe_yield(&mut self.rng, cx));
    }

    // Given an index in `0..2*cap`, returns the real index in `0..cap`.
    let real_index = |i: usize| {
        if i < cap {
            i
        } else {
            i - cap
        }
    };

    // Number of bytes written so far.
    let mut count = 0;

    loop {
        // Calculate how many bytes to write in this iteration.
        // `self.head` is not reloaded inside this loop, so the free space is computed from
        // the head value observed above.
        let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
            .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
            .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
            .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.

        // Create a slice of available space in the pipe buffer.
        //
        // SAFETY (review note): `n <= cap - real_index(self.tail)` guarantees `from + n <= cap`.
        // This assumes `self.inner.buffer` points to an allocation of at least `cap` bytes and
        // that nothing else accesses this region concurrently - neither is visible in this
        // function; confirm against the pipe constructor and the reader.
        let pipe_slice_mut = unsafe {
            let from = real_index(self.tail);
            let to = from + n;

            // Make sure all bytes in the slice are initialized.
            // `zeroed_until` only ever grows, so each byte of the buffer is zeroed at most once.
            if self.zeroed_until < to {
                self.inner
                    .buffer
                    .add(self.zeroed_until)
                    .write_bytes(0u8, to - self.zeroed_until);
                self.zeroed_until = to;
            }

            slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
        };

        // Copy bytes from `src` into the piper buffer.
        // A read error propagates immediately through `?`.
        let n = src.read(pipe_slice_mut)?;
        count += n;

        // If the pipe is full or closed, or `src` is empty, return.
        // When the pipe is full the slice above is empty, so the read yields 0.
        // Note that in the closed case `count` already includes this iteration's `n`, although
        // the tail has not been advanced for it.
        if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(Ok(count));
        }

        // Move the tail forward.
        // `n` never exceeds `cap - real_index(self.tail)`, so the sum can reach but never pass
        // `2 * cap`; reaching it exactly wraps the tail back to 0.
        if self.tail + n < 2 * cap {
            self.tail += n;
        } else {
            self.tail = 0;
        }

        // Store the current tail index.
        self.inner.tail.store(self.tail, Ordering::Release);

        // Wake the reader because the pipe is not empty.
        self.inner.reader.wake();
    }
}
1014 | } |
1015 | |
#[cfg(feature = "std")]
impl AsyncWrite for Writer {
    /// Writes as much of `buf` as currently fits by delegating to `poll_fill_bytes`.
    ///
    /// This never produces an I/O error; the byte count is simply wrapped in `Ok`.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        match self.poll_fill_bytes(cx, buf) {
            Poll::Ready(written) => Poll::Ready(Ok(written)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Completes immediately: this implementation has nothing to flush.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Marks the pipe as closed and wakes both ends so they can observe the flag.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let pipe = &self.inner;

        // Publish the closed flag first...
        pipe.closed.store(true, Ordering::Release);

        // ...then wake whichever tasks may be parked on either side of the pipe.
        pipe.reader.wake();
        pipe.writer.wake();

        // Closing always succeeds.
        Poll::Ready(Ok(()))
    }
}
1043 | |
/// A trait for reading bytes into a pipe.
///
/// This is the crate-private abstraction `fill_inner` is generic over. It is implemented for
/// `ReadBytes` (a plain byte slice, no `std` needed) and, when the `std` feature is enabled,
/// for every `std::io::Read` type.
trait ReadLike {
    /// The error type.
    ///
    /// `Infallible` for `ReadBytes`, `io::Error` for `std::io::Read` types.
    type Error;

    /// Reads bytes into the given buffer.
    ///
    /// Returns how many bytes were placed at the start of `buf`. `fill_inner` stops reading
    /// from the source as soon as this returns `Ok(0)`.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}
1052 | |
/// With `std` enabled, any `std::io::Read` type can act as a byte source for the pipe.
#[cfg(feature = "std")]
impl<R: Read> ReadLike for R {
    type Error = io::Error;

    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Fully qualified so the call resolves to `std::io::Read::read`, not to this method.
        <R as Read>::read(self, buf)
    }
}
1061 | |
1062 | /// Implements `no_std` reading around a byte slice. |
1063 | struct ReadBytes<'a>(&'a [u8]); |
1064 | |
1065 | impl ReadLike for ReadBytes<'_> { |
1066 | type Error = Infallible; |
1067 | |
1068 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> { |
1069 | let n: usize = self.0.len().min(buf.len()); |
1070 | buf[..n].copy_from_slice(&self.0[..n]); |
1071 | self.0 = &self.0[n..]; |
1072 | Ok(n) |
1073 | } |
1074 | } |
1075 | |
/// A trait for writing bytes from a pipe.
///
/// The counterpart of `ReadLike`: a crate-private abstraction over destinations that accept
/// bytes. It is implemented for `WriteBytes` (a plain mutable byte slice, no `std` needed)
/// and, when the `std` feature is enabled, for every `std::io::Write` type.
// NOTE(review): presumably consumed by the reader's drain path, which is not visible in this
// part of the file - confirm.
trait WriteLike {
    /// The error type.
    ///
    /// `Infallible` for `WriteBytes`, `io::Error` for `std::io::Write` types.
    type Error;

    /// Writes bytes from the given buffer.
    ///
    /// Returns how many bytes from the start of `buf` were accepted, which may be fewer than
    /// `buf.len()`.
    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
}
1084 | |
/// With `std` enabled, any `std::io::Write` type can act as a byte destination.
#[cfg(feature = "std")]
impl<W: Write> WriteLike for W {
    type Error = io::Error;

    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        // Fully qualified so the call resolves to `std::io::Write::write`, not to this method.
        <W as Write>::write(self, buf)
    }
}
1093 | |
1094 | /// Implements `no_std` writing around a byte slice. |
1095 | struct WriteBytes<'a>(&'a mut [u8]); |
1096 | |
1097 | impl WriteLike for WriteBytes<'_> { |
1098 | type Error = Infallible; |
1099 | |
1100 | fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> { |
1101 | let n: usize = self.0.len().min(buf.len()); |
1102 | self.0[..n].copy_from_slice(&buf[..n]); |
1103 | |
1104 | // mem::take() is not available on 1.36 |
1105 | #[allow (clippy::mem_replace_with_default)] |
1106 | { |
1107 | let slice: &mut [u8] = mem::replace(&mut self.0, &mut []); |
1108 | self.0 = &mut slice[n..]; |
1109 | } |
1110 | |
1111 | Ok(n) |
1112 | } |
1113 | } |
1114 | |
1115 | /// Yield with some small probability. |
1116 | fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> { |
1117 | if rng.usize(..100) == 0 { |
1118 | cx.waker().wake_by_ref(); |
1119 | Poll::Pending |
1120 | } else { |
1121 | Poll::Ready(()) |
1122 | } |
1123 | } |
1124 | |
/// Get a random number generator.
///
/// This is the variant compiled when the `std` feature is enabled; it builds the generator
/// with `fastrand::Rng::new()` rather than the hard-coded seed used by the `no_std` variant.
#[cfg(feature = "std")]
#[inline]
fn rng() -> fastrand::Rng {
    fastrand::Rng::new()
}
1131 | |
1132 | /// Get a random number generator. |
1133 | /// |
1134 | /// This uses a fixed seed due to the lack of a good RNG in `no_std` environments. |
1135 | #[cfg (not(feature = "std" ))] |
1136 | #[inline ] |
1137 | fn rng() -> fastrand::Rng { |
1138 | // Chosen by fair roll of the dice. |
1139 | fastrand::Rng::with_seed(0x7e9b496634c97ec6) |
1140 | } |
1141 | |
/// Compile-time assertion, run as a doctest, that both pipe handles are `Send + Sync`.
///
/// The function itself is intentionally empty; it exists only to carry the doctest below.
///
/// ```
/// use piper::{Reader, Writer};
/// fn _send_sync<T: Send + Sync>() {}
/// _send_sync::<Reader>();
/// _send_sync::<Writer>();
/// ```
fn _assert_send_sync() {}
1149 | |
1150 | mod sync { |
1151 | #[cfg (not(feature = "portable-atomic" ))] |
1152 | pub use core::sync::atomic; |
1153 | |
1154 | #[cfg (not(feature = "portable-atomic" ))] |
1155 | pub use alloc::sync::Arc; |
1156 | |
1157 | #[cfg (feature = "portable-atomic" )] |
1158 | pub use portable_atomic_crate as atomic; |
1159 | |
1160 | #[cfg (feature = "portable-atomic" )] |
1161 | pub use portable_atomic_util::Arc; |
1162 | } |
1163 | |