1 | //! Types for working with [`File`]. |
2 | //! |
3 | //! [`File`]: File |
4 | |
5 | use crate::fs::{asyncify, OpenOptions}; |
6 | use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE}; |
7 | use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
8 | use crate::sync::Mutex; |
9 | |
10 | use std::cmp; |
11 | use std::fmt; |
12 | use std::fs::{Metadata, Permissions}; |
13 | use std::future::Future; |
14 | use std::io::{self, Seek, SeekFrom}; |
15 | use std::path::Path; |
16 | use std::pin::Pin; |
17 | use std::sync::Arc; |
18 | use std::task::{ready, Context, Poll}; |
19 | |
20 | #[cfg (test)] |
21 | use super::mocks::JoinHandle; |
22 | #[cfg (test)] |
23 | use super::mocks::MockFile as StdFile; |
24 | #[cfg (test)] |
25 | use super::mocks::{spawn_blocking, spawn_mandatory_blocking}; |
26 | #[cfg (not(test))] |
27 | use crate::blocking::JoinHandle; |
28 | #[cfg (not(test))] |
29 | use crate::blocking::{spawn_blocking, spawn_mandatory_blocking}; |
30 | #[cfg (not(test))] |
31 | use std::fs::File as StdFile; |
32 | |
33 | /// A reference to an open file on the filesystem. |
34 | /// |
35 | /// This is a specialized version of [`std::fs::File`] for usage from the |
36 | /// Tokio runtime. |
37 | /// |
38 | /// An instance of a `File` can be read and/or written depending on what options |
39 | /// it was opened with. Files also implement [`AsyncSeek`] to alter the logical |
40 | /// cursor that the file contains internally. |
41 | /// |
42 | /// A file will not be closed immediately when it goes out of scope if there |
43 | /// are any IO operations that have not yet completed. To ensure that a file is |
44 | /// closed immediately when it is dropped, you should call [`flush`] before |
45 | /// dropping it. Note that this does not ensure that the file has been fully |
46 | /// written to disk; the operating system might keep the changes around in an |
47 | /// in-memory buffer. See the [`sync_all`] method for telling the OS to write |
48 | /// the data to disk. |
49 | /// |
50 | /// Reading and writing to a `File` is usually done using the convenience |
51 | /// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits. |
52 | /// |
53 | /// [`AsyncSeek`]: trait@crate::io::AsyncSeek |
54 | /// [`flush`]: fn@crate::io::AsyncWriteExt::flush |
55 | /// [`sync_all`]: fn@crate::fs::File::sync_all |
56 | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
57 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
58 | /// |
59 | /// # Examples |
60 | /// |
61 | /// Create a new file and asynchronously write bytes to it: |
62 | /// |
63 | /// ```no_run |
64 | /// use tokio::fs::File; |
65 | /// use tokio::io::AsyncWriteExt; // for write_all() |
66 | /// |
67 | /// # async fn dox() -> std::io::Result<()> { |
68 | /// let mut file = File::create("foo.txt" ).await?; |
69 | /// file.write_all(b"hello, world!" ).await?; |
70 | /// # Ok(()) |
71 | /// # } |
72 | /// ``` |
73 | /// |
74 | /// Read the contents of a file into a buffer: |
75 | /// |
76 | /// ```no_run |
77 | /// use tokio::fs::File; |
78 | /// use tokio::io::AsyncReadExt; // for read_to_end() |
79 | /// |
80 | /// # async fn dox() -> std::io::Result<()> { |
81 | /// let mut file = File::open("foo.txt" ).await?; |
82 | /// |
83 | /// let mut contents = vec![]; |
84 | /// file.read_to_end(&mut contents).await?; |
85 | /// |
86 | /// println!("len = {}" , contents.len()); |
87 | /// # Ok(()) |
88 | /// # } |
89 | /// ``` |
pub struct File {
    /// Reference-counted handle to the underlying file. A clone of the `Arc`
    /// is moved into every blocking closure that performs the actual I/O.
    std: Arc<StdFile>,
    /// Buffer and in-flight-operation state. Methods taking `&self` lock the
    /// mutex; methods with exclusive access go through `get_mut` instead.
    inner: Mutex<Inner>,
    /// Size limit passed to the buffer helpers whenever a read or write is
    /// started; changed through `set_max_buf_size`.
    max_buf_size: usize,
}
95 | |
/// Mutable bookkeeping shared by every operation on a `File`.
struct Inner {
    /// Either idle (holding the buffer) or waiting on a spawned blocking
    /// operation.
    state: State,

