1 | //! Types for working with [`File`]. |
2 | //! |
3 | //! [`File`]: File |
4 | |
5 | use crate::fs::{asyncify, OpenOptions}; |
6 | use crate::io::blocking::Buf; |
7 | use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
8 | use crate::sync::Mutex; |
9 | |
10 | use std::fmt; |
11 | use std::fs::{Metadata, Permissions}; |
12 | use std::future::Future; |
13 | use std::io::{self, Seek, SeekFrom}; |
14 | use std::path::Path; |
15 | use std::pin::Pin; |
16 | use std::sync::Arc; |
17 | use std::task::Context; |
18 | use std::task::Poll; |
19 | |
20 | #[cfg (test)] |
21 | use super::mocks::JoinHandle; |
22 | #[cfg (test)] |
23 | use super::mocks::MockFile as StdFile; |
24 | #[cfg (test)] |
25 | use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; |
26 | #[cfg (not(test))] |
27 | use crate::blocking::JoinHandle; |
28 | #[cfg (not(test))] |
29 | use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; |
30 | #[cfg (not(test))] |
31 | use std::fs::File as StdFile; |
32 | |
33 | /// A reference to an open file on the filesystem. |
34 | /// |
35 | /// This is a specialized version of [`std::fs::File`] for usage from the |
36 | /// Tokio runtime. |
37 | /// |
38 | /// An instance of a `File` can be read and/or written depending on what options |
39 | /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical |
40 | /// cursor that the file contains internally. |
41 | /// |
42 | /// A file will not be closed immediately when it goes out of scope if there |
43 | /// are any IO operations that have not yet completed. To ensure that a file is |
44 | /// closed immediately when it is dropped, you should call [`flush`] before |
45 | /// dropping it. Note that this does not ensure that the file has been fully |
46 | /// written to disk; the operating system might keep the changes around in an |
47 | /// in-memory buffer. See the [`sync_all`] method for telling the OS to write |
48 | /// the data to disk. |
49 | /// |
50 | /// Reading and writing to a `File` is usually done using the convenience |
51 | /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. |
52 | /// |
53 | /// [`AsyncSeek`]: trait@crate::io::AsyncSeek |
54 | /// [`flush`]: fn@crate::io::AsyncWriteExt::flush |
55 | /// [`sync_all`]: fn@crate::fs::File::sync_all |
56 | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
57 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
58 | /// |
59 | /// # Examples |
60 | /// |
61 | /// Create a new file and asynchronously write bytes to it: |
62 | /// |
63 | /// ```no_run |
64 | /// use tokio::fs::File; |
65 | /// use tokio::io::AsyncWriteExt; // for write_all() |
66 | /// |
67 | /// # async fn dox() -> std::io::Result<()> { |
68 | /// let mut file = File::create("foo.txt" ).await?; |
69 | /// file.write_all(b"hello, world!" ).await?; |
70 | /// # Ok(()) |
71 | /// # } |
72 | /// ``` |
73 | /// |
74 | /// Read the contents of a file into a buffer: |
75 | /// |
76 | /// ```no_run |
77 | /// use tokio::fs::File; |
78 | /// use tokio::io::AsyncReadExt; // for read_to_end() |
79 | /// |
80 | /// # async fn dox() -> std::io::Result<()> { |
81 | /// let mut file = File::open("foo.txt" ).await?; |
82 | /// |
83 | /// let mut contents = vec![]; |
84 | /// file.read_to_end(&mut contents).await?; |
85 | /// |
86 | /// println!("len = {}" , contents.len()); |
87 | /// # Ok(()) |
88 | /// # } |
89 | /// ``` |
90 | pub struct File { |
91 | std: Arc<StdFile>, |
92 | inner: Mutex<Inner>, |
93 | } |
94 | |
95 | struct Inner { |
96 | state: State, |
97 | |
98 | /// Errors from writes/flushes are returned in write/flush calls. If a write |
99 | /// error is observed while performing a read, it is saved until the next |
100 | /// write / flush call. |
101 | last_write_err: Option<io::ErrorKind>, |
102 | |
103 | pos: u64, |
104 | } |
105 | |
106 | #[derive (Debug)] |
107 | enum State { |
108 | Idle(Option<Buf>), |
109 | Busy(JoinHandle<(Operation, Buf)>), |
110 | } |
111 | |
112 | #[derive (Debug)] |
113 | enum Operation { |
114 | Read(io::Result<usize>), |
115 | Write(io::Result<()>), |
116 | Seek(io::Result<u64>), |
117 | } |
118 | |
119 | impl File { |
120 | /// Attempts to open a file in read-only mode. |
121 | /// |
122 | /// See [`OpenOptions`] for more details. |
123 | /// |
124 | /// # Errors |
125 | /// |
126 | /// This function will return an error if called from outside of the Tokio |
127 | /// runtime or if path does not already exist. Other errors may also be |
128 | /// returned according to `OpenOptions::open`. |
129 | /// |
130 | /// # Examples |
131 | /// |
132 | /// ```no_run |
133 | /// use tokio::fs::File; |
134 | /// use tokio::io::AsyncReadExt; |
135 | /// |
136 | /// # async fn dox() -> std::io::Result<()> { |
137 | /// let mut file = File::open("foo.txt" ).await?; |
138 | /// |
139 | /// let mut contents = vec![]; |
140 | /// file.read_to_end(&mut contents).await?; |
141 | /// |
142 | /// println!("len = {}" , contents.len()); |
143 | /// # Ok(()) |
144 | /// # } |
145 | /// ``` |
146 | /// |
147 | /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. |
148 | /// |
149 | /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end |
150 | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
151 | pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { |
152 | let path = path.as_ref().to_owned(); |
153 | let std = asyncify(|| StdFile::open(path)).await?; |
154 | |
155 | Ok(File::from_std(std)) |
156 | } |
157 | |
158 | /// Opens a file in write-only mode. |
159 | /// |
160 | /// This function will create a file if it does not exist, and will truncate |
161 | /// it if it does. |
162 | /// |
163 | /// See [`OpenOptions`] for more details. |
164 | /// |
165 | /// # Errors |
166 | /// |
167 | /// Results in an error if called from outside of the Tokio runtime or if |
168 | /// the underlying [`create`] call results in an error. |
169 | /// |
170 | /// [`create`]: std::fs::File::create |
171 | /// |
172 | /// # Examples |
173 | /// |
174 | /// ```no_run |
175 | /// use tokio::fs::File; |
176 | /// use tokio::io::AsyncWriteExt; |
177 | /// |
178 | /// # async fn dox() -> std::io::Result<()> { |
179 | /// let mut file = File::create("foo.txt" ).await?; |
180 | /// file.write_all(b"hello, world!" ).await?; |
181 | /// # Ok(()) |
182 | /// # } |
183 | /// ``` |
184 | /// |
185 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
186 | /// |
187 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
188 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
189 | pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { |
190 | let path = path.as_ref().to_owned(); |
191 | let std_file = asyncify(move || StdFile::create(path)).await?; |
192 | Ok(File::from_std(std_file)) |
193 | } |
194 | |
195 | /// Returns a new [`OpenOptions`] object. |
196 | /// |
197 | /// This function returns a new `OpenOptions` object that you can use to |
198 | /// open or create a file with specific options if `open()` or `create()` |
199 | /// are not appropriate. |
200 | /// |
201 | /// It is equivalent to `OpenOptions::new()`, but allows you to write more |
202 | /// readable code. Instead of |
203 | /// `OpenOptions::new().append(true).open("example.log")`, |
204 | /// you can write `File::options().append(true).open("example.log")`. This |
205 | /// also avoids the need to import `OpenOptions`. |
206 | /// |
207 | /// See the [`OpenOptions::new`] function for more details. |
208 | /// |
209 | /// # Examples |
210 | /// |
211 | /// ```no_run |
212 | /// use tokio::fs::File; |
213 | /// use tokio::io::AsyncWriteExt; |
214 | /// |
215 | /// # async fn dox() -> std::io::Result<()> { |
216 | /// let mut f = File::options().append(true).open("example.log" ).await?; |
217 | /// f.write_all(b"new line \n" ).await?; |
218 | /// # Ok(()) |
219 | /// # } |
220 | /// ``` |
221 | #[must_use ] |
222 | pub fn options() -> OpenOptions { |
223 | OpenOptions::new() |
224 | } |
225 | |
226 | /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File). |
227 | /// |
228 | /// # Examples |
229 | /// |
230 | /// ```no_run |
231 | /// // This line could block. It is not recommended to do this on the Tokio |
232 | /// // runtime. |
233 | /// let std_file = std::fs::File::open("foo.txt" ).unwrap(); |
234 | /// let file = tokio::fs::File::from_std(std_file); |
235 | /// ``` |
236 | pub fn from_std(std: StdFile) -> File { |
237 | File { |
238 | std: Arc::new(std), |
239 | inner: Mutex::new(Inner { |
240 | state: State::Idle(Some(Buf::with_capacity(0))), |
241 | last_write_err: None, |
242 | pos: 0, |
243 | }), |
244 | } |
245 | } |
246 | |
247 | /// Attempts to sync all OS-internal metadata to disk. |
248 | /// |
249 | /// This function will attempt to ensure that all in-core data reaches the |
250 | /// filesystem before returning. |
251 | /// |
252 | /// # Examples |
253 | /// |
254 | /// ```no_run |
255 | /// use tokio::fs::File; |
256 | /// use tokio::io::AsyncWriteExt; |
257 | /// |
258 | /// # async fn dox() -> std::io::Result<()> { |
259 | /// let mut file = File::create("foo.txt" ).await?; |
260 | /// file.write_all(b"hello, world!" ).await?; |
261 | /// file.sync_all().await?; |
262 | /// # Ok(()) |
263 | /// # } |
264 | /// ``` |
265 | /// |
266 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
267 | /// |
268 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
269 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
270 | pub async fn sync_all(&self) -> io::Result<()> { |
271 | let mut inner = self.inner.lock().await; |
272 | inner.complete_inflight().await; |
273 | |
274 | let std = self.std.clone(); |
275 | asyncify(move || std.sync_all()).await |
276 | } |
277 | |
278 | /// This function is similar to `sync_all`, except that it may not |
279 | /// synchronize file metadata to the filesystem. |
280 | /// |
281 | /// This is intended for use cases that must synchronize content, but don't |
282 | /// need the metadata on disk. The goal of this method is to reduce disk |
283 | /// operations. |
284 | /// |
285 | /// Note that some platforms may simply implement this in terms of `sync_all`. |
286 | /// |
287 | /// # Examples |
288 | /// |
289 | /// ```no_run |
290 | /// use tokio::fs::File; |
291 | /// use tokio::io::AsyncWriteExt; |
292 | /// |
293 | /// # async fn dox() -> std::io::Result<()> { |
294 | /// let mut file = File::create("foo.txt" ).await?; |
295 | /// file.write_all(b"hello, world!" ).await?; |
296 | /// file.sync_data().await?; |
297 | /// # Ok(()) |
298 | /// # } |
299 | /// ``` |
300 | /// |
301 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
302 | /// |
303 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
304 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
305 | pub async fn sync_data(&self) -> io::Result<()> { |
306 | let mut inner = self.inner.lock().await; |
307 | inner.complete_inflight().await; |
308 | |
309 | let std = self.std.clone(); |
310 | asyncify(move || std.sync_data()).await |
311 | } |
312 | |
313 | /// Truncates or extends the underlying file, updating the size of this file to become size. |
314 | /// |
315 | /// If the size is less than the current file's size, then the file will be |
316 | /// shrunk. If it is greater than the current file's size, then the file |
317 | /// will be extended to size and have all of the intermediate data filled in |
318 | /// with 0s. |
319 | /// |
320 | /// # Errors |
321 | /// |
322 | /// This function will return an error if the file is not opened for |
323 | /// writing. |
324 | /// |
325 | /// # Examples |
326 | /// |
327 | /// ```no_run |
328 | /// use tokio::fs::File; |
329 | /// use tokio::io::AsyncWriteExt; |
330 | /// |
331 | /// # async fn dox() -> std::io::Result<()> { |
332 | /// let mut file = File::create("foo.txt" ).await?; |
333 | /// file.write_all(b"hello, world!" ).await?; |
334 | /// file.set_len(10).await?; |
335 | /// # Ok(()) |
336 | /// # } |
337 | /// ``` |
338 | /// |
339 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
340 | /// |
341 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
342 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
343 | pub async fn set_len(&self, size: u64) -> io::Result<()> { |
344 | let mut inner = self.inner.lock().await; |
345 | inner.complete_inflight().await; |
346 | |
347 | let mut buf = match inner.state { |
348 | State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(), |
349 | _ => unreachable!(), |
350 | }; |
351 | |
352 | let seek = if !buf.is_empty() { |
353 | Some(SeekFrom::Current(buf.discard_read())) |
354 | } else { |
355 | None |
356 | }; |
357 | |
358 | let std = self.std.clone(); |
359 | |
360 | inner.state = State::Busy(spawn_blocking(move || { |
361 | let res = if let Some(seek) = seek { |
362 | (&*std).seek(seek).and_then(|_| std.set_len(size)) |
363 | } else { |
364 | std.set_len(size) |
365 | } |
366 | .map(|()| 0); // the value is discarded later |
367 | |
368 | // Return the result as a seek |
369 | (Operation::Seek(res), buf) |
370 | })); |
371 | |
372 | let (op, buf) = match inner.state { |
373 | State::Idle(_) => unreachable!(), |
374 | State::Busy(ref mut rx) => rx.await?, |
375 | }; |
376 | |
377 | inner.state = State::Idle(Some(buf)); |
378 | |
379 | match op { |
380 | Operation::Seek(res) => res.map(|pos| { |
381 | inner.pos = pos; |
382 | }), |
383 | _ => unreachable!(), |
384 | } |
385 | } |
386 | |
387 | /// Queries metadata about the underlying file. |
388 | /// |
389 | /// # Examples |
390 | /// |
391 | /// ```no_run |
392 | /// use tokio::fs::File; |
393 | /// |
394 | /// # async fn dox() -> std::io::Result<()> { |
395 | /// let file = File::open("foo.txt" ).await?; |
396 | /// let metadata = file.metadata().await?; |
397 | /// |
398 | /// println!("{:?}" , metadata); |
399 | /// # Ok(()) |
400 | /// # } |
401 | /// ``` |
402 | pub async fn metadata(&self) -> io::Result<Metadata> { |
403 | let std = self.std.clone(); |
404 | asyncify(move || std.metadata()).await |
405 | } |
406 | |
407 | /// Creates a new `File` instance that shares the same underlying file handle |
408 | /// as the existing `File` instance. Reads, writes, and seeks will affect both |
409 | /// File instances simultaneously. |
410 | /// |
411 | /// # Examples |
412 | /// |
413 | /// ```no_run |
414 | /// use tokio::fs::File; |
415 | /// |
416 | /// # async fn dox() -> std::io::Result<()> { |
417 | /// let file = File::open("foo.txt" ).await?; |
418 | /// let file_clone = file.try_clone().await?; |
419 | /// # Ok(()) |
420 | /// # } |
421 | /// ``` |
422 | pub async fn try_clone(&self) -> io::Result<File> { |
423 | self.inner.lock().await.complete_inflight().await; |
424 | let std = self.std.clone(); |
425 | let std_file = asyncify(move || std.try_clone()).await?; |
426 | Ok(File::from_std(std_file)) |
427 | } |
428 | |
429 | /// Destructures `File` into a [`std::fs::File`]. This function is |
430 | /// async to allow any in-flight operations to complete. |
431 | /// |
432 | /// Use `File::try_into_std` to attempt conversion immediately. |
433 | /// |
434 | /// # Examples |
435 | /// |
436 | /// ```no_run |
437 | /// use tokio::fs::File; |
438 | /// |
439 | /// # async fn dox() -> std::io::Result<()> { |
440 | /// let tokio_file = File::open("foo.txt" ).await?; |
441 | /// let std_file = tokio_file.into_std().await; |
442 | /// # Ok(()) |
443 | /// # } |
444 | /// ``` |
445 | pub async fn into_std(mut self) -> StdFile { |
446 | self.inner.get_mut().complete_inflight().await; |
447 | Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed" ) |
448 | } |
449 | |
450 | /// Tries to immediately destructure `File` into a [`std::fs::File`]. |
451 | /// |
452 | /// # Errors |
453 | /// |
454 | /// This function will return an error containing the file if some |
455 | /// operation is in-flight. |
456 | /// |
457 | /// # Examples |
458 | /// |
459 | /// ```no_run |
460 | /// use tokio::fs::File; |
461 | /// |
462 | /// # async fn dox() -> std::io::Result<()> { |
463 | /// let tokio_file = File::open("foo.txt" ).await?; |
464 | /// let std_file = tokio_file.try_into_std().unwrap(); |
465 | /// # Ok(()) |
466 | /// # } |
467 | /// ``` |
468 | pub fn try_into_std(mut self) -> Result<StdFile, Self> { |
469 | match Arc::try_unwrap(self.std) { |
470 | Ok(file) => Ok(file), |
471 | Err(std_file_arc) => { |
472 | self.std = std_file_arc; |
473 | Err(self) |
474 | } |
475 | } |
476 | } |
477 | |
478 | /// Changes the permissions on the underlying file. |
479 | /// |
480 | /// # Platform-specific behavior |
481 | /// |
482 | /// This function currently corresponds to the `fchmod` function on Unix and |
483 | /// the `SetFileInformationByHandle` function on Windows. Note that, this |
484 | /// [may change in the future][changes]. |
485 | /// |
486 | /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior |
487 | /// |
488 | /// # Errors |
489 | /// |
490 | /// This function will return an error if the user lacks permission change |
491 | /// attributes on the underlying file. It may also return an error in other |
492 | /// os-specific unspecified cases. |
493 | /// |
494 | /// # Examples |
495 | /// |
496 | /// ```no_run |
497 | /// use tokio::fs::File; |
498 | /// |
499 | /// # async fn dox() -> std::io::Result<()> { |
500 | /// let file = File::open("foo.txt" ).await?; |
501 | /// let mut perms = file.metadata().await?.permissions(); |
502 | /// perms.set_readonly(true); |
503 | /// file.set_permissions(perms).await?; |
504 | /// # Ok(()) |
505 | /// # } |
506 | /// ``` |
507 | pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { |
508 | let std = self.std.clone(); |
509 | asyncify(move || std.set_permissions(perm)).await |
510 | } |
511 | } |
512 | |
513 | impl AsyncRead for File { |
514 | fn poll_read( |
515 | self: Pin<&mut Self>, |
516 | cx: &mut Context<'_>, |
517 | dst: &mut ReadBuf<'_>, |
518 | ) -> Poll<io::Result<()>> { |
519 | ready!(crate::trace::trace_leaf(cx)); |
520 | let me = self.get_mut(); |
521 | let inner = me.inner.get_mut(); |
522 | |
523 | loop { |
524 | match inner.state { |
525 | State::Idle(ref mut buf_cell) => { |
526 | let mut buf = buf_cell.take().unwrap(); |
527 | |
528 | if !buf.is_empty() { |
529 | buf.copy_to(dst); |
530 | *buf_cell = Some(buf); |
531 | return Poll::Ready(Ok(())); |
532 | } |
533 | |
534 | buf.ensure_capacity_for(dst); |
535 | let std = me.std.clone(); |
536 | |
537 | inner.state = State::Busy(spawn_blocking(move || { |
538 | let res = buf.read_from(&mut &*std); |
539 | (Operation::Read(res), buf) |
540 | })); |
541 | } |
542 | State::Busy(ref mut rx) => { |
543 | let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; |
544 | |
545 | match op { |
546 | Operation::Read(Ok(_)) => { |
547 | buf.copy_to(dst); |
548 | inner.state = State::Idle(Some(buf)); |
549 | return Poll::Ready(Ok(())); |
550 | } |
551 | Operation::Read(Err(e)) => { |
552 | assert!(buf.is_empty()); |
553 | |
554 | inner.state = State::Idle(Some(buf)); |
555 | return Poll::Ready(Err(e)); |
556 | } |
557 | Operation::Write(Ok(())) => { |
558 | assert!(buf.is_empty()); |
559 | inner.state = State::Idle(Some(buf)); |
560 | continue; |
561 | } |
562 | Operation::Write(Err(e)) => { |
563 | assert!(inner.last_write_err.is_none()); |
564 | inner.last_write_err = Some(e.kind()); |
565 | inner.state = State::Idle(Some(buf)); |
566 | } |
567 | Operation::Seek(result) => { |
568 | assert!(buf.is_empty()); |
569 | inner.state = State::Idle(Some(buf)); |
570 | if let Ok(pos) = result { |
571 | inner.pos = pos; |
572 | } |
573 | continue; |
574 | } |
575 | } |
576 | } |
577 | } |
578 | } |
579 | } |
580 | } |
581 | |
582 | impl AsyncSeek for File { |
583 | fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { |
584 | let me = self.get_mut(); |
585 | let inner = me.inner.get_mut(); |
586 | |
587 | match inner.state { |
588 | State::Busy(_) => Err(io::Error::new( |
589 | io::ErrorKind::Other, |
590 | "other file operation is pending, call poll_complete before start_seek" , |
591 | )), |
592 | State::Idle(ref mut buf_cell) => { |
593 | let mut buf = buf_cell.take().unwrap(); |
594 | |
595 | // Factor in any unread data from the buf |
596 | if !buf.is_empty() { |
597 | let n = buf.discard_read(); |
598 | |
599 | if let SeekFrom::Current(ref mut offset) = pos { |
600 | *offset += n; |
601 | } |
602 | } |
603 | |
604 | let std = me.std.clone(); |
605 | |
606 | inner.state = State::Busy(spawn_blocking(move || { |
607 | let res = (&*std).seek(pos); |
608 | (Operation::Seek(res), buf) |
609 | })); |
610 | Ok(()) |
611 | } |
612 | } |
613 | } |
614 | |
615 | fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
616 | ready!(crate::trace::trace_leaf(cx)); |
617 | let inner = self.inner.get_mut(); |
618 | |
619 | loop { |
620 | match inner.state { |
621 | State::Idle(_) => return Poll::Ready(Ok(inner.pos)), |
622 | State::Busy(ref mut rx) => { |
623 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
624 | inner.state = State::Idle(Some(buf)); |
625 | |
626 | match op { |
627 | Operation::Read(_) => {} |
628 | Operation::Write(Err(e)) => { |
629 | assert!(inner.last_write_err.is_none()); |
630 | inner.last_write_err = Some(e.kind()); |
631 | } |
632 | Operation::Write(_) => {} |
633 | Operation::Seek(res) => { |
634 | if let Ok(pos) = res { |
635 | inner.pos = pos; |
636 | } |
637 | return Poll::Ready(res); |
638 | } |
639 | } |
640 | } |
641 | } |
642 | } |
643 | } |
644 | } |
645 | |
646 | impl AsyncWrite for File { |
647 | fn poll_write( |
648 | self: Pin<&mut Self>, |
649 | cx: &mut Context<'_>, |
650 | src: &[u8], |
651 | ) -> Poll<io::Result<usize>> { |
652 | ready!(crate::trace::trace_leaf(cx)); |
653 | let me = self.get_mut(); |
654 | let inner = me.inner.get_mut(); |
655 | |
656 | if let Some(e) = inner.last_write_err.take() { |
657 | return Poll::Ready(Err(e.into())); |
658 | } |
659 | |
660 | loop { |
661 | match inner.state { |
662 | State::Idle(ref mut buf_cell) => { |
663 | let mut buf = buf_cell.take().unwrap(); |
664 | |
665 | let seek = if !buf.is_empty() { |
666 | Some(SeekFrom::Current(buf.discard_read())) |
667 | } else { |
668 | None |
669 | }; |
670 | |
671 | let n = buf.copy_from(src); |
672 | let std = me.std.clone(); |
673 | |
674 | let blocking_task_join_handle = spawn_mandatory_blocking(move || { |
675 | let res = if let Some(seek) = seek { |
676 | (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) |
677 | } else { |
678 | buf.write_to(&mut &*std) |
679 | }; |
680 | |
681 | (Operation::Write(res), buf) |
682 | }) |
683 | .ok_or_else(|| { |
684 | io::Error::new(io::ErrorKind::Other, "background task failed" ) |
685 | })?; |
686 | |
687 | inner.state = State::Busy(blocking_task_join_handle); |
688 | |
689 | return Poll::Ready(Ok(n)); |
690 | } |
691 | State::Busy(ref mut rx) => { |
692 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
693 | inner.state = State::Idle(Some(buf)); |
694 | |
695 | match op { |
696 | Operation::Read(_) => { |
697 | // We don't care about the result here. The fact |
698 | // that the cursor has advanced will be reflected in |
699 | // the next iteration of the loop |
700 | continue; |
701 | } |
702 | Operation::Write(res) => { |
703 | // If the previous write was successful, continue. |
704 | // Otherwise, error. |
705 | res?; |
706 | continue; |
707 | } |
708 | Operation::Seek(_) => { |
709 | // Ignore the seek |
710 | continue; |
711 | } |
712 | } |
713 | } |
714 | } |
715 | } |
716 | } |
717 | |
718 | fn poll_write_vectored( |
719 | self: Pin<&mut Self>, |
720 | cx: &mut Context<'_>, |
721 | bufs: &[io::IoSlice<'_>], |
722 | ) -> Poll<Result<usize, io::Error>> { |
723 | ready!(crate::trace::trace_leaf(cx)); |
724 | let me = self.get_mut(); |
725 | let inner = me.inner.get_mut(); |
726 | |
727 | if let Some(e) = inner.last_write_err.take() { |
728 | return Poll::Ready(Err(e.into())); |
729 | } |
730 | |
731 | loop { |
732 | match inner.state { |
733 | State::Idle(ref mut buf_cell) => { |
734 | let mut buf = buf_cell.take().unwrap(); |
735 | |
736 | let seek = if !buf.is_empty() { |
737 | Some(SeekFrom::Current(buf.discard_read())) |
738 | } else { |
739 | None |
740 | }; |
741 | |
742 | let n = buf.copy_from_bufs(bufs); |
743 | let std = me.std.clone(); |
744 | |
745 | let blocking_task_join_handle = spawn_mandatory_blocking(move || { |
746 | let res = if let Some(seek) = seek { |
747 | (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) |
748 | } else { |
749 | buf.write_to(&mut &*std) |
750 | }; |
751 | |
752 | (Operation::Write(res), buf) |
753 | }) |
754 | .ok_or_else(|| { |
755 | io::Error::new(io::ErrorKind::Other, "background task failed" ) |
756 | })?; |
757 | |
758 | inner.state = State::Busy(blocking_task_join_handle); |
759 | |
760 | return Poll::Ready(Ok(n)); |
761 | } |
762 | State::Busy(ref mut rx) => { |
763 | let (op, buf) = ready!(Pin::new(rx).poll(cx))?; |
764 | inner.state = State::Idle(Some(buf)); |
765 | |
766 | match op { |
767 | Operation::Read(_) => { |
768 | // We don't care about the result here. The fact |
769 | // that the cursor has advanced will be reflected in |
770 | // the next iteration of the loop |
771 | continue; |
772 | } |
773 | Operation::Write(res) => { |
774 | // If the previous write was successful, continue. |
775 | // Otherwise, error. |
776 | res?; |
777 | continue; |
778 | } |
779 | Operation::Seek(_) => { |
780 | // Ignore the seek |
781 | continue; |
782 | } |
783 | } |
784 | } |
785 | } |
786 | } |
787 | } |
788 | |
789 | fn is_write_vectored(&self) -> bool { |
790 | true |
791 | } |
792 | |
793 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
794 | ready!(crate::trace::trace_leaf(cx)); |
795 | let inner = self.inner.get_mut(); |
796 | inner.poll_flush(cx) |
797 | } |
798 | |
799 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
800 | ready!(crate::trace::trace_leaf(cx)); |
801 | self.poll_flush(cx) |
802 | } |
803 | } |
804 | |
805 | impl From<StdFile> for File { |
806 | fn from(std: StdFile) -> Self { |
807 | Self::from_std(std) |
808 | } |
809 | } |
810 | |
811 | impl fmt::Debug for File { |
812 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
813 | fmt&mut DebugStruct<'_, '_>.debug_struct("tokio::fs::File" ) |
814 | .field(name:"std" , &self.std) |
815 | .finish() |
816 | } |
817 | } |
818 | |
819 | #[cfg (unix)] |
820 | impl std::os::unix::io::AsRawFd for File { |
821 | fn as_raw_fd(&self) -> std::os::unix::io::RawFd { |
822 | self.std.as_raw_fd() |
823 | } |
824 | } |
825 | |
826 | #[cfg (unix)] |
827 | impl std::os::unix::io::AsFd for File { |
828 | fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { |
829 | unsafe { |
830 | std::os::unix::io::BorrowedFd::borrow_raw(fd:std::os::unix::io::AsRawFd::as_raw_fd(self)) |
831 | } |
832 | } |
833 | } |
834 | |
835 | #[cfg (unix)] |
836 | impl std::os::unix::io::FromRawFd for File { |
837 | unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self { |
838 | StdFile::from_raw_fd(fd).into() |
839 | } |
840 | } |
841 | |
842 | cfg_windows! { |
843 | use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle}; |
844 | |
845 | impl AsRawHandle for File { |
846 | fn as_raw_handle(&self) -> RawHandle { |
847 | self.std.as_raw_handle() |
848 | } |
849 | } |
850 | |
851 | impl AsHandle for File { |
852 | fn as_handle(&self) -> BorrowedHandle<'_> { |
853 | unsafe { |
854 | BorrowedHandle::borrow_raw( |
855 | AsRawHandle::as_raw_handle(self), |
856 | ) |
857 | } |
858 | } |
859 | } |
860 | |
861 | impl FromRawHandle for File { |
862 | unsafe fn from_raw_handle(handle: RawHandle) -> Self { |
863 | StdFile::from_raw_handle(handle).into() |
864 | } |
865 | } |
866 | } |
867 | |
868 | impl Inner { |
869 | async fn complete_inflight(&mut self) { |
870 | use crate::future::poll_fn; |
871 | |
872 | poll_fn(|cx| self.poll_complete_inflight(cx)).await; |
873 | } |
874 | |
875 | fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
876 | ready!(crate::trace::trace_leaf(cx)); |
877 | match self.poll_flush(cx) { |
878 | Poll::Ready(Err(e)) => { |
879 | self.last_write_err = Some(e.kind()); |
880 | Poll::Ready(()) |
881 | } |
882 | Poll::Ready(Ok(())) => Poll::Ready(()), |
883 | Poll::Pending => Poll::Pending, |
884 | } |
885 | } |
886 | |
887 | fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
888 | if let Some(e) = self.last_write_err.take() { |
889 | return Poll::Ready(Err(e.into())); |
890 | } |
891 | |
892 | let (op, buf) = match self.state { |
893 | State::Idle(_) => return Poll::Ready(Ok(())), |
894 | State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, |
895 | }; |
896 | |
897 | // The buffer is not used here |
898 | self.state = State::Idle(Some(buf)); |
899 | |
900 | match op { |
901 | Operation::Read(_) => Poll::Ready(Ok(())), |
902 | Operation::Write(res) => Poll::Ready(res), |
903 | Operation::Seek(_) => Poll::Ready(Ok(())), |
904 | } |
905 | } |
906 | } |
907 | |
908 | #[cfg (test)] |
909 | mod tests; |
910 | |