1 | //! Tools and combinators for I/O. |
2 | //! |
3 | //! # Examples |
4 | //! |
5 | //! ``` |
6 | //! use futures_lite::io::{self, AsyncReadExt}; |
7 | //! |
8 | //! # spin_on::spin_on(async { |
9 | //! let input: &[u8] = b"hello" ; |
10 | //! let mut reader = io::BufReader::new(input); |
11 | //! |
12 | //! let mut contents = String::new(); |
13 | //! reader.read_to_string(&mut contents).await?; |
14 | //! # std::io::Result::Ok(()) }); |
15 | //! ``` |
16 | |
17 | #[doc (no_inline)] |
18 | pub use std::io::{Error, ErrorKind, Result, SeekFrom}; |
19 | |
20 | #[doc (no_inline)] |
21 | pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; |
22 | |
23 | use std::borrow::{Borrow, BorrowMut}; |
24 | use std::cmp; |
25 | use std::fmt; |
26 | use std::future::Future; |
27 | use std::io::{IoSlice, IoSliceMut}; |
28 | use std::mem; |
29 | use std::pin::Pin; |
30 | use std::sync::{Arc, Mutex}; |
31 | use std::task::{Context, Poll}; |
32 | |
33 | use futures_core::stream::Stream; |
34 | use pin_project_lite::pin_project; |
35 | |
36 | use crate::future; |
37 | use crate::ready; |
38 | |
39 | const DEFAULT_BUF_SIZE: usize = 8 * 1024; |
40 | |
41 | /// Copies the entire contents of a reader into a writer. |
42 | /// |
43 | /// This function will read data from `reader` and write it into `writer` in a streaming fashion |
44 | /// until `reader` returns EOF. |
45 | /// |
46 | /// On success, returns the total number of bytes copied. |
47 | /// |
48 | /// # Examples |
49 | /// |
50 | /// ``` |
51 | /// use futures_lite::io::{self, BufReader, BufWriter}; |
52 | /// |
53 | /// # spin_on::spin_on(async { |
54 | /// let input: &[u8] = b"hello" ; |
55 | /// let reader = BufReader::new(input); |
56 | /// |
57 | /// let mut output = Vec::new(); |
58 | /// let writer = BufWriter::new(&mut output); |
59 | /// |
60 | /// io::copy(reader, writer).await?; |
61 | /// # std::io::Result::Ok(()) }); |
62 | /// ``` |
63 | pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64> |
64 | where |
65 | R: AsyncRead, |
66 | W: AsyncWrite, |
67 | { |
68 | pin_project! { |
69 | struct CopyFuture<R, W> { |
70 | #[pin] |
71 | reader: R, |
72 | #[pin] |
73 | writer: W, |
74 | amt: u64, |
75 | } |
76 | } |
77 | |
78 | impl<R, W> Future for CopyFuture<R, W> |
79 | where |
80 | R: AsyncBufRead, |
81 | W: AsyncWrite, |
82 | { |
83 | type Output = Result<u64>; |
84 | |
85 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
86 | let mut this = self.project(); |
87 | loop { |
88 | let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?; |
89 | if buffer.is_empty() { |
90 | ready!(this.writer.as_mut().poll_flush(cx))?; |
91 | return Poll::Ready(Ok(*this.amt)); |
92 | } |
93 | |
94 | let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?; |
95 | if i == 0 { |
96 | return Poll::Ready(Err(ErrorKind::WriteZero.into())); |
97 | } |
98 | *this.amt += i as u64; |
99 | this.reader.as_mut().consume(i); |
100 | } |
101 | } |
102 | } |
103 | |
104 | let future = CopyFuture { |
105 | reader: BufReader::new(reader), |
106 | writer, |
107 | amt: 0, |
108 | }; |
109 | future.await |
110 | } |
111 | |
112 | /// Asserts that a type implementing [`std::io`] traits can be used as an async type. |
113 | /// |
114 | /// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error. |
115 | /// This is usually the case for in-memory buffered I/O. |
116 | /// |
117 | /// # Examples |
118 | /// |
119 | /// ``` |
120 | /// use futures_lite::io::{AssertAsync, AsyncReadExt}; |
121 | /// |
122 | /// let reader: &[u8] = b"hello" ; |
123 | /// |
124 | /// # spin_on::spin_on(async { |
125 | /// let mut async_reader = AssertAsync::new(reader); |
126 | /// let mut contents = String::new(); |
127 | /// |
128 | /// // This line works in async manner - note that there is await: |
129 | /// async_reader.read_to_string(&mut contents).await?; |
130 | /// # std::io::Result::Ok(()) }); |
131 | /// ``` |
132 | #[derive (Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] |
133 | pub struct AssertAsync<T>(T); |
134 | |
135 | impl<T> Unpin for AssertAsync<T> {} |
136 | |
137 | impl<T> AssertAsync<T> { |
138 | /// Wraps an I/O handle implementing [`std::io`] traits. |
139 | /// |
140 | /// # Examples |
141 | /// |
142 | /// ``` |
143 | /// use futures_lite::io::AssertAsync; |
144 | /// |
145 | /// let reader: &[u8] = b"hello" ; |
146 | /// |
147 | /// let async_reader = AssertAsync::new(reader); |
148 | /// ``` |
149 | #[inline (always)] |
150 | pub fn new(io: T) -> Self { |
151 | AssertAsync(io) |
152 | } |
153 | |
154 | /// Gets a reference to the inner I/O handle. |
155 | /// |
156 | /// # Examples |
157 | /// |
158 | /// ``` |
159 | /// use futures_lite::io::AssertAsync; |
160 | /// |
161 | /// let reader: &[u8] = b"hello" ; |
162 | /// |
163 | /// let async_reader = AssertAsync::new(reader); |
164 | /// let r = async_reader.get_ref(); |
165 | /// ``` |
166 | #[inline (always)] |
167 | pub fn get_ref(&self) -> &T { |
168 | &self.0 |
169 | } |
170 | |
171 | /// Gets a mutable reference to the inner I/O handle. |
172 | /// |
173 | /// # Examples |
174 | /// |
175 | /// ``` |
176 | /// use futures_lite::io::AssertAsync; |
177 | /// |
178 | /// let reader: &[u8] = b"hello" ; |
179 | /// |
180 | /// let mut async_reader = AssertAsync::new(reader); |
181 | /// let r = async_reader.get_mut(); |
182 | /// ``` |
183 | #[inline (always)] |
184 | pub fn get_mut(&mut self) -> &mut T { |
185 | &mut self.0 |
186 | } |
187 | |
188 | /// Extracts the inner I/O handle. |
189 | /// |
190 | /// # Examples |
191 | /// |
192 | /// ``` |
193 | /// use futures_lite::io::AssertAsync; |
194 | /// |
195 | /// let reader: &[u8] = b"hello" ; |
196 | /// |
197 | /// let async_reader = AssertAsync::new(reader); |
198 | /// let inner = async_reader.into_inner(); |
199 | /// ``` |
200 | #[inline (always)] |
201 | pub fn into_inner(self) -> T { |
202 | self.0 |
203 | } |
204 | } |
205 | |
206 | fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>> |
207 | where |
208 | F: FnMut() -> std::io::Result<T>, |
209 | { |
210 | loop { |
211 | match f() { |
212 | Err(err: Error) if err.kind() == ErrorKind::Interrupted => {} |
213 | res: Result => return Poll::Ready(res), |
214 | } |
215 | } |
216 | } |
217 | |
218 | impl<T: std::io::Read> AsyncRead for AssertAsync<T> { |
219 | #[inline ] |
220 | fn poll_read( |
221 | mut self: Pin<&mut Self>, |
222 | _: &mut Context<'_>, |
223 | buf: &mut [u8], |
224 | ) -> Poll<Result<usize>> { |
225 | assert_async_wrapio(move || self.0.read(buf)) |
226 | } |
227 | |
228 | #[inline ] |
229 | fn poll_read_vectored( |
230 | mut self: Pin<&mut Self>, |
231 | _: &mut Context<'_>, |
232 | bufs: &mut [IoSliceMut<'_>], |
233 | ) -> Poll<Result<usize>> { |
234 | assert_async_wrapio(move || self.0.read_vectored(bufs)) |
235 | } |
236 | } |
237 | |
238 | impl<T: std::io::Write> AsyncWrite for AssertAsync<T> { |
239 | #[inline ] |
240 | fn poll_write( |
241 | mut self: Pin<&mut Self>, |
242 | _: &mut Context<'_>, |
243 | buf: &[u8], |
244 | ) -> Poll<Result<usize>> { |
245 | assert_async_wrapio(move || self.0.write(buf)) |
246 | } |
247 | |
248 | #[inline ] |
249 | fn poll_write_vectored( |
250 | mut self: Pin<&mut Self>, |
251 | _: &mut Context<'_>, |
252 | bufs: &[IoSlice<'_>], |
253 | ) -> Poll<Result<usize>> { |
254 | assert_async_wrapio(move || self.0.write_vectored(bufs)) |
255 | } |
256 | |
257 | #[inline ] |
258 | fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
259 | assert_async_wrapio(move || self.0.flush()) |
260 | } |
261 | |
262 | #[inline ] |
263 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
264 | self.poll_flush(cx) |
265 | } |
266 | } |
267 | |
268 | impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> { |
269 | #[inline ] |
270 | fn poll_seek( |
271 | mut self: Pin<&mut Self>, |
272 | _: &mut Context<'_>, |
273 | pos: SeekFrom, |
274 | ) -> Poll<Result<u64>> { |
275 | assert_async_wrapio(move || self.0.seek(pos)) |
276 | } |
277 | } |
278 | |
279 | /// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending` |
280 | /// polls to `WouldBlock` errors. |
281 | /// |
282 | /// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types |
283 | /// that take `Read` as a parameter. |
284 | /// |
285 | /// # Examples |
286 | /// |
287 | /// ``` |
288 | /// use std::io::Read; |
289 | /// use std::task::{Poll, Context}; |
290 | /// |
291 | /// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> { |
292 | /// // Assume we have a library that's built around `Read` and `Write` traits. |
293 | /// use cooltls::Session; |
294 | /// |
295 | /// // We want to use it with our writer that implements `AsyncWrite`. |
296 | /// let writer = Stream::new(); |
297 | /// |
298 | /// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`. |
299 | /// use futures_lite::io::AsyncAsSync; |
300 | /// let writer = AsyncAsSync::new(cx, writer); |
301 | /// |
302 | /// // Now, we can use it with `cooltls`. |
303 | /// let mut session = Session::new(writer); |
304 | /// |
305 | /// // Match on the result of `read()` and translate it to poll. |
306 | /// match session.read(&mut [0; 1024]) { |
307 | /// Ok(n) => Poll::Ready(n), |
308 | /// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending, |
309 | /// Err(err) => panic!("unexpected error: {}" , err), |
310 | /// } |
311 | /// } |
312 | /// |
313 | /// // Usually, poll-based functions are best wrapped using `poll_fn`. |
314 | /// use futures_lite::future::poll_fn; |
315 | /// # futures_lite::future::block_on(async { |
316 | /// poll_fn(|cx| poll_for_io(cx)).await; |
317 | /// # }); |
318 | /// # struct Stream; |
319 | /// # impl Stream { |
320 | /// # fn new() -> Stream { |
321 | /// # Stream |
322 | /// # } |
323 | /// # } |
324 | /// # impl futures_lite::io::AsyncRead for Stream { |
325 | /// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> { |
326 | /// # Poll::Ready(Ok(0)) |
327 | /// # } |
328 | /// # } |
329 | /// # mod cooltls { |
330 | /// # pub struct Session<W> { |
331 | /// # reader: W, |
332 | /// # } |
333 | /// # impl<W> Session<W> { |
334 | /// # pub fn new(reader: W) -> Session<W> { |
335 | /// # Session { reader } |
336 | /// # } |
337 | /// # } |
338 | /// # impl<W: std::io::Read> std::io::Read for Session<W> { |
339 | /// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { |
340 | /// # self.reader.read(buf) |
341 | /// # } |
342 | /// # } |
343 | /// # } |
344 | /// ``` |
345 | #[derive (Debug)] |
346 | pub struct AsyncAsSync<'r, 'ctx, T> { |
347 | /// The context we are using to poll the future. |
348 | pub context: &'r mut Context<'ctx>, |
349 | |
350 | /// The actual reader/writer we are wrapping. |
351 | pub inner: T, |
352 | } |
353 | |
354 | impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> { |
355 | /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits. |
356 | /// |
357 | /// # Examples |
358 | /// |
359 | /// ``` |
360 | /// use futures_lite::io::AsyncAsSync; |
361 | /// use std::task::Context; |
362 | /// use waker_fn::waker_fn; |
363 | /// |
364 | /// let reader: &[u8] = b"hello" ; |
365 | /// let waker = waker_fn(|| {}); |
366 | /// let mut context = Context::from_waker(&waker); |
367 | /// |
368 | /// let async_reader = AsyncAsSync::new(&mut context, reader); |
369 | /// ``` |
370 | #[inline ] |
371 | pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self { |
372 | AsyncAsSync { context, inner } |
373 | } |
374 | |
375 | /// Attempt to shutdown the I/O handle. |
376 | /// |
377 | /// # Examples |
378 | /// |
379 | /// ``` |
380 | /// use futures_lite::io::AsyncAsSync; |
381 | /// use std::task::Context; |
382 | /// use waker_fn::waker_fn; |
383 | /// |
384 | /// let reader: Vec<u8> = b"hello" .to_vec(); |
385 | /// let waker = waker_fn(|| {}); |
386 | /// let mut context = Context::from_waker(&waker); |
387 | /// |
388 | /// let mut async_reader = AsyncAsSync::new(&mut context, reader); |
389 | /// async_reader.close().unwrap(); |
390 | /// ``` |
391 | #[inline ] |
392 | pub fn close(&mut self) -> Result<()> |
393 | where |
394 | T: AsyncWrite + Unpin, |
395 | { |
396 | self.poll_with(|io, cx| io.poll_close(cx)) |
397 | } |
398 | |
399 | /// Poll this `AsyncAsSync` for some function. |
400 | /// |
401 | /// # Examples |
402 | /// |
403 | /// ``` |
404 | /// use futures_lite::io::{AsyncAsSync, AsyncRead}; |
405 | /// use std::task::Context; |
406 | /// use waker_fn::waker_fn; |
407 | /// |
408 | /// let reader: &[u8] = b"hello" ; |
409 | /// let waker = waker_fn(|| {}); |
410 | /// let mut context = Context::from_waker(&waker); |
411 | /// |
412 | /// let mut async_reader = AsyncAsSync::new(&mut context, reader); |
413 | /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024])); |
414 | /// assert_eq!(r.unwrap(), 5); |
415 | /// ``` |
416 | #[inline ] |
417 | pub fn poll_with<R>( |
418 | &mut self, |
419 | f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>, |
420 | ) -> Result<R> |
421 | where |
422 | T: Unpin, |
423 | { |
424 | match f(Pin::new(&mut self.inner), self.context) { |
425 | Poll::Ready(res) => res, |
426 | Poll::Pending => Err(ErrorKind::WouldBlock.into()), |
427 | } |
428 | } |
429 | } |
430 | |
431 | impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> { |
432 | #[inline ] |
433 | fn read(&mut self, buf: &mut [u8]) -> Result<usize> { |
434 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_read(cx, buf)) |
435 | } |
436 | |
437 | #[inline ] |
438 | fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> { |
439 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_read_vectored(cx, bufs)) |
440 | } |
441 | } |
442 | |
443 | impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> { |
444 | #[inline ] |
445 | fn write(&mut self, buf: &[u8]) -> Result<usize> { |
446 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_write(cx, buf)) |
447 | } |
448 | |
449 | #[inline ] |
450 | fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> { |
451 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_write_vectored(cx, bufs)) |
452 | } |
453 | |
454 | #[inline ] |
455 | fn flush(&mut self) -> Result<()> { |
456 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_flush(cx)) |
457 | } |
458 | } |
459 | |
460 | impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> { |
461 | #[inline ] |
462 | fn seek(&mut self, pos: SeekFrom) -> Result<u64> { |
463 | self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_seek(cx, pos)) |
464 | } |
465 | } |
466 | |
467 | impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> { |
468 | #[inline ] |
469 | fn as_ref(&self) -> &T { |
470 | &self.inner |
471 | } |
472 | } |
473 | |
474 | impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> { |
475 | #[inline ] |
476 | fn as_mut(&mut self) -> &mut T { |
477 | &mut self.inner |
478 | } |
479 | } |
480 | |
481 | impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> { |
482 | #[inline ] |
483 | fn borrow(&self) -> &T { |
484 | &self.inner |
485 | } |
486 | } |
487 | |
488 | impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> { |
489 | #[inline ] |
490 | fn borrow_mut(&mut self) -> &mut T { |
491 | &mut self.inner |
492 | } |
493 | } |
494 | |
495 | /// Blocks on all async I/O operations and implements [`std::io`] traits. |
496 | /// |
497 | /// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`] |
498 | /// manually all the time becomes too tedious, use this type for more convenient blocking on async |
499 | /// I/O operations. |
500 | /// |
501 | /// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or |
502 | /// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or |
503 | /// [`AsyncSeek`], respectively. |
504 | /// |
505 | /// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before |
506 | /// dropping the [`BlockOn`] handle or some buffered data might get lost. |
507 | /// |
508 | /// # Examples |
509 | /// |
510 | /// ``` |
511 | /// use futures_lite::io::BlockOn; |
512 | /// use futures_lite::pin; |
513 | /// use std::io::Read; |
514 | /// |
515 | /// let reader: &[u8] = b"hello" ; |
516 | /// pin!(reader); |
517 | /// |
518 | /// let mut blocking_reader = BlockOn::new(reader); |
519 | /// let mut contents = String::new(); |
520 | /// |
521 | /// // This line blocks - note that there is no await: |
522 | /// blocking_reader.read_to_string(&mut contents)?; |
523 | /// # std::io::Result::Ok(()) |
524 | /// ``` |
525 | #[derive (Debug)] |
526 | pub struct BlockOn<T>(T); |
527 | |
528 | impl<T> BlockOn<T> { |
529 | /// Wraps an async I/O handle into a blocking interface. |
530 | /// |
531 | /// # Examples |
532 | /// |
533 | /// ``` |
534 | /// use futures_lite::io::BlockOn; |
535 | /// use futures_lite::pin; |
536 | /// |
537 | /// let reader: &[u8] = b"hello" ; |
538 | /// pin!(reader); |
539 | /// |
540 | /// let blocking_reader = BlockOn::new(reader); |
541 | /// ``` |
542 | pub fn new(io: T) -> BlockOn<T> { |
543 | BlockOn(io) |
544 | } |
545 | |
546 | /// Gets a reference to the async I/O handle. |
547 | /// |
548 | /// # Examples |
549 | /// |
550 | /// ``` |
551 | /// use futures_lite::io::BlockOn; |
552 | /// use futures_lite::pin; |
553 | /// |
554 | /// let reader: &[u8] = b"hello" ; |
555 | /// pin!(reader); |
556 | /// |
557 | /// let blocking_reader = BlockOn::new(reader); |
558 | /// let r = blocking_reader.get_ref(); |
559 | /// ``` |
560 | pub fn get_ref(&self) -> &T { |
561 | &self.0 |
562 | } |
563 | |
564 | /// Gets a mutable reference to the async I/O handle. |
565 | /// |
566 | /// # Examples |
567 | /// |
568 | /// ``` |
569 | /// use futures_lite::io::BlockOn; |
570 | /// use futures_lite::pin; |
571 | /// |
572 | /// let reader: &[u8] = b"hello" ; |
573 | /// pin!(reader); |
574 | /// |
575 | /// let mut blocking_reader = BlockOn::new(reader); |
576 | /// let r = blocking_reader.get_mut(); |
577 | /// ``` |
578 | pub fn get_mut(&mut self) -> &mut T { |
579 | &mut self.0 |
580 | } |
581 | |
582 | /// Extracts the inner async I/O handle. |
583 | /// |
584 | /// # Examples |
585 | /// |
586 | /// ``` |
587 | /// use futures_lite::io::BlockOn; |
588 | /// use futures_lite::pin; |
589 | /// |
590 | /// let reader: &[u8] = b"hello" ; |
591 | /// pin!(reader); |
592 | /// |
593 | /// let blocking_reader = BlockOn::new(reader); |
594 | /// let inner = blocking_reader.into_inner(); |
595 | /// ``` |
596 | pub fn into_inner(self) -> T { |
597 | self.0 |
598 | } |
599 | } |
600 | |
601 | impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> { |
602 | fn read(&mut self, buf: &mut [u8]) -> Result<usize> { |
603 | future::block_on(self.0.read(buf)) |
604 | } |
605 | } |
606 | |
607 | impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> { |
608 | fn fill_buf(&mut self) -> Result<&[u8]> { |
609 | future::block_on(self.0.fill_buf()) |
610 | } |
611 | |
612 | fn consume(&mut self, amt: usize) { |
613 | Pin::new(&mut self.0).consume(amt) |
614 | } |
615 | } |
616 | |
617 | impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> { |
618 | fn write(&mut self, buf: &[u8]) -> Result<usize> { |
619 | future::block_on(self.0.write(buf)) |
620 | } |
621 | |
622 | fn flush(&mut self) -> Result<()> { |
623 | future::block_on(self.0.flush()) |
624 | } |
625 | } |
626 | |
627 | impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> { |
628 | fn seek(&mut self, pos: SeekFrom) -> Result<u64> { |
629 | future::block_on(self.0.seek(pos)) |
630 | } |
631 | } |
632 | |
633 | pin_project! { |
634 | /// Adds buffering to a reader. |
635 | /// |
636 | /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A |
637 | /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and |
638 | /// maintains an in-memory buffer of the incoming byte stream. |
639 | /// |
640 | /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to |
641 | /// the same file or networking socket. It does not help when reading very large amounts at |
642 | /// once, or reading just once or a few times. It also provides no advantage when reading from |
643 | /// a source that is already in memory, like a `Vec<u8>`. |
644 | /// |
645 | /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating |
646 | /// multiple instances of [`BufReader`] on the same reader can cause data loss. |
647 | /// |
648 | /// # Examples |
649 | /// |
650 | /// ``` |
651 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
652 | /// |
653 | /// # spin_on::spin_on(async { |
654 | /// let input: &[u8] = b"hello"; |
655 | /// let mut reader = BufReader::new(input); |
656 | /// |
657 | /// let mut line = String::new(); |
658 | /// reader.read_line(&mut line).await?; |
659 | /// # std::io::Result::Ok(()) }); |
660 | /// ``` |
661 | pub struct BufReader<R> { |
662 | #[pin] |
663 | inner: R, |
664 | buf: Box<[u8]>, |
665 | pos: usize, |
666 | cap: usize, |
667 | } |
668 | } |
669 | |
670 | impl<R: AsyncRead> BufReader<R> { |
671 | /// Creates a buffered reader with the default buffer capacity. |
672 | /// |
673 | /// The default capacity is currently 8 KB, but that may change in the future. |
674 | /// |
675 | /// # Examples |
676 | /// |
677 | /// ``` |
678 | /// use futures_lite::io::BufReader; |
679 | /// |
680 | /// let input: &[u8] = b"hello" ; |
681 | /// let reader = BufReader::new(input); |
682 | /// ``` |
683 | pub fn new(inner: R) -> BufReader<R> { |
684 | BufReader::with_capacity(DEFAULT_BUF_SIZE, inner) |
685 | } |
686 | |
687 | /// Creates a buffered reader with the specified capacity. |
688 | /// |
689 | /// # Examples |
690 | /// |
691 | /// ``` |
692 | /// use futures_lite::io::BufReader; |
693 | /// |
694 | /// let input: &[u8] = b"hello" ; |
695 | /// let reader = BufReader::with_capacity(1024, input); |
696 | /// ``` |
697 | pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> { |
698 | BufReader { |
699 | inner, |
700 | buf: vec![0; capacity].into_boxed_slice(), |
701 | pos: 0, |
702 | cap: 0, |
703 | } |
704 | } |
705 | } |
706 | |
707 | impl<R> BufReader<R> { |
708 | /// Gets a reference to the underlying reader. |
709 | /// |
710 | /// It is not advisable to directly read from the underlying reader. |
711 | /// |
712 | /// # Examples |
713 | /// |
714 | /// ``` |
715 | /// use futures_lite::io::BufReader; |
716 | /// |
717 | /// let input: &[u8] = b"hello" ; |
718 | /// let reader = BufReader::new(input); |
719 | /// |
720 | /// let r = reader.get_ref(); |
721 | /// ``` |
722 | pub fn get_ref(&self) -> &R { |
723 | &self.inner |
724 | } |
725 | |
726 | /// Gets a mutable reference to the underlying reader. |
727 | /// |
728 | /// It is not advisable to directly read from the underlying reader. |
729 | /// |
730 | /// # Examples |
731 | /// |
732 | /// ``` |
733 | /// use futures_lite::io::BufReader; |
734 | /// |
735 | /// let input: &[u8] = b"hello" ; |
736 | /// let mut reader = BufReader::new(input); |
737 | /// |
738 | /// let r = reader.get_mut(); |
739 | /// ``` |
740 | pub fn get_mut(&mut self) -> &mut R { |
741 | &mut self.inner |
742 | } |
743 | |
744 | /// Gets a pinned mutable reference to the underlying reader. |
745 | /// |
746 | /// It is not advisable to directly read from the underlying reader. |
747 | fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { |
748 | self.project().inner |
749 | } |
750 | |
751 | /// Returns a reference to the internal buffer. |
752 | /// |
753 | /// This method will not attempt to fill the buffer if it is empty. |
754 | /// |
755 | /// # Examples |
756 | /// |
757 | /// ``` |
758 | /// use futures_lite::io::BufReader; |
759 | /// |
760 | /// let input: &[u8] = b"hello" ; |
761 | /// let reader = BufReader::new(input); |
762 | /// |
763 | /// // The internal buffer is empty until the first read request. |
764 | /// assert_eq!(reader.buffer(), &[]); |
765 | /// ``` |
766 | pub fn buffer(&self) -> &[u8] { |
767 | &self.buf[self.pos..self.cap] |
768 | } |
769 | |
770 | /// Unwraps the buffered reader, returning the underlying reader. |
771 | /// |
772 | /// Note that any leftover data in the internal buffer will be lost. |
773 | /// |
774 | /// # Examples |
775 | /// |
776 | /// ``` |
777 | /// use futures_lite::io::BufReader; |
778 | /// |
779 | /// let input: &[u8] = b"hello" ; |
780 | /// let reader = BufReader::new(input); |
781 | /// |
782 | /// assert_eq!(reader.into_inner(), input); |
783 | /// ``` |
784 | pub fn into_inner(self) -> R { |
785 | self.inner |
786 | } |
787 | |
788 | /// Invalidates all data in the internal buffer. |
789 | #[inline ] |
790 | fn discard_buffer(self: Pin<&mut Self>) { |
791 | let this = self.project(); |
792 | *this.pos = 0; |
793 | *this.cap = 0; |
794 | } |
795 | } |
796 | |
797 | impl<R: AsyncRead> AsyncRead for BufReader<R> { |
798 | fn poll_read( |
799 | mut self: Pin<&mut Self>, |
800 | cx: &mut Context<'_>, |
801 | buf: &mut [u8], |
802 | ) -> Poll<Result<usize>> { |
803 | // If we don't have any buffered data and we're doing a massive read |
804 | // (larger than our internal buffer), bypass our internal buffer |
805 | // entirely. |
806 | if self.pos == self.cap && buf.len() >= self.buf.len() { |
807 | let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf)); |
808 | self.discard_buffer(); |
809 | return Poll::Ready(res); |
810 | } |
811 | let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; |
812 | let nread = std::io::Read::read(&mut rem, buf)?; |
813 | self.consume(nread); |
814 | Poll::Ready(Ok(nread)) |
815 | } |
816 | |
817 | fn poll_read_vectored( |
818 | mut self: Pin<&mut Self>, |
819 | cx: &mut Context<'_>, |
820 | bufs: &mut [IoSliceMut<'_>], |
821 | ) -> Poll<Result<usize>> { |
822 | let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); |
823 | if self.pos == self.cap && total_len >= self.buf.len() { |
824 | let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs)); |
825 | self.discard_buffer(); |
826 | return Poll::Ready(res); |
827 | } |
828 | let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; |
829 | let nread = std::io::Read::read_vectored(&mut rem, bufs)?; |
830 | self.consume(nread); |
831 | Poll::Ready(Ok(nread)) |
832 | } |
833 | } |
834 | |
835 | impl<R: AsyncRead> AsyncBufRead for BufReader<R> { |
836 | fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> { |
837 | let mut this: Projection<'_, R> = self.project(); |
838 | |
839 | // If we've reached the end of our internal buffer then we need to fetch |
840 | // some more data from the underlying reader. |
841 | // Branch using `>=` instead of the more correct `==` |
842 | // to tell the compiler that the pos..cap slice is always valid. |
843 | if *this.pos >= *this.cap { |
844 | debug_assert!(*this.pos == *this.cap); |
845 | *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?; |
846 | *this.pos = 0; |
847 | } |
848 | Poll::Ready(Ok(&this.buf[*this.pos..*this.cap])) |
849 | } |
850 | |
851 | fn consume(self: Pin<&mut Self>, amt: usize) { |
852 | let this: Projection<'_, R> = self.project(); |
853 | *this.pos = cmp::min(*this.pos + amt, *this.cap); |
854 | } |
855 | } |
856 | |
857 | impl<R: fmt::Debug> fmt::Debug for BufReader<R> { |
858 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
859 | f&mut DebugStruct<'_, '_>.debug_struct("BufReader" ) |
860 | .field("reader" , &self.inner) |
861 | .field( |
862 | name:"buffer" , |
863 | &format_args!(" {}/ {}" , self.cap - self.pos, self.buf.len()), |
864 | ) |
865 | .finish() |
866 | } |
867 | } |
868 | |
869 | impl<R: AsyncSeek> AsyncSeek for BufReader<R> { |
870 | /// Seeks to an offset, in bytes, in the underlying reader. |
871 | /// |
872 | /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying |
873 | /// reader would be at if the [`BufReader`] had no internal buffer. |
874 | /// |
875 | /// Seeking always discards the internal buffer, even if the seek position would otherwise fall |
876 | /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`] |
877 | /// immediately after a seek yields the underlying reader at the same position. |
878 | /// |
879 | /// See [`AsyncSeek`] for more details. |
880 | /// |
881 | /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the |
882 | /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If |
883 | /// the second seek returns `Err`, the underlying reader will be left at the same position it |
884 | /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`. |
885 | fn poll_seek( |
886 | mut self: Pin<&mut Self>, |
887 | cx: &mut Context<'_>, |
888 | pos: SeekFrom, |
889 | ) -> Poll<Result<u64>> { |
890 | let result: u64; |
891 | if let SeekFrom::Current(n) = pos { |
892 | let remainder = (self.cap - self.pos) as i64; |
893 | // it should be safe to assume that remainder fits within an i64 as the alternative |
894 | // means we managed to allocate 8 exbibytes and that's absurd. |
895 | // But it's not out of the realm of possibility for some weird underlying reader to |
896 | // support seeking by i64::min_value() so we need to handle underflow when subtracting |
897 | // remainder. |
898 | if let Some(offset) = n.checked_sub(remainder) { |
899 | result = ready!(self |
900 | .as_mut() |
901 | .get_pin_mut() |
902 | .poll_seek(cx, SeekFrom::Current(offset)))?; |
903 | } else { |
904 | // seek backwards by our remainder, and then by the offset |
905 | ready!(self |
906 | .as_mut() |
907 | .get_pin_mut() |
908 | .poll_seek(cx, SeekFrom::Current(-remainder)))?; |
909 | self.as_mut().discard_buffer(); |
910 | result = ready!(self |
911 | .as_mut() |
912 | .get_pin_mut() |
913 | .poll_seek(cx, SeekFrom::Current(n)))?; |
914 | } |
915 | } else { |
916 | // Seeking with Start/End doesn't care about our buffer length. |
917 | result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?; |
918 | } |
919 | self.discard_buffer(); |
920 | Poll::Ready(Ok(result)) |
921 | } |
922 | } |
923 | |
924 | impl<R: AsyncWrite> AsyncWrite for BufReader<R> { |
925 | fn poll_write( |
926 | mut self: Pin<&mut Self>, |
927 | cx: &mut Context<'_>, |
928 | buf: &[u8], |
929 | ) -> Poll<Result<usize>> { |
930 | self.as_mut().get_pin_mut().poll_write(cx, buf) |
931 | } |
932 | |
933 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
934 | self.as_mut().get_pin_mut().poll_flush(cx) |
935 | } |
936 | |
937 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
938 | self.as_mut().get_pin_mut().poll_close(cx) |
939 | } |
940 | } |
941 | |
942 | pin_project! { |
943 | /// Adds buffering to a writer. |
944 | /// |
945 | /// It can be excessively inefficient to work directly with something that implements |
946 | /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP |
947 | /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and |
948 | /// writes it to the underlying writer in large, infrequent batches. |
949 | /// |
950 | /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to |
951 | /// the same file or networking socket. It does not help when writing very large amounts at |
952 | /// once, or writing just once or a few times. It also provides no advantage when writing to a |
953 | /// destination that is in memory, like a `Vec<u8>`. |
954 | /// |
955 | /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when |
956 | /// it is dropped. Therefore, it is important that users explicitly flush the buffer before |
957 | /// dropping the [`BufWriter`]. |
958 | /// |
959 | /// # Examples |
960 | /// |
961 | /// ``` |
962 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
963 | /// |
964 | /// # spin_on::spin_on(async { |
965 | /// let mut output = Vec::new(); |
966 | /// let mut writer = BufWriter::new(&mut output); |
967 | /// |
968 | /// writer.write_all(b"hello").await?; |
969 | /// writer.flush().await?; |
970 | /// # std::io::Result::Ok(()) }); |
971 | /// ``` |
972 | pub struct BufWriter<W> { |
973 | #[pin] |
974 | inner: W, |
975 | buf: Vec<u8>, |
976 | written: usize, |
977 | } |
978 | } |
979 | |
980 | impl<W: AsyncWrite> BufWriter<W> { |
981 | /// Creates a buffered writer with the default buffer capacity. |
982 | /// |
983 | /// The default capacity is currently 8 KB, but that may change in the future. |
984 | /// |
985 | /// # Examples |
986 | /// |
987 | /// ``` |
988 | /// use futures_lite::io::BufWriter; |
989 | /// |
990 | /// let mut output = Vec::new(); |
991 | /// let writer = BufWriter::new(&mut output); |
992 | /// ``` |
993 | pub fn new(inner: W) -> BufWriter<W> { |
994 | BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner) |
995 | } |
996 | |
997 | /// Creates a buffered writer with the specified buffer capacity. |
998 | /// |
999 | /// # Examples |
1000 | /// |
1001 | /// ``` |
1002 | /// use futures_lite::io::BufWriter; |
1003 | /// |
1004 | /// let mut output = Vec::new(); |
1005 | /// let writer = BufWriter::with_capacity(100, &mut output); |
1006 | /// ``` |
1007 | pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> { |
1008 | BufWriter { |
1009 | inner, |
1010 | buf: Vec::with_capacity(capacity), |
1011 | written: 0, |
1012 | } |
1013 | } |
1014 | |
1015 | /// Gets a reference to the underlying writer. |
1016 | /// |
1017 | /// # Examples |
1018 | /// |
1019 | /// ``` |
1020 | /// use futures_lite::io::BufWriter; |
1021 | /// |
1022 | /// let mut output = Vec::new(); |
1023 | /// let writer = BufWriter::new(&mut output); |
1024 | /// |
1025 | /// let r = writer.get_ref(); |
1026 | /// ``` |
1027 | pub fn get_ref(&self) -> &W { |
1028 | &self.inner |
1029 | } |
1030 | |
1031 | /// Gets a mutable reference to the underlying writer. |
1032 | /// |
1033 | /// It is not advisable to directly write to the underlying writer. |
1034 | /// |
1035 | /// # Examples |
1036 | /// |
1037 | /// ``` |
1038 | /// use futures_lite::io::BufWriter; |
1039 | /// |
1040 | /// let mut output = Vec::new(); |
1041 | /// let mut writer = BufWriter::new(&mut output); |
1042 | /// |
1043 | /// let r = writer.get_mut(); |
1044 | /// ``` |
1045 | pub fn get_mut(&mut self) -> &mut W { |
1046 | &mut self.inner |
1047 | } |
1048 | |
1049 | /// Gets a pinned mutable reference to the underlying writer. |
1050 | /// |
1051 | /// It is not not advisable to directly write to the underlying writer. |
1052 | fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { |
1053 | self.project().inner |
1054 | } |
1055 | |
1056 | /// Unwraps the buffered writer, returning the underlying writer. |
1057 | /// |
1058 | /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose |
1059 | /// that data, flush the buffered writer before unwrapping it. |
1060 | /// |
1061 | /// # Examples |
1062 | /// |
1063 | /// ``` |
1064 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
1065 | /// |
1066 | /// # spin_on::spin_on(async { |
1067 | /// let mut output = vec![1, 2, 3]; |
1068 | /// let mut writer = BufWriter::new(&mut output); |
1069 | /// |
1070 | /// writer.write_all(&[4]).await?; |
1071 | /// writer.flush().await?; |
1072 | /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]); |
1073 | /// # std::io::Result::Ok(()) }); |
1074 | /// ``` |
1075 | pub fn into_inner(self) -> W { |
1076 | self.inner |
1077 | } |
1078 | |
1079 | /// Returns a reference to the internal buffer. |
1080 | /// |
1081 | /// # Examples |
1082 | /// |
1083 | /// ``` |
1084 | /// use futures_lite::io::BufWriter; |
1085 | /// |
1086 | /// let mut output = Vec::new(); |
1087 | /// let writer = BufWriter::new(&mut output); |
1088 | /// |
1089 | /// // The internal buffer is empty until the first write request. |
1090 | /// assert_eq!(writer.buffer(), &[]); |
1091 | /// ``` |
1092 | pub fn buffer(&self) -> &[u8] { |
1093 | &self.buf |
1094 | } |
1095 | |
1096 | /// Flush the buffer. |
1097 | fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1098 | let mut this = self.project(); |
1099 | let len = this.buf.len(); |
1100 | let mut ret = Ok(()); |
1101 | |
1102 | while *this.written < len { |
1103 | match this |
1104 | .inner |
1105 | .as_mut() |
1106 | .poll_write(cx, &this.buf[*this.written..]) |
1107 | { |
1108 | Poll::Ready(Ok(0)) => { |
1109 | ret = Err(Error::new( |
1110 | ErrorKind::WriteZero, |
1111 | "Failed to write buffered data" , |
1112 | )); |
1113 | break; |
1114 | } |
1115 | Poll::Ready(Ok(n)) => *this.written += n, |
1116 | Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {} |
1117 | Poll::Ready(Err(e)) => { |
1118 | ret = Err(e); |
1119 | break; |
1120 | } |
1121 | Poll::Pending => return Poll::Pending, |
1122 | } |
1123 | } |
1124 | |
1125 | if *this.written > 0 { |
1126 | this.buf.drain(..*this.written); |
1127 | } |
1128 | *this.written = 0; |
1129 | |
1130 | Poll::Ready(ret) |
1131 | } |
1132 | } |
1133 | |
1134 | impl<W: fmt::Debug> fmt::Debug for BufWriter<W> { |
1135 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1136 | f&mut DebugStruct<'_, '_>.debug_struct("BufWriter" ) |
1137 | .field("writer" , &self.inner) |
1138 | .field(name:"buf" , &self.buf) |
1139 | .finish() |
1140 | } |
1141 | } |
1142 | |
1143 | impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { |
1144 | fn poll_write( |
1145 | mut self: Pin<&mut Self>, |
1146 | cx: &mut Context<'_>, |
1147 | buf: &[u8], |
1148 | ) -> Poll<Result<usize>> { |
1149 | if self.buf.len() + buf.len() > self.buf.capacity() { |
1150 | ready!(self.as_mut().poll_flush_buf(cx))?; |
1151 | } |
1152 | if buf.len() >= self.buf.capacity() { |
1153 | self.get_pin_mut().poll_write(cx, buf) |
1154 | } else { |
1155 | Pin::new(&mut *self.project().buf).poll_write(cx, buf) |
1156 | } |
1157 | } |
1158 | |
1159 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1160 | ready!(self.as_mut().poll_flush_buf(cx))?; |
1161 | self.get_pin_mut().poll_flush(cx) |
1162 | } |
1163 | |
1164 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1165 | ready!(self.as_mut().poll_flush_buf(cx))?; |
1166 | self.get_pin_mut().poll_close(cx) |
1167 | } |
1168 | } |
1169 | |
1170 | impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { |
1171 | /// Seek to the offset, in bytes, in the underlying writer. |
1172 | /// |
1173 | /// Seeking always writes out the internal buffer before seeking. |
1174 | fn poll_seek( |
1175 | mut self: Pin<&mut Self>, |
1176 | cx: &mut Context<'_>, |
1177 | pos: SeekFrom, |
1178 | ) -> Poll<Result<u64>> { |
1179 | ready!(self.as_mut().poll_flush_buf(cx))?; |
1180 | self.get_pin_mut().poll_seek(cx, pos) |
1181 | } |
1182 | } |
1183 | |
1184 | /// Gives an in-memory buffer a cursor for reading and writing. |
1185 | /// |
1186 | /// # Examples |
1187 | /// |
1188 | /// ``` |
1189 | /// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom}; |
1190 | /// |
1191 | /// # spin_on::spin_on(async { |
1192 | /// let mut bytes = b"hello" .to_vec(); |
1193 | /// let mut cursor = Cursor::new(&mut bytes); |
1194 | /// |
1195 | /// // Overwrite 'h' with 'H'. |
1196 | /// cursor.write_all(b"H" ).await?; |
1197 | /// |
1198 | /// // Move the cursor one byte forward. |
1199 | /// cursor.seek(SeekFrom::Current(1)).await?; |
1200 | /// |
1201 | /// // Read a byte. |
1202 | /// let mut byte = [0]; |
1203 | /// cursor.read_exact(&mut byte).await?; |
1204 | /// assert_eq!(&byte, b"l" ); |
1205 | /// |
1206 | /// // Check the final buffer. |
1207 | /// assert_eq!(bytes, b"Hello" ); |
1208 | /// # std::io::Result::Ok(()) }); |
1209 | /// ``` |
1210 | #[derive (Clone, Debug, Default)] |
1211 | pub struct Cursor<T> { |
1212 | inner: std::io::Cursor<T>, |
1213 | } |
1214 | |
1215 | impl<T> Cursor<T> { |
1216 | /// Creates a cursor for an in-memory buffer. |
1217 | /// |
1218 | /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using |
1219 | /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of |
1220 | /// the buffer using [`set_position()`][Cursor::set_position()`] or |
1221 | /// [`seek()`][`AsyncSeekExt::seek()`]. |
1222 | /// |
1223 | /// # Examples |
1224 | /// |
1225 | /// ``` |
1226 | /// use futures_lite::io::Cursor; |
1227 | /// |
1228 | /// let cursor = Cursor::new(Vec::<u8>::new()); |
1229 | /// ``` |
1230 | pub fn new(inner: T) -> Cursor<T> { |
1231 | Cursor { |
1232 | inner: std::io::Cursor::new(inner), |
1233 | } |
1234 | } |
1235 | |
1236 | /// Gets a reference to the underlying buffer. |
1237 | /// |
1238 | /// # Examples |
1239 | /// |
1240 | /// ``` |
1241 | /// use futures_lite::io::Cursor; |
1242 | /// |
1243 | /// let cursor = Cursor::new(Vec::<u8>::new()); |
1244 | /// let r = cursor.get_ref(); |
1245 | /// ``` |
1246 | pub fn get_ref(&self) -> &T { |
1247 | self.inner.get_ref() |
1248 | } |
1249 | |
1250 | /// Gets a mutable reference to the underlying buffer. |
1251 | /// |
1252 | /// # Examples |
1253 | /// |
1254 | /// ``` |
1255 | /// use futures_lite::io::Cursor; |
1256 | /// |
1257 | /// let mut cursor = Cursor::new(Vec::<u8>::new()); |
1258 | /// let r = cursor.get_mut(); |
1259 | /// ``` |
1260 | pub fn get_mut(&mut self) -> &mut T { |
1261 | self.inner.get_mut() |
1262 | } |
1263 | |
1264 | /// Unwraps the cursor, returning the underlying buffer. |
1265 | /// |
1266 | /// # Examples |
1267 | /// |
1268 | /// ``` |
1269 | /// use futures_lite::io::Cursor; |
1270 | /// |
1271 | /// let cursor = Cursor::new(vec![1, 2, 3]); |
1272 | /// assert_eq!(cursor.into_inner(), [1, 2, 3]); |
1273 | /// ``` |
1274 | pub fn into_inner(self) -> T { |
1275 | self.inner.into_inner() |
1276 | } |
1277 | |
1278 | /// Returns the current position of this cursor. |
1279 | /// |
1280 | /// # Examples |
1281 | /// |
1282 | /// ``` |
1283 | /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom}; |
1284 | /// |
1285 | /// # spin_on::spin_on(async { |
1286 | /// let mut cursor = Cursor::new(b"hello" ); |
1287 | /// assert_eq!(cursor.position(), 0); |
1288 | /// |
1289 | /// cursor.seek(SeekFrom::Start(2)).await?; |
1290 | /// assert_eq!(cursor.position(), 2); |
1291 | /// # std::io::Result::Ok(()) }); |
1292 | /// ``` |
1293 | pub fn position(&self) -> u64 { |
1294 | self.inner.position() |
1295 | } |
1296 | |
1297 | /// Sets the position of this cursor. |
1298 | /// |
1299 | /// # Examples |
1300 | /// |
1301 | /// ``` |
1302 | /// use futures_lite::io::Cursor; |
1303 | /// |
1304 | /// let mut cursor = Cursor::new(b"hello" ); |
1305 | /// assert_eq!(cursor.position(), 0); |
1306 | /// |
1307 | /// cursor.set_position(2); |
1308 | /// assert_eq!(cursor.position(), 2); |
1309 | /// ``` |
1310 | pub fn set_position(&mut self, pos: u64) { |
1311 | self.inner.set_position(pos) |
1312 | } |
1313 | } |
1314 | |
1315 | impl<T> AsyncSeek for Cursor<T> |
1316 | where |
1317 | T: AsRef<[u8]> + Unpin, |
1318 | { |
1319 | fn poll_seek( |
1320 | mut self: Pin<&mut Self>, |
1321 | _: &mut Context<'_>, |
1322 | pos: SeekFrom, |
1323 | ) -> Poll<Result<u64>> { |
1324 | Poll::Ready(std::io::Seek::seek(&mut self.inner, pos)) |
1325 | } |
1326 | } |
1327 | |
1328 | impl<T> AsyncRead for Cursor<T> |
1329 | where |
1330 | T: AsRef<[u8]> + Unpin, |
1331 | { |
1332 | fn poll_read( |
1333 | mut self: Pin<&mut Self>, |
1334 | _cx: &mut Context<'_>, |
1335 | buf: &mut [u8], |
1336 | ) -> Poll<Result<usize>> { |
1337 | Poll::Ready(std::io::Read::read(&mut self.inner, buf)) |
1338 | } |
1339 | |
1340 | fn poll_read_vectored( |
1341 | mut self: Pin<&mut Self>, |
1342 | _: &mut Context<'_>, |
1343 | bufs: &mut [IoSliceMut<'_>], |
1344 | ) -> Poll<Result<usize>> { |
1345 | Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs)) |
1346 | } |
1347 | } |
1348 | |
1349 | impl<T> AsyncBufRead for Cursor<T> |
1350 | where |
1351 | T: AsRef<[u8]> + Unpin, |
1352 | { |
1353 | fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> { |
1354 | Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner)) |
1355 | } |
1356 | |
1357 | fn consume(mut self: Pin<&mut Self>, amt: usize) { |
1358 | std::io::BufRead::consume(&mut self.inner, amt) |
1359 | } |
1360 | } |
1361 | |
1362 | impl AsyncWrite for Cursor<&mut [u8]> { |
1363 | fn poll_write( |
1364 | mut self: Pin<&mut Self>, |
1365 | _: &mut Context<'_>, |
1366 | buf: &[u8], |
1367 | ) -> Poll<Result<usize>> { |
1368 | Poll::Ready(std::io::Write::write(&mut self.inner, buf)) |
1369 | } |
1370 | |
1371 | fn poll_write_vectored( |
1372 | mut self: Pin<&mut Self>, |
1373 | _: &mut Context<'_>, |
1374 | bufs: &[IoSlice<'_>], |
1375 | ) -> Poll<Result<usize>> { |
1376 | Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs)) |
1377 | } |
1378 | |
1379 | fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
1380 | Poll::Ready(std::io::Write::flush(&mut self.inner)) |
1381 | } |
1382 | |
1383 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1384 | self.poll_flush(cx) |
1385 | } |
1386 | } |
1387 | |
1388 | impl AsyncWrite for Cursor<&mut Vec<u8>> { |
1389 | fn poll_write( |
1390 | mut self: Pin<&mut Self>, |
1391 | _: &mut Context<'_>, |
1392 | buf: &[u8], |
1393 | ) -> Poll<Result<usize>> { |
1394 | Poll::Ready(std::io::Write::write(&mut self.inner, buf)) |
1395 | } |
1396 | |
1397 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1398 | self.poll_flush(cx) |
1399 | } |
1400 | |
1401 | fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
1402 | Poll::Ready(std::io::Write::flush(&mut self.inner)) |
1403 | } |
1404 | } |
1405 | |
1406 | impl AsyncWrite for Cursor<Vec<u8>> { |
1407 | fn poll_write( |
1408 | mut self: Pin<&mut Self>, |
1409 | _: &mut Context<'_>, |
1410 | buf: &[u8], |
1411 | ) -> Poll<Result<usize>> { |
1412 | Poll::Ready(std::io::Write::write(&mut self.inner, buf)) |
1413 | } |
1414 | |
1415 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1416 | self.poll_flush(cx) |
1417 | } |
1418 | |
1419 | fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
1420 | Poll::Ready(std::io::Write::flush(&mut self.inner)) |
1421 | } |
1422 | } |
1423 | |
1424 | /// Creates an empty reader. |
1425 | /// |
1426 | /// # Examples |
1427 | /// |
1428 | /// ``` |
1429 | /// use futures_lite::io::{self, AsyncReadExt}; |
1430 | /// |
1431 | /// # spin_on::spin_on(async { |
1432 | /// let mut reader = io::empty(); |
1433 | /// |
1434 | /// let mut contents = Vec::new(); |
1435 | /// reader.read_to_end(&mut contents).await?; |
1436 | /// assert!(contents.is_empty()); |
1437 | /// # std::io::Result::Ok(()) }); |
1438 | /// ``` |
1439 | pub fn empty() -> Empty { |
1440 | Empty { _private: () } |
1441 | } |
1442 | |
1443 | /// Reader for the [`empty()`] function. |
1444 | pub struct Empty { |
1445 | _private: (), |
1446 | } |
1447 | |
1448 | impl fmt::Debug for Empty { |
1449 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1450 | f.pad("Empty { .. }" ) |
1451 | } |
1452 | } |
1453 | |
1454 | impl AsyncRead for Empty { |
1455 | #[inline ] |
1456 | fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> { |
1457 | Poll::Ready(Ok(0)) |
1458 | } |
1459 | } |
1460 | |
1461 | impl AsyncBufRead for Empty { |
1462 | #[inline ] |
1463 | fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> { |
1464 | Poll::Ready(Ok(&[])) |
1465 | } |
1466 | |
1467 | #[inline ] |
1468 | fn consume(self: Pin<&mut Self>, _: usize) {} |
1469 | } |
1470 | |
1471 | /// Creates an infinite reader that reads the same byte repeatedly. |
1472 | /// |
1473 | /// # Examples |
1474 | /// |
1475 | /// ``` |
1476 | /// use futures_lite::io::{self, AsyncReadExt}; |
1477 | /// |
1478 | /// # spin_on::spin_on(async { |
1479 | /// let mut reader = io::repeat(b'a' ); |
1480 | /// |
1481 | /// let mut contents = vec![0; 5]; |
1482 | /// reader.read_exact(&mut contents).await?; |
1483 | /// assert_eq!(contents, b"aaaaa" ); |
1484 | /// # std::io::Result::Ok(()) }); |
1485 | /// ``` |
1486 | pub fn repeat(byte: u8) -> Repeat { |
1487 | Repeat { byte } |
1488 | } |
1489 | |
1490 | /// Reader for the [`repeat()`] function. |
1491 | #[derive (Debug)] |
1492 | pub struct Repeat { |
1493 | byte: u8, |
1494 | } |
1495 | |
1496 | impl AsyncRead for Repeat { |
1497 | #[inline ] |
1498 | fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { |
1499 | for b: &mut u8 in &mut *buf { |
1500 | *b = self.byte; |
1501 | } |
1502 | Poll::Ready(Ok(buf.len())) |
1503 | } |
1504 | } |
1505 | |
1506 | /// Creates a writer that consumes and drops all data. |
1507 | /// |
1508 | /// # Examples |
1509 | /// |
1510 | /// ``` |
1511 | /// use futures_lite::io::{self, AsyncWriteExt}; |
1512 | /// |
1513 | /// # spin_on::spin_on(async { |
1514 | /// let mut writer = io::sink(); |
1515 | /// writer.write_all(b"hello" ).await?; |
1516 | /// # std::io::Result::Ok(()) }); |
1517 | /// ``` |
1518 | pub fn sink() -> Sink { |
1519 | Sink { _private: () } |
1520 | } |
1521 | |
1522 | /// Writer for the [`sink()`] function. |
1523 | #[derive (Debug)] |
1524 | pub struct Sink { |
1525 | _private: (), |
1526 | } |
1527 | |
1528 | impl AsyncWrite for Sink { |
1529 | #[inline ] |
1530 | fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
1531 | Poll::Ready(Ok(buf.len())) |
1532 | } |
1533 | |
1534 | #[inline ] |
1535 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
1536 | Poll::Ready(Ok(())) |
1537 | } |
1538 | |
1539 | #[inline ] |
1540 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> { |
1541 | Poll::Ready(Ok(())) |
1542 | } |
1543 | } |
1544 | |
1545 | /// Extension trait for [`AsyncBufRead`]. |
1546 | pub trait AsyncBufReadExt: AsyncBufRead { |
1547 | /// Returns the contents of the internal buffer, filling it with more data if empty. |
1548 | /// |
1549 | /// If the stream has reached EOF, an empty buffer will be returned. |
1550 | /// |
1551 | /// # Examples |
1552 | /// |
1553 | /// ``` |
1554 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
1555 | /// use std::pin::Pin; |
1556 | /// |
1557 | /// # spin_on::spin_on(async { |
1558 | /// let input: &[u8] = b"hello world" ; |
1559 | /// let mut reader = BufReader::with_capacity(5, input); |
1560 | /// |
1561 | /// assert_eq!(reader.fill_buf().await?, b"hello" ); |
1562 | /// reader.consume(2); |
1563 | /// assert_eq!(reader.fill_buf().await?, b"llo" ); |
1564 | /// reader.consume(3); |
1565 | /// assert_eq!(reader.fill_buf().await?, b" worl" ); |
1566 | /// # std::io::Result::Ok(()) }); |
1567 | /// ``` |
1568 | fn fill_buf(&mut self) -> FillBuf<'_, Self> |
1569 | where |
1570 | Self: Unpin, |
1571 | { |
1572 | FillBuf { reader: Some(self) } |
1573 | } |
1574 | |
1575 | /// Consumes `amt` buffered bytes. |
1576 | /// |
1577 | /// This method does not perform any I/O, it simply consumes some amount of bytes from the |
1578 | /// internal buffer. |
1579 | /// |
1580 | /// The `amt` must be <= the number of bytes in the buffer returned by |
1581 | /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`]. |
1582 | /// |
1583 | /// # Examples |
1584 | /// |
1585 | /// ``` |
1586 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
1587 | /// use std::pin::Pin; |
1588 | /// |
1589 | /// # spin_on::spin_on(async { |
1590 | /// let input: &[u8] = b"hello" ; |
1591 | /// let mut reader = BufReader::with_capacity(4, input); |
1592 | /// |
1593 | /// assert_eq!(reader.fill_buf().await?, b"hell" ); |
1594 | /// reader.consume(2); |
1595 | /// assert_eq!(reader.fill_buf().await?, b"ll" ); |
1596 | /// # std::io::Result::Ok(()) }); |
1597 | /// ``` |
1598 | fn consume(&mut self, amt: usize) |
1599 | where |
1600 | Self: Unpin, |
1601 | { |
1602 | AsyncBufRead::consume(Pin::new(self), amt); |
1603 | } |
1604 | |
1605 | /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found. |
1606 | /// |
1607 | /// This method will read bytes from the underlying stream until the delimiter or EOF is |
1608 | /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`. |
1609 | /// |
1610 | /// If successful, returns the total number of bytes read. |
1611 | /// |
1612 | /// # Examples |
1613 | /// |
1614 | /// ``` |
1615 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
1616 | /// |
1617 | /// # spin_on::spin_on(async { |
1618 | /// let input: &[u8] = b"hello" ; |
1619 | /// let mut reader = BufReader::new(input); |
1620 | /// |
1621 | /// let mut buf = Vec::new(); |
1622 | /// let n = reader.read_until(b' \n' , &mut buf).await?; |
1623 | /// # std::io::Result::Ok(()) }); |
1624 | /// ``` |
1625 | fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self> |
1626 | where |
1627 | Self: Unpin, |
1628 | { |
1629 | ReadUntilFuture { |
1630 | reader: self, |
1631 | byte, |
1632 | buf, |
1633 | read: 0, |
1634 | } |
1635 | } |
1636 | |
1637 | /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found. |
1638 | /// |
1639 | /// This method will read bytes from the underlying stream until the newline delimiter (the |
1640 | /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found) |
1641 | /// will be appended to `buf`. |
1642 | /// |
1643 | /// If successful, returns the total number of bytes read. |
1644 | /// |
1645 | /// # Examples |
1646 | /// |
1647 | /// ``` |
1648 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
1649 | /// |
1650 | /// # spin_on::spin_on(async { |
1651 | /// let input: &[u8] = b"hello" ; |
1652 | /// let mut reader = BufReader::new(input); |
1653 | /// |
1654 | /// let mut line = String::new(); |
1655 | /// let n = reader.read_line(&mut line).await?; |
1656 | /// # std::io::Result::Ok(()) }); |
1657 | /// ``` |
1658 | fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self> |
1659 | where |
1660 | Self: Unpin, |
1661 | { |
1662 | ReadLineFuture { |
1663 | reader: self, |
1664 | buf, |
1665 | bytes: Vec::new(), |
1666 | read: 0, |
1667 | } |
1668 | } |
1669 | |
1670 | /// Returns a stream over the lines of this byte stream. |
1671 | /// |
1672 | /// The stream returned from this method yields items of type |
1673 | /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`. |
1674 | /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes) |
1675 | /// at the end. |
1676 | /// |
1677 | /// # Examples |
1678 | /// |
1679 | /// ``` |
1680 | /// use futures_lite::io::{AsyncBufReadExt, BufReader}; |
1681 | /// use futures_lite::stream::StreamExt; |
1682 | /// |
1683 | /// # spin_on::spin_on(async { |
1684 | /// let input: &[u8] = b"hello \nworld \n" ; |
1685 | /// let mut reader = BufReader::new(input); |
1686 | /// let mut lines = reader.lines(); |
1687 | /// |
1688 | /// while let Some(line) = lines.next().await { |
1689 | /// println!("{}" , line?); |
1690 | /// } |
1691 | /// # std::io::Result::Ok(()) }); |
1692 | /// ``` |
1693 | fn lines(self) -> Lines<Self> |
1694 | where |
1695 | Self: Unpin + Sized, |
1696 | { |
1697 | Lines { |
1698 | reader: self, |
1699 | buf: String::new(), |
1700 | bytes: Vec::new(), |
1701 | read: 0, |
1702 | } |
1703 | } |
1704 | |
1705 | /// Returns a stream over the contents of this reader split on the specified `byte`. |
1706 | /// |
1707 | /// The stream returned from this method yields items of type |
1708 | /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`. |
1709 | /// Each vector returned will *not* have the delimiter byte at the end. |
1710 | /// |
1711 | /// # Examples |
1712 | /// |
1713 | /// ``` |
1714 | /// use futures_lite::io::{AsyncBufReadExt, Cursor}; |
1715 | /// use futures_lite::stream::StreamExt; |
1716 | /// |
1717 | /// # spin_on::spin_on(async { |
1718 | /// let cursor = Cursor::new(b"lorem-ipsum-dolor" ); |
1719 | /// let items: Vec<Vec<u8>> = cursor.split(b'-' ).try_collect().await?; |
1720 | /// |
1721 | /// assert_eq!(items[0], b"lorem" ); |
1722 | /// assert_eq!(items[1], b"ipsum" ); |
1723 | /// assert_eq!(items[2], b"dolor" ); |
1724 | /// # std::io::Result::Ok(()) }); |
1725 | /// ``` |
1726 | fn split(self, byte: u8) -> Split<Self> |
1727 | where |
1728 | Self: Sized, |
1729 | { |
1730 | Split { |
1731 | reader: self, |
1732 | buf: Vec::new(), |
1733 | delim: byte, |
1734 | read: 0, |
1735 | } |
1736 | } |
1737 | } |
1738 | |
1739 | impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} |
1740 | |
1741 | /// Future for the [`AsyncBufReadExt::fill_buf()`] method. |
1742 | #[derive (Debug)] |
1743 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1744 | pub struct FillBuf<'a, R: ?Sized> { |
1745 | reader: Option<&'a mut R>, |
1746 | } |
1747 | |
1748 | impl<R: ?Sized> Unpin for FillBuf<'_, R> {} |
1749 | |
1750 | impl<'a, R> Future for FillBuf<'a, R> |
1751 | where |
1752 | R: AsyncBufRead + Unpin + ?Sized, |
1753 | { |
1754 | type Output = Result<&'a [u8]>; |
1755 | |
1756 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1757 | let this: &mut FillBuf<'_, R> = &mut *self; |
1758 | let reader: &mut R = this |
1759 | .reader |
1760 | .take() |
1761 | .expect(msg:"polled `FillBuf` after completion" ); |
1762 | |
1763 | match Pin::new(&mut *reader).poll_fill_buf(cx) { |
1764 | Poll::Ready(Ok(_)) => match Pin::new(pointer:reader).poll_fill_buf(cx) { |
1765 | Poll::Ready(Ok(slice: &[u8])) => Poll::Ready(Ok(slice)), |
1766 | poll: Poll> => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}" , poll), |
1767 | }, |
1768 | Poll::Ready(Err(err: Error)) => Poll::Ready(Err(err)), |
1769 | Poll::Pending => { |
1770 | this.reader = Some(reader); |
1771 | Poll::Pending |
1772 | } |
1773 | } |
1774 | } |
1775 | } |
1776 | |
1777 | /// Future for the [`AsyncBufReadExt::read_until()`] method. |
1778 | #[derive (Debug)] |
1779 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1780 | pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> { |
1781 | reader: &'a mut R, |
1782 | byte: u8, |
1783 | buf: &'a mut Vec<u8>, |
1784 | read: usize, |
1785 | } |
1786 | |
1787 | impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {} |
1788 | |
1789 | impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> { |
1790 | type Output = Result<usize>; |
1791 | |
1792 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1793 | let Self { |
1794 | reader: &mut &mut R, |
1795 | byte: &mut u8, |
1796 | buf: &mut &mut Vec, |
1797 | read: &mut usize, |
1798 | } = &mut *self; |
1799 | read_until_internal(reader:Pin::new(pointer:reader), cx, *byte, buf, read) |
1800 | } |
1801 | } |
1802 | |
1803 | fn read_until_internal<R: AsyncBufReadExt + ?Sized>( |
1804 | mut reader: Pin<&mut R>, |
1805 | cx: &mut Context<'_>, |
1806 | byte: u8, |
1807 | buf: &mut Vec<u8>, |
1808 | read: &mut usize, |
1809 | ) -> Poll<Result<usize>> { |
1810 | loop { |
1811 | let (done: bool, used: usize) = { |
1812 | let available: &[u8] = ready!(reader.as_mut().poll_fill_buf(cx))?; |
1813 | |
1814 | if let Some(i: usize) = memchr(needle:byte, haystack:available) { |
1815 | buf.extend_from_slice(&available[..=i]); |
1816 | (true, i + 1) |
1817 | } else { |
1818 | buf.extend_from_slice(available); |
1819 | (false, available.len()) |
1820 | } |
1821 | }; |
1822 | |
1823 | reader.as_mut().consume(amt:used); |
1824 | *read += used; |
1825 | |
1826 | if done || used == 0 { |
1827 | return Poll::Ready(Ok(mem::replace(dest:read, src:0))); |
1828 | } |
1829 | } |
1830 | } |
1831 | |
1832 | /// Future for the [`AsyncBufReadExt::read_line()`] method. |
1833 | #[derive (Debug)] |
1834 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
1835 | pub struct ReadLineFuture<'a, R: Unpin + ?Sized> { |
1836 | reader: &'a mut R, |
1837 | buf: &'a mut String, |
1838 | bytes: Vec<u8>, |
1839 | read: usize, |
1840 | } |
1841 | |
1842 | impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {} |
1843 | |
1844 | impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> { |
1845 | type Output = Result<usize>; |
1846 | |
1847 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
1848 | let Self { |
1849 | reader: &mut &mut R, |
1850 | buf: &mut &mut String, |
1851 | bytes: &mut Vec, |
1852 | read: &mut usize, |
1853 | } = &mut *self; |
1854 | read_line_internal(reader:Pin::new(pointer:reader), cx, buf, bytes, read) |
1855 | } |
1856 | } |
1857 | |
1858 | pin_project! { |
1859 | /// Stream for the [`AsyncBufReadExt::lines()`] method. |
1860 | #[derive(Debug)] |
1861 | #[must_use = "streams do nothing unless polled" ] |
1862 | pub struct Lines<R> { |
1863 | #[pin] |
1864 | reader: R, |
1865 | buf: String, |
1866 | bytes: Vec<u8>, |
1867 | read: usize, |
1868 | } |
1869 | } |
1870 | |
1871 | impl<R: AsyncBufRead> Stream for Lines<R> { |
1872 | type Item = Result<String>; |
1873 | |
1874 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1875 | let this = self.project(); |
1876 | |
1877 | let n = ready!(read_line_internal( |
1878 | this.reader, |
1879 | cx, |
1880 | this.buf, |
1881 | this.bytes, |
1882 | this.read |
1883 | ))?; |
1884 | if n == 0 && this.buf.is_empty() { |
1885 | return Poll::Ready(None); |
1886 | } |
1887 | |
1888 | if this.buf.ends_with(' \n' ) { |
1889 | this.buf.pop(); |
1890 | if this.buf.ends_with(' \r' ) { |
1891 | this.buf.pop(); |
1892 | } |
1893 | } |
1894 | Poll::Ready(Some(Ok(mem::take(this.buf)))) |
1895 | } |
1896 | } |
1897 | |
1898 | fn read_line_internal<R: AsyncBufRead + ?Sized>( |
1899 | reader: Pin<&mut R>, |
1900 | cx: &mut Context<'_>, |
1901 | buf: &mut String, |
1902 | bytes: &mut Vec<u8>, |
1903 | read: &mut usize, |
1904 | ) -> Poll<Result<usize>> { |
1905 | let ret: Result = ready!(read_until_internal(reader, cx, b' \n' , bytes, read)); |
1906 | |
1907 | match String::from_utf8(vec:mem::take(dest:bytes)) { |
1908 | Ok(s: String) => { |
1909 | debug_assert!(buf.is_empty()); |
1910 | debug_assert_eq!(*read, 0); |
1911 | *buf = s; |
1912 | Poll::Ready(ret) |
1913 | } |
1914 | Err(_) => Poll::Ready(ret.and_then(|_| { |
1915 | Err(Error::new( |
1916 | kind:ErrorKind::InvalidData, |
1917 | error:"stream did not contain valid UTF-8" , |
1918 | )) |
1919 | })), |
1920 | } |
1921 | } |
1922 | |
1923 | pin_project! { |
1924 | /// Stream for the [`AsyncBufReadExt::split()`] method. |
1925 | #[derive(Debug)] |
1926 | #[must_use = "streams do nothing unless polled" ] |
1927 | pub struct Split<R> { |
1928 | #[pin] |
1929 | reader: R, |
1930 | buf: Vec<u8>, |
1931 | read: usize, |
1932 | delim: u8, |
1933 | } |
1934 | } |
1935 | |
1936 | impl<R: AsyncBufRead> Stream for Split<R> { |
1937 | type Item = Result<Vec<u8>>; |
1938 | |
1939 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
1940 | let this: Projection<'_, R> = self.project(); |
1941 | |
1942 | let n: usize = ready!(read_until_internal( |
1943 | this.reader, |
1944 | cx, |
1945 | *this.delim, |
1946 | this.buf, |
1947 | this.read |
1948 | ))?; |
1949 | if n == 0 && this.buf.is_empty() { |
1950 | return Poll::Ready(None); |
1951 | } |
1952 | |
1953 | if this.buf[this.buf.len() - 1] == *this.delim { |
1954 | this.buf.pop(); |
1955 | } |
1956 | Poll::Ready(Some(Ok(mem::take(dest:this.buf)))) |
1957 | } |
1958 | } |
1959 | |
1960 | /// Extension trait for [`AsyncRead`]. |
1961 | pub trait AsyncReadExt: AsyncRead { |
1962 | /// Reads some bytes from the byte stream. |
1963 | /// |
1964 | /// On success, returns the total number of bytes read. |
1965 | /// |
1966 | /// If the return value is `Ok(n)`, then it must be guaranteed that |
1967 | /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been |
1968 | /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two |
1969 | /// scenarios: |
1970 | /// |
1971 | /// 1. This reader has reached its "end of file" and will likely no longer be able to |
1972 | /// produce bytes. Note that this does not mean that the reader will always no |
1973 | /// longer be able to produce bytes. |
1974 | /// 2. The buffer specified was 0 bytes in length. |
1975 | /// |
1976 | /// # Examples |
1977 | /// |
1978 | /// ``` |
1979 | /// use futures_lite::io::{AsyncReadExt, BufReader}; |
1980 | /// |
1981 | /// # spin_on::spin_on(async { |
1982 | /// let input: &[u8] = b"hello" ; |
1983 | /// let mut reader = BufReader::new(input); |
1984 | /// |
1985 | /// let mut buf = vec![0; 1024]; |
1986 | /// let n = reader.read(&mut buf).await?; |
1987 | /// # std::io::Result::Ok(()) }); |
1988 | /// ``` |
1989 | fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self> |
1990 | where |
1991 | Self: Unpin, |
1992 | { |
1993 | ReadFuture { reader: self, buf } |
1994 | } |
1995 | |
1996 | /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers. |
1997 | /// |
1998 | /// Data is copied to fill each buffer in order, with the final buffer possibly being |
1999 | /// only partially filled. This method must behave same as a single call to |
2000 | /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would. |
2001 | fn read_vectored<'a>( |
2002 | &'a mut self, |
2003 | bufs: &'a mut [IoSliceMut<'a>], |
2004 | ) -> ReadVectoredFuture<'a, Self> |
2005 | where |
2006 | Self: Unpin, |
2007 | { |
2008 | ReadVectoredFuture { reader: self, bufs } |
2009 | } |
2010 | |
2011 | /// Reads the entire contents and appends them to a [`Vec`]. |
2012 | /// |
2013 | /// On success, returns the total number of bytes read. |
2014 | /// |
2015 | /// # Examples |
2016 | /// |
2017 | /// ``` |
2018 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2019 | /// |
2020 | /// # spin_on::spin_on(async { |
2021 | /// let mut reader = Cursor::new(vec![1, 2, 3]); |
2022 | /// let mut contents = Vec::new(); |
2023 | /// |
2024 | /// let n = reader.read_to_end(&mut contents).await?; |
2025 | /// assert_eq!(n, 3); |
2026 | /// assert_eq!(contents, [1, 2, 3]); |
2027 | /// # std::io::Result::Ok(()) }); |
2028 | /// ``` |
2029 | fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self> |
2030 | where |
2031 | Self: Unpin, |
2032 | { |
2033 | let start_len = buf.len(); |
2034 | ReadToEndFuture { |
2035 | reader: self, |
2036 | buf, |
2037 | start_len, |
2038 | } |
2039 | } |
2040 | |
2041 | /// Reads the entire contents and appends them to a [`String`]. |
2042 | /// |
2043 | /// On success, returns the total number of bytes read. |
2044 | /// |
2045 | /// # Examples |
2046 | /// |
2047 | /// ``` |
2048 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2049 | /// |
2050 | /// # spin_on::spin_on(async { |
2051 | /// let mut reader = Cursor::new(&b"hello" ); |
2052 | /// let mut contents = String::new(); |
2053 | /// |
2054 | /// let n = reader.read_to_string(&mut contents).await?; |
2055 | /// assert_eq!(n, 5); |
2056 | /// assert_eq!(contents, "hello" ); |
2057 | /// # std::io::Result::Ok(()) }); |
2058 | /// ``` |
2059 | fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self> |
2060 | where |
2061 | Self: Unpin, |
2062 | { |
2063 | ReadToStringFuture { |
2064 | reader: self, |
2065 | buf, |
2066 | bytes: Vec::new(), |
2067 | start_len: 0, |
2068 | } |
2069 | } |
2070 | |
2071 | /// Reads the exact number of bytes required to fill `buf`. |
2072 | /// |
2073 | /// On success, returns the total number of bytes read. |
2074 | /// |
2075 | /// # Examples |
2076 | /// |
2077 | /// ``` |
2078 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2079 | /// |
2080 | /// # spin_on::spin_on(async { |
2081 | /// let mut reader = Cursor::new(&b"hello" ); |
2082 | /// let mut contents = vec![0; 3]; |
2083 | /// |
2084 | /// reader.read_exact(&mut contents).await?; |
2085 | /// assert_eq!(contents, b"hel" ); |
2086 | /// # std::io::Result::Ok(()) }); |
2087 | /// ``` |
2088 | fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self> |
2089 | where |
2090 | Self: Unpin, |
2091 | { |
2092 | ReadExactFuture { reader: self, buf } |
2093 | } |
2094 | |
2095 | /// Creates an adapter which will read at most `limit` bytes from it. |
2096 | /// |
2097 | /// This method returns a new instance of [`AsyncRead`] which will read at most |
2098 | /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF. |
2099 | /// |
2100 | /// # Examples |
2101 | /// |
2102 | /// ``` |
2103 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2104 | /// |
2105 | /// # spin_on::spin_on(async { |
2106 | /// let mut reader = Cursor::new(&b"hello" ); |
2107 | /// let mut contents = String::new(); |
2108 | /// |
2109 | /// let n = reader.take(3).read_to_string(&mut contents).await?; |
2110 | /// assert_eq!(n, 3); |
2111 | /// assert_eq!(contents, "hel" ); |
2112 | /// # std::io::Result::Ok(()) }); |
2113 | /// ``` |
2114 | fn take(self, limit: u64) -> Take<Self> |
2115 | where |
2116 | Self: Sized, |
2117 | { |
2118 | Take { inner: self, limit } |
2119 | } |
2120 | |
2121 | /// Converts this [`AsyncRead`] into a [`Stream`] of bytes. |
2122 | /// |
2123 | /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`. |
2124 | /// |
2125 | /// ``` |
2126 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2127 | /// use futures_lite::stream::StreamExt; |
2128 | /// |
2129 | /// # spin_on::spin_on(async { |
2130 | /// let reader = Cursor::new(&b"hello" ); |
2131 | /// let mut bytes = reader.bytes(); |
2132 | /// |
2133 | /// while let Some(byte) = bytes.next().await { |
2134 | /// println!("byte: {}" , byte?); |
2135 | /// } |
2136 | /// # std::io::Result::Ok(()) }); |
2137 | /// ``` |
2138 | fn bytes(self) -> Bytes<Self> |
2139 | where |
2140 | Self: Sized, |
2141 | { |
2142 | Bytes { inner: self } |
2143 | } |
2144 | |
2145 | /// Creates an adapter which will chain this stream with another. |
2146 | /// |
2147 | /// The returned [`AsyncRead`] instance will first read all bytes from this reader |
2148 | /// until EOF is found, and then continue with `next`. |
2149 | /// |
2150 | /// # Examples |
2151 | /// |
2152 | /// ``` |
2153 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2154 | /// |
2155 | /// # spin_on::spin_on(async { |
2156 | /// let r1 = Cursor::new(&b"hello" ); |
2157 | /// let r2 = Cursor::new(&b"world" ); |
2158 | /// let mut reader = r1.chain(r2); |
2159 | /// |
2160 | /// let mut contents = String::new(); |
2161 | /// reader.read_to_string(&mut contents).await?; |
2162 | /// assert_eq!(contents, "helloworld" ); |
2163 | /// # std::io::Result::Ok(()) }); |
2164 | /// ``` |
2165 | fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R> |
2166 | where |
2167 | Self: Sized, |
2168 | { |
2169 | Chain { |
2170 | first: self, |
2171 | second: next, |
2172 | done_first: false, |
2173 | } |
2174 | } |
2175 | |
2176 | /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`. |
2177 | /// |
2178 | /// # Examples |
2179 | /// |
2180 | /// ``` |
2181 | /// use futures_lite::io::AsyncReadExt; |
2182 | /// |
2183 | /// let reader = [1, 2, 3].boxed_reader(); |
2184 | /// ``` |
2185 | #[cfg (feature = "alloc" )] |
2186 | fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>> |
2187 | where |
2188 | Self: Sized + Send + 'a, |
2189 | { |
2190 | Box::pin(self) |
2191 | } |
2192 | } |
2193 | |
2194 | impl<R: AsyncRead + ?Sized> AsyncReadExt for R {} |
2195 | |
2196 | /// Future for the [`AsyncReadExt::read()`] method. |
2197 | #[derive (Debug)] |
2198 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2199 | pub struct ReadFuture<'a, R: Unpin + ?Sized> { |
2200 | reader: &'a mut R, |
2201 | buf: &'a mut [u8], |
2202 | } |
2203 | |
2204 | impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {} |
2205 | |
2206 | impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> { |
2207 | type Output = Result<usize>; |
2208 | |
2209 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2210 | let Self { reader: &mut &mut R, buf: &mut &mut [u8] } = &mut *self; |
2211 | Pin::new(pointer:reader).poll_read(cx, buf) |
2212 | } |
2213 | } |
2214 | |
2215 | /// Future for the [`AsyncReadExt::read_vectored()`] method. |
2216 | #[derive (Debug)] |
2217 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2218 | pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> { |
2219 | reader: &'a mut R, |
2220 | bufs: &'a mut [IoSliceMut<'a>], |
2221 | } |
2222 | |
2223 | impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {} |
2224 | |
2225 | impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> { |
2226 | type Output = Result<usize>; |
2227 | |
2228 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2229 | let Self { reader: &mut &mut R, bufs: &mut &mut [IoSliceMut<'_>] } = &mut *self; |
2230 | Pin::new(pointer:reader).poll_read_vectored(cx, bufs) |
2231 | } |
2232 | } |
2233 | |
2234 | /// Future for the [`AsyncReadExt::read_to_end()`] method. |
2235 | #[derive (Debug)] |
2236 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2237 | pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> { |
2238 | reader: &'a mut R, |
2239 | buf: &'a mut Vec<u8>, |
2240 | start_len: usize, |
2241 | } |
2242 | |
2243 | impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {} |
2244 | |
2245 | impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> { |
2246 | type Output = Result<usize>; |
2247 | |
2248 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2249 | let Self { |
2250 | reader: &mut &mut R, |
2251 | buf: &mut &mut Vec, |
2252 | start_len: &mut usize, |
2253 | } = &mut *self; |
2254 | read_to_end_internal(rd:Pin::new(pointer:reader), cx, buf, *start_len) |
2255 | } |
2256 | } |
2257 | |
2258 | /// Future for the [`AsyncReadExt::read_to_string()`] method. |
2259 | #[derive (Debug)] |
2260 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2261 | pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> { |
2262 | reader: &'a mut R, |
2263 | buf: &'a mut String, |
2264 | bytes: Vec<u8>, |
2265 | start_len: usize, |
2266 | } |
2267 | |
2268 | impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {} |
2269 | |
2270 | impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> { |
2271 | type Output = Result<usize>; |
2272 | |
2273 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2274 | let Self { |
2275 | reader, |
2276 | buf, |
2277 | bytes, |
2278 | start_len, |
2279 | } = &mut *self; |
2280 | let reader = Pin::new(reader); |
2281 | |
2282 | let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len)); |
2283 | |
2284 | match String::from_utf8(mem::take(bytes)) { |
2285 | Ok(s) => { |
2286 | debug_assert!(buf.is_empty()); |
2287 | **buf = s; |
2288 | Poll::Ready(ret) |
2289 | } |
2290 | Err(_) => Poll::Ready(ret.and_then(|_| { |
2291 | Err(Error::new( |
2292 | ErrorKind::InvalidData, |
2293 | "stream did not contain valid UTF-8" , |
2294 | )) |
2295 | })), |
2296 | } |
2297 | } |
2298 | } |
2299 | |
2300 | // This uses an adaptive system to extend the vector when it fills. We want to |
2301 | // avoid paying to allocate and zero a huge chunk of memory if the reader only |
2302 | // has 4 bytes while still making large reads if the reader does have a ton |
2303 | // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every |
2304 | // time is 4,500 times (!) slower than this if the reader has a very small |
2305 | // amount of data to return. |
2306 | // |
2307 | // Because we're extending the buffer with uninitialized data for trusted |
2308 | // readers, we need to make sure to truncate that if any of this panics. |
2309 | fn read_to_end_internal<R: AsyncRead + ?Sized>( |
2310 | mut rd: Pin<&mut R>, |
2311 | cx: &mut Context<'_>, |
2312 | buf: &mut Vec<u8>, |
2313 | start_len: usize, |
2314 | ) -> Poll<Result<usize>> { |
2315 | struct Guard<'a> { |
2316 | buf: &'a mut Vec<u8>, |
2317 | len: usize, |
2318 | } |
2319 | |
2320 | impl Drop for Guard<'_> { |
2321 | fn drop(&mut self) { |
2322 | self.buf.resize(self.len, 0); |
2323 | } |
2324 | } |
2325 | |
2326 | let mut g = Guard { |
2327 | len: buf.len(), |
2328 | buf, |
2329 | }; |
2330 | let ret; |
2331 | loop { |
2332 | if g.len == g.buf.len() { |
2333 | g.buf.reserve(32); |
2334 | let capacity = g.buf.capacity(); |
2335 | g.buf.resize(capacity, 0); |
2336 | } |
2337 | |
2338 | match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { |
2339 | Ok(0) => { |
2340 | ret = Poll::Ready(Ok(g.len - start_len)); |
2341 | break; |
2342 | } |
2343 | Ok(n) => g.len += n, |
2344 | Err(e) => { |
2345 | ret = Poll::Ready(Err(e)); |
2346 | break; |
2347 | } |
2348 | } |
2349 | } |
2350 | |
2351 | ret |
2352 | } |
2353 | |
2354 | /// Future for the [`AsyncReadExt::read_exact()`] method. |
2355 | #[derive (Debug)] |
2356 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2357 | pub struct ReadExactFuture<'a, R: Unpin + ?Sized> { |
2358 | reader: &'a mut R, |
2359 | buf: &'a mut [u8], |
2360 | } |
2361 | |
2362 | impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {} |
2363 | |
2364 | impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> { |
2365 | type Output = Result<()>; |
2366 | |
2367 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2368 | let Self { reader: &mut &mut R, buf: &mut &mut [u8] } = &mut *self; |
2369 | |
2370 | while !buf.is_empty() { |
2371 | let n: usize = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; |
2372 | let (_, rest: &mut [u8]) = mem::take(buf).split_at_mut(mid:n); |
2373 | *buf = rest; |
2374 | |
2375 | if n == 0 { |
2376 | return Poll::Ready(Err(ErrorKind::UnexpectedEof.into())); |
2377 | } |
2378 | } |
2379 | |
2380 | Poll::Ready(Ok(())) |
2381 | } |
2382 | } |
2383 | |
2384 | pin_project! { |
2385 | /// Reader for the [`AsyncReadExt::take()`] method. |
2386 | #[derive(Debug)] |
2387 | pub struct Take<R> { |
2388 | #[pin] |
2389 | inner: R, |
2390 | limit: u64, |
2391 | } |
2392 | } |
2393 | |
2394 | impl<R> Take<R> { |
2395 | /// Returns the number of bytes before this adapter will return EOF. |
2396 | /// |
2397 | /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit. |
2398 | /// |
2399 | /// # Examples |
2400 | /// |
2401 | /// ``` |
2402 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2403 | /// |
2404 | /// let reader = Cursor::new("hello" ); |
2405 | /// |
2406 | /// let reader = reader.take(3); |
2407 | /// assert_eq!(reader.limit(), 3); |
2408 | /// ``` |
2409 | pub fn limit(&self) -> u64 { |
2410 | self.limit |
2411 | } |
2412 | |
2413 | /// Puts a limit on the number of bytes. |
2414 | /// |
2415 | /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`]. |
2416 | /// |
2417 | /// # Examples |
2418 | /// |
2419 | /// ``` |
2420 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2421 | /// |
2422 | /// let reader = Cursor::new("hello" ); |
2423 | /// |
2424 | /// let mut reader = reader.take(10); |
2425 | /// assert_eq!(reader.limit(), 10); |
2426 | /// |
2427 | /// reader.set_limit(3); |
2428 | /// assert_eq!(reader.limit(), 3); |
2429 | /// ``` |
2430 | pub fn set_limit(&mut self, limit: u64) { |
2431 | self.limit = limit; |
2432 | } |
2433 | |
2434 | /// Gets a reference to the underlying reader. |
2435 | /// |
2436 | /// # Examples |
2437 | /// |
2438 | /// ``` |
2439 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2440 | /// |
2441 | /// let reader = Cursor::new("hello" ); |
2442 | /// |
2443 | /// let reader = reader.take(3); |
2444 | /// let r = reader.get_ref(); |
2445 | /// ``` |
2446 | pub fn get_ref(&self) -> &R { |
2447 | &self.inner |
2448 | } |
2449 | |
2450 | /// Gets a mutable reference to the underlying reader. |
2451 | /// |
2452 | /// # Examples |
2453 | /// |
2454 | /// ``` |
2455 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2456 | /// |
2457 | /// let reader = Cursor::new("hello" ); |
2458 | /// |
2459 | /// let mut reader = reader.take(3); |
2460 | /// let r = reader.get_mut(); |
2461 | /// ``` |
2462 | pub fn get_mut(&mut self) -> &mut R { |
2463 | &mut self.inner |
2464 | } |
2465 | |
2466 | /// Unwraps the adapter, returning the underlying reader. |
2467 | /// |
2468 | /// # Examples |
2469 | /// |
2470 | /// ``` |
2471 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2472 | /// |
2473 | /// let reader = Cursor::new("hello" ); |
2474 | /// |
2475 | /// let reader = reader.take(3); |
2476 | /// let reader = reader.into_inner(); |
2477 | /// ``` |
2478 | pub fn into_inner(self) -> R { |
2479 | self.inner |
2480 | } |
2481 | } |
2482 | |
2483 | impl<R: AsyncRead> AsyncRead for Take<R> { |
2484 | fn poll_read( |
2485 | self: Pin<&mut Self>, |
2486 | cx: &mut Context<'_>, |
2487 | buf: &mut [u8], |
2488 | ) -> Poll<Result<usize>> { |
2489 | let this: Projection<'_, R> = self.project(); |
2490 | take_read_internal(rd:this.inner, cx, buf, this.limit) |
2491 | } |
2492 | } |
2493 | |
2494 | fn take_read_internal<R: AsyncRead + ?Sized>( |
2495 | mut rd: Pin<&mut R>, |
2496 | cx: &mut Context<'_>, |
2497 | buf: &mut [u8], |
2498 | limit: &mut u64, |
2499 | ) -> Poll<Result<usize>> { |
2500 | // Don't call into inner reader at all at EOF because it may still block |
2501 | if *limit == 0 { |
2502 | return Poll::Ready(Ok(0)); |
2503 | } |
2504 | |
2505 | let max: usize = cmp::min(v1:buf.len() as u64, *limit) as usize; |
2506 | |
2507 | match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) { |
2508 | Ok(n: usize) => { |
2509 | *limit -= n as u64; |
2510 | Poll::Ready(Ok(n)) |
2511 | } |
2512 | Err(e: Error) => Poll::Ready(Err(e)), |
2513 | } |
2514 | } |
2515 | |
2516 | impl<R: AsyncBufRead> AsyncBufRead for Take<R> { |
2517 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { |
2518 | let this = self.project(); |
2519 | |
2520 | if *this.limit == 0 { |
2521 | return Poll::Ready(Ok(&[])); |
2522 | } |
2523 | |
2524 | match ready!(this.inner.poll_fill_buf(cx)) { |
2525 | Ok(buf) => { |
2526 | let cap = cmp::min(buf.len() as u64, *this.limit) as usize; |
2527 | Poll::Ready(Ok(&buf[..cap])) |
2528 | } |
2529 | Err(e) => Poll::Ready(Err(e)), |
2530 | } |
2531 | } |
2532 | |
2533 | fn consume(self: Pin<&mut Self>, amt: usize) { |
2534 | let this = self.project(); |
2535 | // Don't let callers reset the limit by passing an overlarge value |
2536 | let amt = cmp::min(amt as u64, *this.limit) as usize; |
2537 | *this.limit -= amt as u64; |
2538 | |
2539 | this.inner.consume(amt); |
2540 | } |
2541 | } |
2542 | |
2543 | pin_project! { |
2544 | /// Reader for the [`AsyncReadExt::bytes()`] method. |
2545 | #[derive(Debug)] |
2546 | pub struct Bytes<R> { |
2547 | #[pin] |
2548 | inner: R, |
2549 | } |
2550 | } |
2551 | |
2552 | impl<R: AsyncRead + Unpin> Stream for Bytes<R> { |
2553 | type Item = Result<u8>; |
2554 | |
2555 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
2556 | let mut byte: u8 = 0; |
2557 | |
2558 | let rd: Pin<&mut R> = Pin::new(&mut self.inner); |
2559 | |
2560 | match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) { |
2561 | Ok(0) => Poll::Ready(None), |
2562 | Ok(..) => Poll::Ready(Some(Ok(byte))), |
2563 | Err(ref e: &Error) if e.kind() == ErrorKind::Interrupted => Poll::Pending, |
2564 | Err(e: Error) => Poll::Ready(Some(Err(e))), |
2565 | } |
2566 | } |
2567 | } |
2568 | |
2569 | impl<R: AsyncRead> AsyncRead for Bytes<R> { |
2570 | fn poll_read( |
2571 | self: Pin<&mut Self>, |
2572 | cx: &mut Context<'_>, |
2573 | buf: &mut [u8], |
2574 | ) -> Poll<Result<usize>> { |
2575 | self.project().inner.poll_read(cx, buf) |
2576 | } |
2577 | |
2578 | fn poll_read_vectored( |
2579 | self: Pin<&mut Self>, |
2580 | cx: &mut Context<'_>, |
2581 | bufs: &mut [IoSliceMut<'_>], |
2582 | ) -> Poll<Result<usize>> { |
2583 | self.project().inner.poll_read_vectored(cx, bufs) |
2584 | } |
2585 | } |
2586 | |
2587 | pin_project! { |
2588 | /// Reader for the [`AsyncReadExt::chain()`] method. |
2589 | pub struct Chain<R1, R2> { |
2590 | #[pin] |
2591 | first: R1, |
2592 | #[pin] |
2593 | second: R2, |
2594 | done_first: bool, |
2595 | } |
2596 | } |
2597 | |
2598 | impl<R1, R2> Chain<R1, R2> { |
2599 | /// Gets references to the underlying readers. |
2600 | /// |
2601 | /// # Examples |
2602 | /// |
2603 | /// ``` |
2604 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2605 | /// |
2606 | /// let r1 = Cursor::new(b"hello" ); |
2607 | /// let r2 = Cursor::new(b"world" ); |
2608 | /// |
2609 | /// let reader = r1.chain(r2); |
2610 | /// let (r1, r2) = reader.get_ref(); |
2611 | /// ``` |
2612 | pub fn get_ref(&self) -> (&R1, &R2) { |
2613 | (&self.first, &self.second) |
2614 | } |
2615 | |
2616 | /// Gets mutable references to the underlying readers. |
2617 | /// |
2618 | /// # Examples |
2619 | /// |
2620 | /// ``` |
2621 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2622 | /// |
2623 | /// let r1 = Cursor::new(b"hello" ); |
2624 | /// let r2 = Cursor::new(b"world" ); |
2625 | /// |
2626 | /// let mut reader = r1.chain(r2); |
2627 | /// let (r1, r2) = reader.get_mut(); |
2628 | /// ``` |
2629 | pub fn get_mut(&mut self) -> (&mut R1, &mut R2) { |
2630 | (&mut self.first, &mut self.second) |
2631 | } |
2632 | |
2633 | /// Unwraps the adapter, returning the underlying readers. |
2634 | /// |
2635 | /// # Examples |
2636 | /// |
2637 | /// ``` |
2638 | /// use futures_lite::io::{AsyncReadExt, Cursor}; |
2639 | /// |
2640 | /// let r1 = Cursor::new(b"hello" ); |
2641 | /// let r2 = Cursor::new(b"world" ); |
2642 | /// |
2643 | /// let reader = r1.chain(r2); |
2644 | /// let (r1, r2) = reader.into_inner(); |
2645 | /// ``` |
2646 | pub fn into_inner(self) -> (R1, R2) { |
2647 | (self.first, self.second) |
2648 | } |
2649 | } |
2650 | |
2651 | impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> { |
2652 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
2653 | f&mut DebugStruct<'_, '_>.debug_struct("Chain" ) |
2654 | .field("r1" , &self.first) |
2655 | .field(name:"r2" , &self.second) |
2656 | .finish() |
2657 | } |
2658 | } |
2659 | |
2660 | impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> { |
2661 | fn poll_read( |
2662 | self: Pin<&mut Self>, |
2663 | cx: &mut Context<'_>, |
2664 | buf: &mut [u8], |
2665 | ) -> Poll<Result<usize>> { |
2666 | let this = self.project(); |
2667 | if !*this.done_first { |
2668 | match ready!(this.first.poll_read(cx, buf)) { |
2669 | Ok(0) if !buf.is_empty() => *this.done_first = true, |
2670 | Ok(n) => return Poll::Ready(Ok(n)), |
2671 | Err(err) => return Poll::Ready(Err(err)), |
2672 | } |
2673 | } |
2674 | |
2675 | this.second.poll_read(cx, buf) |
2676 | } |
2677 | |
2678 | fn poll_read_vectored( |
2679 | self: Pin<&mut Self>, |
2680 | cx: &mut Context<'_>, |
2681 | bufs: &mut [IoSliceMut<'_>], |
2682 | ) -> Poll<Result<usize>> { |
2683 | let this = self.project(); |
2684 | if !*this.done_first { |
2685 | match ready!(this.first.poll_read_vectored(cx, bufs)) { |
2686 | Ok(0) if !bufs.is_empty() => *this.done_first = true, |
2687 | Ok(n) => return Poll::Ready(Ok(n)), |
2688 | Err(err) => return Poll::Ready(Err(err)), |
2689 | } |
2690 | } |
2691 | |
2692 | this.second.poll_read_vectored(cx, bufs) |
2693 | } |
2694 | } |
2695 | |
2696 | impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> { |
2697 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { |
2698 | let this: Projection<'_, R1, R2> = self.project(); |
2699 | if !*this.done_first { |
2700 | match ready!(this.first.poll_fill_buf(cx)) { |
2701 | Ok([]) => *this.done_first = true, |
2702 | Ok(buf: &[u8]) => return Poll::Ready(Ok(buf)), |
2703 | Err(err: Error) => return Poll::Ready(Err(err)), |
2704 | } |
2705 | } |
2706 | |
2707 | this.second.poll_fill_buf(cx) |
2708 | } |
2709 | |
2710 | fn consume(self: Pin<&mut Self>, amt: usize) { |
2711 | let this: Projection<'_, R1, R2> = self.project(); |
2712 | if !*this.done_first { |
2713 | this.first.consume(amt) |
2714 | } else { |
2715 | this.second.consume(amt) |
2716 | } |
2717 | } |
2718 | } |
2719 | |
2720 | /// Extension trait for [`AsyncSeek`]. |
2721 | pub trait AsyncSeekExt: AsyncSeek { |
2722 | /// Seeks to a new position in a byte stream. |
2723 | /// |
2724 | /// Returns the new position in the byte stream. |
2725 | /// |
2726 | /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation. |
2727 | /// |
2728 | /// # Examples |
2729 | /// |
2730 | /// ``` |
2731 | /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom}; |
2732 | /// |
2733 | /// # spin_on::spin_on(async { |
2734 | /// let mut cursor = Cursor::new("hello" ); |
2735 | /// |
2736 | /// // Move the cursor to the end. |
2737 | /// cursor.seek(SeekFrom::End(0)).await?; |
2738 | /// |
2739 | /// // Check the current position. |
2740 | /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5); |
2741 | /// # std::io::Result::Ok(()) }); |
2742 | /// ``` |
2743 | fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self> |
2744 | where |
2745 | Self: Unpin, |
2746 | { |
2747 | SeekFuture { seeker: self, pos } |
2748 | } |
2749 | } |
2750 | |
2751 | impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} |
2752 | |
2753 | /// Future for the [`AsyncSeekExt::seek()`] method. |
2754 | #[derive (Debug)] |
2755 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2756 | pub struct SeekFuture<'a, S: Unpin + ?Sized> { |
2757 | seeker: &'a mut S, |
2758 | pos: SeekFrom, |
2759 | } |
2760 | |
2761 | impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {} |
2762 | |
2763 | impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> { |
2764 | type Output = Result<u64>; |
2765 | |
2766 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2767 | let pos: SeekFrom = self.pos; |
2768 | Pin::new(&mut *self.seeker).poll_seek(cx, pos) |
2769 | } |
2770 | } |
2771 | |
2772 | /// Extension trait for [`AsyncWrite`]. |
2773 | pub trait AsyncWriteExt: AsyncWrite { |
2774 | /// Writes some bytes into the byte stream. |
2775 | /// |
2776 | /// Returns the number of bytes written from the start of the buffer. |
2777 | /// |
2778 | /// If the return value is `Ok(n)` then it must be guaranteed that |
2779 | /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying |
2780 | /// object is no longer able to accept bytes and will likely not be able to in the |
2781 | /// future as well, or that the provided buffer is empty. |
2782 | /// |
2783 | /// # Examples |
2784 | /// |
2785 | /// ``` |
2786 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
2787 | /// |
2788 | /// # spin_on::spin_on(async { |
2789 | /// let mut output = Vec::new(); |
2790 | /// let mut writer = BufWriter::new(&mut output); |
2791 | /// |
2792 | /// let n = writer.write(b"hello" ).await?; |
2793 | /// # std::io::Result::Ok(()) }); |
2794 | /// ``` |
2795 | fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self> |
2796 | where |
2797 | Self: Unpin, |
2798 | { |
2799 | WriteFuture { writer: self, buf } |
2800 | } |
2801 | |
2802 | /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers. |
2803 | /// |
2804 | /// Data is copied from each buffer in order, with the final buffer possibly being only |
2805 | /// partially consumed. This method must behave same as a call to |
2806 | /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would. |
2807 | fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self> |
2808 | where |
2809 | Self: Unpin, |
2810 | { |
2811 | WriteVectoredFuture { writer: self, bufs } |
2812 | } |
2813 | |
2814 | /// Writes an entire buffer into the byte stream. |
2815 | /// |
2816 | /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more |
2817 | /// data to be written or an error occurs. It will not return before the entire buffer is |
2818 | /// successfully written or an error occurs. |
2819 | /// |
2820 | /// # Examples |
2821 | /// |
2822 | /// ``` |
2823 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
2824 | /// |
2825 | /// # spin_on::spin_on(async { |
2826 | /// let mut output = Vec::new(); |
2827 | /// let mut writer = BufWriter::new(&mut output); |
2828 | /// |
2829 | /// let n = writer.write_all(b"hello" ).await?; |
2830 | /// # std::io::Result::Ok(()) }); |
2831 | /// ``` |
2832 | fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self> |
2833 | where |
2834 | Self: Unpin, |
2835 | { |
2836 | WriteAllFuture { writer: self, buf } |
2837 | } |
2838 | |
2839 | /// Flushes the stream to ensure that all buffered contents reach their destination. |
2840 | /// |
2841 | /// # Examples |
2842 | /// |
2843 | /// ``` |
2844 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
2845 | /// |
2846 | /// # spin_on::spin_on(async { |
2847 | /// let mut output = Vec::new(); |
2848 | /// let mut writer = BufWriter::new(&mut output); |
2849 | /// |
2850 | /// writer.write_all(b"hello" ).await?; |
2851 | /// writer.flush().await?; |
2852 | /// # std::io::Result::Ok(()) }); |
2853 | /// ``` |
2854 | fn flush(&mut self) -> FlushFuture<'_, Self> |
2855 | where |
2856 | Self: Unpin, |
2857 | { |
2858 | FlushFuture { writer: self } |
2859 | } |
2860 | |
2861 | /// Closes the writer. |
2862 | /// |
2863 | /// # Examples |
2864 | /// |
2865 | /// ``` |
2866 | /// use futures_lite::io::{AsyncWriteExt, BufWriter}; |
2867 | /// |
2868 | /// # spin_on::spin_on(async { |
2869 | /// let mut output = Vec::new(); |
2870 | /// let mut writer = BufWriter::new(&mut output); |
2871 | /// |
2872 | /// writer.close().await?; |
2873 | /// # std::io::Result::Ok(()) }); |
2874 | /// ``` |
2875 | fn close(&mut self) -> CloseFuture<'_, Self> |
2876 | where |
2877 | Self: Unpin, |
2878 | { |
2879 | CloseFuture { writer: self } |
2880 | } |
2881 | |
2882 | /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`. |
2883 | /// |
2884 | /// # Examples |
2885 | /// |
2886 | /// ``` |
2887 | /// use futures_lite::io::AsyncWriteExt; |
2888 | /// |
2889 | /// let writer = Vec::<u8>::new().boxed_writer(); |
2890 | /// ``` |
2891 | #[cfg (feature = "alloc" )] |
2892 | fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>> |
2893 | where |
2894 | Self: Sized + Send + 'a, |
2895 | { |
2896 | Box::pin(self) |
2897 | } |
2898 | } |
2899 | |
2900 | impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {} |
2901 | |
2902 | /// Future for the [`AsyncWriteExt::write()`] method. |
2903 | #[derive (Debug)] |
2904 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2905 | pub struct WriteFuture<'a, W: Unpin + ?Sized> { |
2906 | writer: &'a mut W, |
2907 | buf: &'a [u8], |
2908 | } |
2909 | |
2910 | impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {} |
2911 | |
2912 | impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> { |
2913 | type Output = Result<usize>; |
2914 | |
2915 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2916 | let buf: &[u8] = self.buf; |
2917 | Pin::new(&mut *self.writer).poll_write(cx, buf) |
2918 | } |
2919 | } |
2920 | |
2921 | /// Future for the [`AsyncWriteExt::write_vectored()`] method. |
2922 | #[derive (Debug)] |
2923 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2924 | pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> { |
2925 | writer: &'a mut W, |
2926 | bufs: &'a [IoSlice<'a>], |
2927 | } |
2928 | |
2929 | impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {} |
2930 | |
2931 | impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> { |
2932 | type Output = Result<usize>; |
2933 | |
2934 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2935 | let bufs: &[IoSlice<'_>] = self.bufs; |
2936 | Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs) |
2937 | } |
2938 | } |
2939 | |
2940 | /// Future for the [`AsyncWriteExt::write_all()`] method. |
2941 | #[derive (Debug)] |
2942 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2943 | pub struct WriteAllFuture<'a, W: Unpin + ?Sized> { |
2944 | writer: &'a mut W, |
2945 | buf: &'a [u8], |
2946 | } |
2947 | |
2948 | impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {} |
2949 | |
2950 | impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> { |
2951 | type Output = Result<()>; |
2952 | |
2953 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2954 | let Self { writer: &mut &mut W, buf: &mut &[u8] } = &mut *self; |
2955 | |
2956 | while !buf.is_empty() { |
2957 | let n: usize = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?; |
2958 | let (_, rest: &[u8]) = mem::take(buf).split_at(mid:n); |
2959 | *buf = rest; |
2960 | |
2961 | if n == 0 { |
2962 | return Poll::Ready(Err(ErrorKind::WriteZero.into())); |
2963 | } |
2964 | } |
2965 | |
2966 | Poll::Ready(Ok(())) |
2967 | } |
2968 | } |
2969 | |
2970 | /// Future for the [`AsyncWriteExt::flush()`] method. |
2971 | #[derive (Debug)] |
2972 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2973 | pub struct FlushFuture<'a, W: Unpin + ?Sized> { |
2974 | writer: &'a mut W, |
2975 | } |
2976 | |
2977 | impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {} |
2978 | |
2979 | impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> { |
2980 | type Output = Result<()>; |
2981 | |
2982 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
2983 | Pin::new(&mut *self.writer).poll_flush(cx) |
2984 | } |
2985 | } |
2986 | |
2987 | /// Future for the [`AsyncWriteExt::close()`] method. |
2988 | #[derive (Debug)] |
2989 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
2990 | pub struct CloseFuture<'a, W: Unpin + ?Sized> { |
2991 | writer: &'a mut W, |
2992 | } |
2993 | |
2994 | impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {} |
2995 | |
2996 | impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> { |
2997 | type Output = Result<()>; |
2998 | |
2999 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
3000 | Pin::new(&mut *self.writer).poll_close(cx) |
3001 | } |
3002 | } |
3003 | |
3004 | /// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`. |
3005 | /// |
3006 | /// # Examples |
3007 | /// |
3008 | /// ``` |
3009 | /// use futures_lite::io::AsyncReadExt; |
3010 | /// |
3011 | /// let reader = [1, 2, 3].boxed_reader(); |
3012 | /// ``` |
3013 | #[cfg (feature = "alloc" )] |
3014 | pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>; |
3015 | |
3016 | /// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`. |
3017 | /// |
3018 | /// # Examples |
3019 | /// |
3020 | /// ``` |
3021 | /// use futures_lite::io::AsyncWriteExt; |
3022 | /// |
3023 | /// let writer = Vec::<u8>::new().boxed_writer(); |
3024 | /// ``` |
3025 | #[cfg (feature = "alloc" )] |
3026 | pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>; |
3027 | |
3028 | /// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves. |
3029 | /// |
3030 | /// # Examples |
3031 | /// |
3032 | /// ``` |
3033 | /// use futures_lite::io::{self, Cursor}; |
3034 | /// |
3035 | /// # spin_on::spin_on(async { |
3036 | /// let stream = Cursor::new(vec![]); |
3037 | /// let (mut reader, mut writer) = io::split(stream); |
3038 | /// # std::io::Result::Ok(()) }); |
3039 | /// ``` |
3040 | pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>) |
3041 | where |
3042 | T: AsyncRead + AsyncWrite + Unpin, |
3043 | { |
3044 | let inner: Arc> = Arc::new(data:Mutex::new(stream)); |
3045 | (ReadHalf(inner.clone()), WriteHalf(inner)) |
3046 | } |
3047 | |
3048 | /// The read half returned by [`split()`]. |
3049 | #[derive (Debug)] |
3050 | pub struct ReadHalf<T>(Arc<Mutex<T>>); |
3051 | |
3052 | /// The write half returned by [`split()`]. |
3053 | #[derive (Debug)] |
3054 | pub struct WriteHalf<T>(Arc<Mutex<T>>); |
3055 | |
3056 | impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> { |
3057 | fn poll_read( |
3058 | self: Pin<&mut Self>, |
3059 | cx: &mut Context<'_>, |
3060 | buf: &mut [u8], |
3061 | ) -> Poll<Result<usize>> { |
3062 | let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap(); |
3063 | Pin::new(&mut *inner).poll_read(cx, buf) |
3064 | } |
3065 | |
3066 | fn poll_read_vectored( |
3067 | self: Pin<&mut Self>, |
3068 | cx: &mut Context<'_>, |
3069 | bufs: &mut [IoSliceMut<'_>], |
3070 | ) -> Poll<Result<usize>> { |
3071 | let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap(); |
3072 | Pin::new(&mut *inner).poll_read_vectored(cx, bufs) |
3073 | } |
3074 | } |
3075 | |
3076 | impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> { |
3077 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { |
3078 | let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap(); |
3079 | Pin::new(&mut *inner).poll_write(cx, buf) |
3080 | } |
3081 | |
3082 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
3083 | let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap(); |
3084 | Pin::new(&mut *inner).poll_flush(cx) |
3085 | } |
3086 | |
3087 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
3088 | let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap(); |
3089 | Pin::new(&mut *inner).poll_close(cx) |
3090 | } |
3091 | } |
3092 | |
3093 | #[cfg (feature = "memchr" )] |
3094 | use memchr::memchr; |
3095 | |
3096 | /// Unoptimized memchr fallback. |
3097 | #[cfg (not(feature = "memchr" ))] |
3098 | fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { |
3099 | haystack.iter().position(|&b: u8| b == needle) |
3100 | } |
3101 | |