    /// Errors from writes/flushes are returned in write/flush calls. If a write
    /// error is observed while performing a read, it is saved until the next
    /// write / flush call.
    last_write_err: Option<io::ErrorKind>,

    /// Position returned by the most recent successful seek-type operation;
    /// `poll_complete` reports it when nothing is in flight.
    pos: u64,
}
106 | |
/// Whether a blocking operation is currently running for the file.
#[derive(Debug)]
enum State {
    /// No operation is running. The buffer is stored here and is `take`n out
    /// when the next operation starts.
    Idle(Option<Buf>),
    /// A blocking task is running; it yields the operation's result together
    /// with the buffer it was given.
    Busy(JoinHandle<(Operation, Buf)>),
}
112 | |
/// Result of a finished blocking task, tagged by the kind of operation.
#[derive(Debug)]
enum Operation {
    /// Outcome of a read into the buffer.
    Read(io::Result<usize>),
    /// Outcome of writing the buffer out to the file.
    Write(io::Result<()>),
    /// Outcome of a seek (also used by `set_len` to report its result).
    Seek(io::Result<u64>),
}
119 | |
120 | impl File { |
121 | /// Attempts to open a file in read-only mode. |
122 | /// |
123 | /// See [`OpenOptions`] for more details. |
124 | /// |
125 | /// # Errors |
126 | /// |
127 | /// This function will return an error if called from outside of the Tokio |
128 | /// runtime or if path does not already exist. Other errors may also be |
129 | /// returned according to `OpenOptions::open`. |
130 | /// |
131 | /// # Examples |
132 | /// |
133 | /// ```no_run |
134 | /// use tokio::fs::File; |
135 | /// use tokio::io::AsyncReadExt; |
136 | /// |
137 | /// # async fn dox() -> std::io::Result<()> { |
138 | /// let mut file = File::open("foo.txt" ).await?; |
139 | /// |
140 | /// let mut contents = vec![]; |
141 | /// file.read_to_end(&mut contents).await?; |
142 | /// |
143 | /// println!("len = {}" , contents.len()); |
144 | /// # Ok(()) |
145 | /// # } |
146 | /// ``` |
147 | /// |
148 | /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait. |
149 | /// |
150 | /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end |
151 | /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt |
152 | pub async fn open(path: impl AsRef<Path>) -> io::Result<File> { |
153 | let path = path.as_ref().to_owned(); |
154 | let std = asyncify(|| StdFile::open(path)).await?; |
155 | |
156 | Ok(File::from_std(std)) |
157 | } |
158 | |
159 | /// Opens a file in write-only mode. |
160 | /// |
161 | /// This function will create a file if it does not exist, and will truncate |
162 | /// it if it does. |
163 | /// |
164 | /// See [`OpenOptions`] for more details. |
165 | /// |
166 | /// # Errors |
167 | /// |
168 | /// Results in an error if called from outside of the Tokio runtime or if |
169 | /// the underlying [`create`] call results in an error. |
170 | /// |
171 | /// [`create`]: std::fs::File::create |
172 | /// |
173 | /// # Examples |
174 | /// |
175 | /// ```no_run |
176 | /// use tokio::fs::File; |
177 | /// use tokio::io::AsyncWriteExt; |
178 | /// |
179 | /// # async fn dox() -> std::io::Result<()> { |
180 | /// let mut file = File::create("foo.txt" ).await?; |
181 | /// file.write_all(b"hello, world!" ).await?; |
182 | /// # Ok(()) |
183 | /// # } |
184 | /// ``` |
185 | /// |
186 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
187 | /// |
188 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
189 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
190 | pub async fn create(path: impl AsRef<Path>) -> io::Result<File> { |
191 | let path = path.as_ref().to_owned(); |
192 | let std_file = asyncify(move || StdFile::create(path)).await?; |
193 | Ok(File::from_std(std_file)) |
194 | } |
195 | |
196 | /// Opens a file in read-write mode. |
197 | /// |
198 | /// This function will create a file if it does not exist, or return an error |
199 | /// if it does. This way, if the call succeeds, the file returned is guaranteed |
200 | /// to be new. |
201 | /// |
202 | /// This option is useful because it is atomic. Otherwise between checking |
203 | /// whether a file exists and creating a new one, the file may have been |
204 | /// created by another process (a TOCTOU race condition / attack). |
205 | /// |
206 | /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`. |
207 | /// |
208 | /// See [`OpenOptions`] for more details. |
209 | /// |
210 | /// # Examples |
211 | /// |
212 | /// ```no_run |
213 | /// use tokio::fs::File; |
214 | /// use tokio::io::AsyncWriteExt; |
215 | /// |
216 | /// # async fn dox() -> std::io::Result<()> { |
217 | /// let mut file = File::create_new("foo.txt" ).await?; |
218 | /// file.write_all(b"hello, world!" ).await?; |
219 | /// # Ok(()) |
220 | /// # } |
221 | /// ``` |
222 | /// |
223 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
224 | /// |
225 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
226 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
227 | pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> { |
228 | Self::options() |
229 | .read(true) |
230 | .write(true) |
231 | .create_new(true) |
232 | .open(path) |
233 | .await |
234 | } |
235 | |
236 | /// Returns a new [`OpenOptions`] object. |
237 | /// |
238 | /// This function returns a new `OpenOptions` object that you can use to |
239 | /// open or create a file with specific options if `open()` or `create()` |
240 | /// are not appropriate. |
241 | /// |
242 | /// It is equivalent to `OpenOptions::new()`, but allows you to write more |
243 | /// readable code. Instead of |
244 | /// `OpenOptions::new().append(true).open("example.log")`, |
245 | /// you can write `File::options().append(true).open("example.log")`. This |
246 | /// also avoids the need to import `OpenOptions`. |
247 | /// |
248 | /// See the [`OpenOptions::new`] function for more details. |
249 | /// |
250 | /// # Examples |
251 | /// |
252 | /// ```no_run |
253 | /// use tokio::fs::File; |
254 | /// use tokio::io::AsyncWriteExt; |
255 | /// |
256 | /// # async fn dox() -> std::io::Result<()> { |
257 | /// let mut f = File::options().append(true).open("example.log" ).await?; |
258 | /// f.write_all(b"new line \n" ).await?; |
259 | /// # Ok(()) |
260 | /// # } |
261 | /// ``` |
    #[must_use]
    pub fn options() -> OpenOptions {
        // Plain delegate to `OpenOptions::new()`.
        OpenOptions::new()
    }
266 | |
267 | /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File). |
268 | /// |
269 | /// # Examples |
270 | /// |
271 | /// ```no_run |
272 | /// // This line could block. It is not recommended to do this on the Tokio |
273 | /// // runtime. |
274 | /// let std_file = std::fs::File::open("foo.txt" ).unwrap(); |
275 | /// let file = tokio::fs::File::from_std(std_file); |
276 | /// ``` |
277 | pub fn from_std(std: StdFile) -> File { |
278 | File { |
279 | std: Arc::new(std), |
280 | inner: Mutex::new(Inner { |
281 | state: State::Idle(Some(Buf::with_capacity(0))), |
282 | last_write_err: None, |
283 | pos: 0, |
284 | }), |
285 | max_buf_size: DEFAULT_MAX_BUF_SIZE, |
286 | } |
287 | } |
288 | |
    /// Attempts to sync all OS-internal file content and metadata to disk.
290 | /// |
291 | /// This function will attempt to ensure that all in-core data reaches the |
292 | /// filesystem before returning. |
293 | /// |
294 | /// # Examples |
295 | /// |
296 | /// ```no_run |
297 | /// use tokio::fs::File; |
298 | /// use tokio::io::AsyncWriteExt; |
299 | /// |
300 | /// # async fn dox() -> std::io::Result<()> { |
301 | /// let mut file = File::create("foo.txt" ).await?; |
302 | /// file.write_all(b"hello, world!" ).await?; |
303 | /// file.sync_all().await?; |
304 | /// # Ok(()) |
305 | /// # } |
306 | /// ``` |
307 | /// |
308 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
309 | /// |
310 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
311 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
312 | pub async fn sync_all(&self) -> io::Result<()> { |
313 | let mut inner = self.inner.lock().await; |
314 | inner.complete_inflight().await; |
315 | |
316 | let std = self.std.clone(); |
317 | asyncify(move || std.sync_all()).await |
318 | } |
319 | |
320 | /// This function is similar to `sync_all`, except that it may not |
321 | /// synchronize file metadata to the filesystem. |
322 | /// |
323 | /// This is intended for use cases that must synchronize content, but don't |
324 | /// need the metadata on disk. The goal of this method is to reduce disk |
325 | /// operations. |
326 | /// |
327 | /// Note that some platforms may simply implement this in terms of `sync_all`. |
328 | /// |
329 | /// # Examples |
330 | /// |
331 | /// ```no_run |
332 | /// use tokio::fs::File; |
333 | /// use tokio::io::AsyncWriteExt; |
334 | /// |
335 | /// # async fn dox() -> std::io::Result<()> { |
336 | /// let mut file = File::create("foo.txt" ).await?; |
337 | /// file.write_all(b"hello, world!" ).await?; |
338 | /// file.sync_data().await?; |
339 | /// # Ok(()) |
340 | /// # } |
341 | /// ``` |
342 | /// |
343 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
344 | /// |
345 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
346 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
347 | pub async fn sync_data(&self) -> io::Result<()> { |
348 | let mut inner = self.inner.lock().await; |
349 | inner.complete_inflight().await; |
350 | |
351 | let std = self.std.clone(); |
352 | asyncify(move || std.sync_data()).await |
353 | } |
354 | |
355 | /// Truncates or extends the underlying file, updating the size of this file to become size. |
356 | /// |
357 | /// If the size is less than the current file's size, then the file will be |
358 | /// shrunk. If it is greater than the current file's size, then the file |
359 | /// will be extended to size and have all of the intermediate data filled in |
360 | /// with 0s. |
361 | /// |
362 | /// # Errors |
363 | /// |
364 | /// This function will return an error if the file is not opened for |
365 | /// writing. |
366 | /// |
367 | /// # Examples |
368 | /// |
369 | /// ```no_run |
370 | /// use tokio::fs::File; |
371 | /// use tokio::io::AsyncWriteExt; |
372 | /// |
373 | /// # async fn dox() -> std::io::Result<()> { |
374 | /// let mut file = File::create("foo.txt" ).await?; |
375 | /// file.write_all(b"hello, world!" ).await?; |
376 | /// file.set_len(10).await?; |
377 | /// # Ok(()) |
378 | /// # } |
379 | /// ``` |
380 | /// |
381 | /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait. |
382 | /// |
383 | /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all |
384 | /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt |
    pub async fn set_len(&self, size: u64) -> io::Result<()> {
        // Hold the lock for the whole call and wait for any in-flight
        // operation before touching the buffer.
        let mut inner = self.inner.lock().await;
        inner.complete_inflight().await;

        // NOTE(review): this relies on `complete_inflight` always leaving the
        // state `Idle` — confirm that holds when a deferred write error is
        // pending; otherwise the `unreachable!()` arm is hit.
        let mut buf = match inner.state {
            State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
            _ => unreachable!(),
        };

        // Unread buffered data is discarded; the blocking task first seeks by
        // the offset `discard_read` returned.
        let seek = if !buf.is_empty() {
            Some(SeekFrom::Current(buf.discard_read()))
        } else {
            None
        };

        let std = self.std.clone();

        inner.state = State::Busy(spawn_blocking(move || {
            let res = if let Some(seek) = seek {
                (&*std).seek(seek).and_then(|_| std.set_len(size))
            } else {
                std.set_len(size)
            }
            .map(|()| 0); // the value is discarded later

            // Return the result as a seek
            (Operation::Seek(res), buf)
        }));

        // Await the task that was just stored; a join error is converted by `?`.
        let (op, buf) = match inner.state {
            State::Idle(_) => unreachable!(),
            State::Busy(ref mut rx) => rx.await?,
        };

        // Hand the buffer back so the file is idle again.
        inner.state = State::Idle(Some(buf));

        match op {
            // On success the placeholder position (0) produced above is
            // stored into `inner.pos`.
            Operation::Seek(res) => res.map(|pos| {
                inner.pos = pos;
            }),
            _ => unreachable!(),
        }
    }
428 | |
429 | /// Queries metadata about the underlying file. |
430 | /// |
431 | /// # Examples |
432 | /// |
433 | /// ```no_run |
434 | /// use tokio::fs::File; |
435 | /// |
436 | /// # async fn dox() -> std::io::Result<()> { |
437 | /// let file = File::open("foo.txt" ).await?; |
438 | /// let metadata = file.metadata().await?; |
439 | /// |
440 | /// println!("{:?}" , metadata); |
441 | /// # Ok(()) |
442 | /// # } |
443 | /// ``` |
444 | pub async fn metadata(&self) -> io::Result<Metadata> { |
445 | let std = self.std.clone(); |
446 | asyncify(move || std.metadata()).await |
447 | } |
448 | |
449 | /// Creates a new `File` instance that shares the same underlying file handle |
450 | /// as the existing `File` instance. Reads, writes, and seeks will affect both |
451 | /// File instances simultaneously. |
452 | /// |
453 | /// # Examples |
454 | /// |
455 | /// ```no_run |
456 | /// use tokio::fs::File; |
457 | /// |
458 | /// # async fn dox() -> std::io::Result<()> { |
459 | /// let file = File::open("foo.txt" ).await?; |
460 | /// let file_clone = file.try_clone().await?; |
461 | /// # Ok(()) |
462 | /// # } |
463 | /// ``` |
464 | pub async fn try_clone(&self) -> io::Result<File> { |
465 | self.inner.lock().await.complete_inflight().await; |
466 | let std = self.std.clone(); |
467 | let std_file = asyncify(move || std.try_clone()).await?; |
468 | Ok(File::from_std(std_file)) |
469 | } |
470 | |
471 | /// Destructures `File` into a [`std::fs::File`]. This function is |
472 | /// async to allow any in-flight operations to complete. |
473 | /// |
474 | /// Use `File::try_into_std` to attempt conversion immediately. |
475 | /// |
476 | /// # Examples |
477 | /// |
478 | /// ```no_run |
479 | /// use tokio::fs::File; |
480 | /// |
481 | /// # async fn dox() -> std::io::Result<()> { |
482 | /// let tokio_file = File::open("foo.txt" ).await?; |
483 | /// let std_file = tokio_file.into_std().await; |
484 | /// # Ok(()) |
485 | /// # } |
486 | /// ``` |
487 | pub async fn into_std(mut self) -> StdFile { |
488 | self.inner.get_mut().complete_inflight().await; |
489 | Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed" ) |
490 | } |
491 | |
492 | /// Tries to immediately destructure `File` into a [`std::fs::File`]. |
493 | /// |
494 | /// # Errors |
495 | /// |
496 | /// This function will return an error containing the file if some |
497 | /// operation is in-flight. |
498 | /// |
499 | /// # Examples |
500 | /// |
501 | /// ```no_run |
502 | /// use tokio::fs::File; |
503 | /// |
504 | /// # async fn dox() -> std::io::Result<()> { |
505 | /// let tokio_file = File::open("foo.txt" ).await?; |
506 | /// let std_file = tokio_file.try_into_std().unwrap(); |
507 | /// # Ok(()) |
508 | /// # } |
509 | /// ``` |
510 | pub fn try_into_std(mut self) -> Result<StdFile, Self> { |
511 | match Arc::try_unwrap(self.std) { |
512 | Ok(file) => Ok(file), |
513 | Err(std_file_arc) => { |
514 | self.std = std_file_arc; |
515 | Err(self) |
516 | } |
517 | } |
518 | } |
519 | |
520 | /// Changes the permissions on the underlying file. |
521 | /// |
522 | /// # Platform-specific behavior |
523 | /// |
524 | /// This function currently corresponds to the `fchmod` function on Unix and |
525 | /// the `SetFileInformationByHandle` function on Windows. Note that, this |
526 | /// [may change in the future][changes]. |
527 | /// |
528 | /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior |
529 | /// |
530 | /// # Errors |
531 | /// |
    /// This function will return an error if the user lacks permission to change
    /// attributes on the underlying file. It may also return an error in other
    /// OS-specific unspecified cases.
535 | /// |
536 | /// # Examples |
537 | /// |
538 | /// ```no_run |
539 | /// use tokio::fs::File; |
540 | /// |
541 | /// # async fn dox() -> std::io::Result<()> { |
542 | /// let file = File::open("foo.txt" ).await?; |
543 | /// let mut perms = file.metadata().await?.permissions(); |
544 | /// perms.set_readonly(true); |
545 | /// file.set_permissions(perms).await?; |
546 | /// # Ok(()) |
547 | /// # } |
548 | /// ``` |
549 | pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { |
550 | let std = self.std.clone(); |
551 | asyncify(move || std.set_permissions(perm)).await |
552 | } |
553 | |
554 | /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation. |
555 | /// |
556 | /// Although Tokio uses a sensible default value for this buffer size, this function would be |
557 | /// useful for changing that default depending on the situation. |
558 | /// |
559 | /// # Examples |
560 | /// |
561 | /// ```no_run |
562 | /// use tokio::fs::File; |
563 | /// use tokio::io::AsyncWriteExt; |
564 | /// |
565 | /// # async fn dox() -> std::io::Result<()> { |
    /// let mut file = File::create("foo.txt").await?;
567 | /// |
568 | /// // Set maximum buffer size to 8 MiB |
569 | /// file.set_max_buf_size(8 * 1024 * 1024); |
570 | /// |
571 | /// let mut buf = vec![1; 1024 * 1024 * 1024]; |
572 | /// |
573 | /// // Write the 1 GiB buffer in chunks up to 8 MiB each. |
574 | /// file.write_all(&mut buf).await?; |
575 | /// # Ok(()) |
576 | /// # } |
577 | /// ``` |
    pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
        // Only stored here; the read and write paths consult the value each
        // time they start a new operation.
        self.max_buf_size = max_buf_size;
    }
581 | } |
582 | |
impl AsyncRead for File {
    /// Fills `dst` from buffered data when there is some; otherwise starts a
    /// read through `spawn_blocking` and waits for it.
    ///
    /// Results of previously started writes and seeks are drained first. A
    /// failed write is not returned here: its error kind is stored in
    /// `last_write_err`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        dst: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        ready!(crate::trace::trace_leaf(cx));

        let me = self.get_mut();
        let inner = me.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Leftover buffered data, or a `dst` with no room, is
                    // handled without starting a new blocking read.
                    if !buf.is_empty() || dst.remaining() == 0 {
                        buf.copy_to(dst);
                        *buf_cell = Some(buf);
                        return Poll::Ready(Ok(()));
                    }

                    let std = me.std.clone();

                    // Never ask for more than `dst` can take or than the
                    // configured limit allows.
                    let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
                    inner.state = State::Busy(spawn_blocking(move || {
                        // SAFETY: the `Read` implementation of `std` does not
                        // read from the buffer it is borrowing and correctly
                        // reports the length of the data written into the buffer.
                        let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
                        (Operation::Read(res), buf)
                    }));
                    // Fall through to the next iteration, which polls the new task.
                }
                State::Busy(ref mut rx) => {
                    // A join error is converted into the return type by `?`.
                    let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;

                    match op {
                        Operation::Read(Ok(_)) => {
                            buf.copy_to(dst);
                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Ok(()));
                        }
                        Operation::Read(Err(e)) => {
                            assert!(buf.is_empty());

                            inner.state = State::Idle(Some(buf));
                            return Poll::Ready(Err(e));
                        }
                        Operation::Write(Ok(())) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            continue;
                        }
                        Operation::Write(Err(e)) => {
                            // Defer the error to the next write/flush call and
                            // carry on with the read (the loop repeats).
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                            inner.state = State::Idle(Some(buf));
                        }
                        Operation::Seek(result) => {
                            assert!(buf.is_empty());
                            inner.state = State::Idle(Some(buf));
                            // A failed seek is dropped here; only a successful
                            // one updates the recorded position.
                            if let Ok(pos) = result {
                                inner.pos = pos;
                            }
                            continue;
                        }
                    }
                }
            }
        }
    }
}
655 | |
656 | impl AsyncSeek for File { |
657 | fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { |
658 | let me = self.get_mut(); |
659 | let inner = me.inner.get_mut(); |
660 | |
661 | match inner.state { |
662 | State::Busy(_) => Err(io::Error::new( |
663 | io::ErrorKind::Other, |
664 | "other file operation is pending, call poll_complete before start_seek" , |
665 | )), |
666 | State::Idle(ref mut buf_cell) => { |
667 | let mut buf = buf_cell.take().unwrap(); |
668 | |
669 | // Factor in any unread data from the buf |
670 | if !buf.is_empty() { |
671 | let n = buf.discard_read(); |
672 | |
673 | if let SeekFrom::Current(ref mut offset) = pos { |
674 | *offset += n; |
675 | } |
676 | } |
677 | |
678 | let std = me.std.clone(); |
679 | |
680 | inner.state = State::Busy(spawn_blocking(move || { |
681 | let res = (&*std).seek(pos); |
682 | (Operation::Seek(res), buf) |
683 | })); |
684 | Ok(()) |
685 | } |
686 | } |
687 | } |
688 | |
    /// Waits for the in-flight operation, if any, and reports the resulting
    /// position.
    ///
    /// When nothing is in flight the recorded position is returned directly.
    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        ready!(crate::trace::trace_leaf(cx));
        let inner = self.inner.get_mut();

        loop {
            match inner.state {
                State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
                State::Busy(ref mut rx) => {
                    // A join error is converted into the return type by `?`.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        // Read results are not needed here; loop again and
                        // report the recorded position.
                        Operation::Read(_) => {}
                        Operation::Write(Err(e)) => {
                            // Defer the error to the next write/flush call.
                            assert!(inner.last_write_err.is_none());
                            inner.last_write_err = Some(e.kind());
                        }
                        Operation::Write(_) => {}
                        Operation::Seek(res) => {
                            // Only a successful seek updates the recorded
                            // position; the result is returned either way.
                            if let Ok(pos) = res {
                                inner.pos = pos;
                            }
                            return Poll::Ready(res);
                        }
                    }
                }
            }
        }
    }
718 | } |
719 | |
720 | impl AsyncWrite for File { |
    /// Copies data from `src` into the internal buffer and starts a blocking
    /// task that writes the buffer to the file.
    ///
    /// `Ready(Ok(n))` is returned as soon as the task has been spawned; the
    /// outcome of the write itself is observed by a later call.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        src: &[u8],
    ) -> Poll<io::Result<usize>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        // Report a write error that an earlier non-write call observed and
        // deferred.
        if let Some(e) = inner.last_write_err.take() {
            return Poll::Ready(Err(e.into()));
        }

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Unread buffered data is discarded; the blocking task
                    // first seeks by the offset `discard_read` returned.
                    let seek = if !buf.is_empty() {
                        Some(SeekFrom::Current(buf.discard_read()))
                    } else {
                        None
                    };

                    // `n` is the count reported back to the caller; the copy
                    // is limited by `max_buf_size`.
                    let n = buf.copy_from(src, me.max_buf_size);
                    let std = me.std.clone();

                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
                        let res = if let Some(seek) = seek {
                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
                        } else {
                            buf.write_to(&mut &*std)
                        };

                        (Operation::Write(res), buf)
                    })
                    // `None` means the task could not be spawned.
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::Other, "background task failed")
                    })?;

                    inner.state = State::Busy(blocking_task_join_handle);

                    return Poll::Ready(Ok(n));
                }
                State::Busy(ref mut rx) => {
                    // A join error is converted into the return type by `?`.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        Operation::Read(_) => {
                            // We don't care about the result here. The fact
                            // that the cursor has advanced will be reflected in
                            // the next iteration of the loop
                            continue;
                        }
                        Operation::Write(res) => {
                            // If the previous write was successful, continue.
                            // Otherwise, error.
                            res?;
                            continue;
                        }
                        Operation::Seek(_) => {
                            // Ignore the seek
                            continue;
                        }
                    }
                }
            }
        }
    }
791 | |
    /// Vectored counterpart of `poll_write`: gathers data from `bufs` into the
    /// internal buffer and starts a blocking task that writes it out.
    ///
    /// `Ready(Ok(n))` is returned as soon as the task has been spawned; the
    /// outcome of the write itself is observed by a later call.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        let me = self.get_mut();
        let inner = me.inner.get_mut();

        // Report a write error that an earlier non-write call observed and
        // deferred.
        if let Some(e) = inner.last_write_err.take() {
            return Poll::Ready(Err(e.into()));
        }

        loop {
            match inner.state {
                State::Idle(ref mut buf_cell) => {
                    let mut buf = buf_cell.take().unwrap();

                    // Unread buffered data is discarded; the blocking task
                    // first seeks by the offset `discard_read` returned.
                    let seek = if !buf.is_empty() {
                        Some(SeekFrom::Current(buf.discard_read()))
                    } else {
                        None
                    };

                    // `n` is the count reported back to the caller; the copy
                    // is limited by `max_buf_size`.
                    let n = buf.copy_from_bufs(bufs, me.max_buf_size);
                    let std = me.std.clone();

                    let blocking_task_join_handle = spawn_mandatory_blocking(move || {
                        let res = if let Some(seek) = seek {
                            (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
                        } else {
                            buf.write_to(&mut &*std)
                        };

                        (Operation::Write(res), buf)
                    })
                    // `None` means the task could not be spawned.
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::Other, "background task failed")
                    })?;

                    inner.state = State::Busy(blocking_task_join_handle);

                    return Poll::Ready(Ok(n));
                }
                State::Busy(ref mut rx) => {
                    // A join error is converted into the return type by `?`.
                    let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
                    inner.state = State::Idle(Some(buf));

                    match op {
                        Operation::Read(_) => {
                            // We don't care about the result here. The fact
                            // that the cursor has advanced will be reflected in
                            // the next iteration of the loop
                            continue;
                        }
                        Operation::Write(res) => {
                            // If the previous write was successful, continue.
                            // Otherwise, error.
                            res?;
                            continue;
                        }
                        Operation::Seek(_) => {
                            // Ignore the seek
                            continue;
                        }
                    }
                }
            }
        }
    }
862 | |
    /// Always `true`: `poll_write_vectored` is implemented for `File`.
    fn is_write_vectored(&self) -> bool {
        true
    }
866 | |
867 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
868 | ready!(crate::trace::trace_leaf(cx)); |
869 | let inner = self.inner.get_mut(); |
870 | inner.poll_flush(cx) |
871 | } |
872 | |
    /// Shutting down a file is the same as flushing it.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        ready!(crate::trace::trace_leaf(cx));
        self.poll_flush(cx)
    }
877 | } |
878 | |
/// Conversion from the standard-library file type; same as `File::from_std`.
impl From<StdFile> for File {
    fn from(std: StdFile) -> Self {
        Self::from_std(std)
    }
}
884 | |
885 | impl fmt::Debug for File { |
886 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
887 | fmt&mut DebugStruct<'_, '_>.debug_struct("tokio::fs::File" ) |
888 | .field(name:"std" , &self.std) |
889 | .finish() |
890 | } |
891 | } |
892 | |
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
    /// Returns the raw descriptor of the underlying file handle.
    fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
        self.std.as_raw_fd()
    }
}
899 | |
900 | #[cfg (unix)] |
901 | impl std::os::unix::io::AsFd for File { |
902 | fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> { |
903 | unsafe { |
904 | std::os::unix::io::BorrowedFd::borrow_raw(fd:std::os::unix::io::AsRawFd::as_raw_fd(self)) |
905 | } |
906 | } |
907 | } |
908 | |
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
    /// Wraps a raw descriptor by building the std file from it and converting
    /// that into a `File`.
    ///
    /// # Safety
    ///
    /// `fd` is passed straight to `StdFile::from_raw_fd`, so the caller must
    /// uphold that function's requirements.
    unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
        StdFile::from_raw_fd(fd).into()
    }
}
915 | |
// Windows counterparts of the Unix descriptor traits above.
cfg_windows! {
    use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};

    impl AsRawHandle for File {
        /// Returns the raw handle of the underlying file.
        fn as_raw_handle(&self) -> RawHandle {
            self.std.as_raw_handle()
        }
    }

    impl AsHandle for File {
        /// Borrows the handle of the underlying file for the lifetime of
        /// `&self`.
        fn as_handle(&self) -> BorrowedHandle<'_> {
            // SAFETY: the raw handle is obtained from `self`, and the returned
            // `BorrowedHandle` is tied to the lifetime of `&self`.
            unsafe {
                BorrowedHandle::borrow_raw(
                    AsRawHandle::as_raw_handle(self),
                )
            }
        }
    }

    impl FromRawHandle for File {
        /// Wraps a raw handle by building the std file from it and converting
        /// that into a `File`.
        ///
        /// # Safety
        ///
        /// `handle` is passed straight to `StdFile::from_raw_handle`, so the
        /// caller must uphold that function's requirements.
        unsafe fn from_raw_handle(handle: RawHandle) -> Self {
            StdFile::from_raw_handle(handle).into()
        }
    }
}
941 | |
942 | impl Inner { |
943 | async fn complete_inflight(&mut self) { |
944 | use std::future::poll_fn; |
945 | |
946 | poll_fn(|cx| self.poll_complete_inflight(cx)).await; |
947 | } |
948 | |
949 | fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
950 | ready!(crate::trace::trace_leaf(cx)); |
951 | match self.poll_flush(cx) { |
952 | Poll::Ready(Err(e)) => { |
953 | self.last_write_err = Some(e.kind()); |
954 | Poll::Ready(()) |
955 | } |
956 | Poll::Ready(Ok(())) => Poll::Ready(()), |
957 | Poll::Pending => Poll::Pending, |
958 | } |
959 | } |
960 | |
961 | fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
962 | if let Some(e) = self.last_write_err.take() { |
963 | return Poll::Ready(Err(e.into())); |
964 | } |
965 | |
966 | let (op, buf) = match self.state { |
967 | State::Idle(_) => return Poll::Ready(Ok(())), |
968 | State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, |
969 | }; |
970 | |
971 | // The buffer is not used here |
972 | self.state = State::Idle(Some(buf)); |
973 | |
974 | match op { |
975 | Operation::Read(_) => Poll::Ready(Ok(())), |
976 | Operation::Write(res) => Poll::Ready(res), |
977 | Operation::Seek(_) => Poll::Ready(Ok(())), |
978 | } |
979 | } |
980 | } |
981 | |
982 | #[cfg (test)] |
983 | mod tests; |
984 | |