1 | use crate::fs::asyncify; |
2 | |
3 | use std::collections::VecDeque; |
4 | use std::ffi::OsString; |
5 | use std::fs::{FileType, Metadata}; |
6 | use std::future::Future; |
7 | use std::io; |
8 | use std::path::{Path, PathBuf}; |
9 | use std::pin::Pin; |
10 | use std::sync::Arc; |
11 | use std::task::{ready, Context, Poll}; |
12 | |
13 | #[cfg (test)] |
14 | use super::mocks::spawn_blocking; |
15 | #[cfg (test)] |
16 | use super::mocks::JoinHandle; |
17 | #[cfg (not(test))] |
18 | use crate::blocking::spawn_blocking; |
19 | #[cfg (not(test))] |
20 | use crate::blocking::JoinHandle; |
21 | |
/// Maximum number of directory entries fetched by a single call to
/// `ReadDir::next_chunk`; also used as the initial capacity of the entry
/// buffer created in `read_dir`.
const CHUNK_SIZE: usize = 32;
23 | |
24 | /// Returns a stream over the entries within a directory. |
25 | /// |
26 | /// This is an async version of [`std::fs::read_dir`]. |
27 | /// |
28 | /// This operation is implemented by running the equivalent blocking |
29 | /// operation on a separate thread pool using [`spawn_blocking`]. |
30 | /// |
31 | /// [`spawn_blocking`]: crate::task::spawn_blocking |
32 | pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> { |
33 | let path: PathBuf = path.as_ref().to_owned(); |
34 | asyncifyimpl Future(|| -> io::Result<ReadDir> { |
35 | let mut std: ReadDir = std::fs::read_dir(path)?; |
36 | let mut buf: VecDeque> = VecDeque::with_capacity(CHUNK_SIZE); |
37 | let remain: bool = ReadDir::next_chunk(&mut buf, &mut std); |
38 | |
39 | Ok(ReadDir(State::Idle(Some((buf, std, remain))))) |
40 | }) |
41 | .await |
42 | } |
43 | |
44 | /// Reads the entries in a directory. |
45 | /// |
46 | /// This struct is returned from the [`read_dir`] function of this module and |
47 | /// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information |
48 | /// like the entry's path and possibly other metadata can be learned. |
49 | /// |
50 | /// A `ReadDir` can be turned into a `Stream` with [`ReadDirStream`]. |
51 | /// |
52 | /// [`ReadDirStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReadDirStream.html |
53 | /// |
54 | /// # Errors |
55 | /// |
56 | /// This stream will return an [`Err`] if there's some sort of intermittent |
57 | /// IO error during iteration. |
58 | /// |
59 | /// [`read_dir`]: read_dir |
60 | /// [`DirEntry`]: DirEntry |
61 | /// [`Err`]: std::result::Result::Err |
62 | #[derive (Debug)] |
63 | #[must_use = "streams do nothing unless polled" ] |
64 | pub struct ReadDir(State); |
65 | |
66 | #[derive (Debug)] |
67 | enum State { |
68 | Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>), |
69 | Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir, bool)>), |
70 | } |
71 | |
72 | impl ReadDir { |
73 | /// Returns the next entry in the directory stream. |
74 | /// |
75 | /// # Cancel safety |
76 | /// |
77 | /// This method is cancellation safe. |
78 | pub async fn next_entry(&mut self) -> io::Result<Option<DirEntry>> { |
79 | use std::future::poll_fn; |
80 | poll_fn(|cx| self.poll_next_entry(cx)).await |
81 | } |
82 | |
83 | /// Polls for the next directory entry in the stream. |
84 | /// |
85 | /// This method returns: |
86 | /// |
87 | /// * `Poll::Pending` if the next directory entry is not yet available. |
88 | /// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available. |
89 | /// * `Poll::Ready(Ok(None))` if there are no more directory entries in this |
90 | /// stream. |
91 | /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next |
92 | /// directory entry. |
93 | /// |
94 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
95 | /// `Context` is scheduled to receive a wakeup when the next directory entry |
96 | /// becomes available on the underlying IO resource. |
97 | /// |
98 | /// Note that on multiple calls to `poll_next_entry`, only the `Waker` from |
99 | /// the `Context` passed to the most recent call is scheduled to receive a |
100 | /// wakeup. |
101 | pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> { |
102 | loop { |
103 | match self.0 { |
104 | State::Idle(ref mut data) => { |
105 | let (buf, _, ref remain) = data.as_mut().unwrap(); |
106 | |
107 | if let Some(ent) = buf.pop_front() { |
108 | return Poll::Ready(ent.map(Some)); |
109 | } else if !remain { |
110 | return Poll::Ready(Ok(None)); |
111 | } |
112 | |
113 | let (mut buf, mut std, _) = data.take().unwrap(); |
114 | |
115 | self.0 = State::Pending(spawn_blocking(move || { |
116 | let remain = ReadDir::next_chunk(&mut buf, &mut std); |
117 | (buf, std, remain) |
118 | })); |
119 | } |
120 | State::Pending(ref mut rx) => { |
121 | self.0 = State::Idle(Some(ready!(Pin::new(rx).poll(cx))?)); |
122 | } |
123 | } |
124 | } |
125 | } |
126 | |
127 | fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) -> bool { |
128 | for _ in 0..CHUNK_SIZE { |
129 | let ret = match std.next() { |
130 | Some(ret) => ret, |
131 | None => return false, |
132 | }; |
133 | |
134 | let success = ret.is_ok(); |
135 | |
136 | buf.push_back(ret.map(|std| DirEntry { |
137 | #[cfg (not(any( |
138 | target_os = "solaris" , |
139 | target_os = "illumos" , |
140 | target_os = "haiku" , |
141 | target_os = "vxworks" , |
142 | target_os = "aix" , |
143 | target_os = "nto" , |
144 | target_os = "vita" , |
145 | )))] |
146 | file_type: std.file_type().ok(), |
147 | std: Arc::new(std), |
148 | })); |
149 | |
150 | if !success { |
151 | break; |
152 | } |
153 | } |
154 | |
155 | true |
156 | } |
157 | } |
158 | |
159 | feature! { |
160 | #![unix] |
161 | |
162 | use std::os::unix::fs::DirEntryExt; |
163 | |
164 | impl DirEntry { |
165 | /// Returns the underlying `d_ino` field in the contained `dirent` |
166 | /// structure. |
167 | /// |
168 | /// # Examples |
169 | /// |
170 | /// ``` |
171 | /// use tokio::fs; |
172 | /// |
173 | /// # #[tokio::main] |
174 | /// # async fn main() -> std::io::Result<()> { |
175 | /// let mut entries = fs::read_dir(".").await?; |
176 | /// while let Some(entry) = entries.next_entry().await? { |
177 | /// // Here, `entry` is a `DirEntry`. |
178 | /// println!("{:?}: {}", entry.file_name(), entry.ino()); |
179 | /// } |
180 | /// # Ok(()) |
181 | /// # } |
182 | /// ``` |
183 | pub fn ino(&self) -> u64 { |
184 | self.as_inner().ino() |
185 | } |
186 | } |
187 | } |
188 | |
/// Entries returned by the [`ReadDir`] stream.
///
/// [`ReadDir`]: struct@ReadDir
///
/// This type is the Tokio counterpart of [`std::fs::DirEntry`], intended for
/// use from the Tokio runtime.
///
/// A `DirEntry` stands for a single entry inside a directory on the
/// filesystem. Its methods expose the entry's full path and, possibly, other
/// metadata through per-platform extension traits.
#[derive(Debug)]
pub struct DirEntry {
    /// File type captured when the entry was read, if it could be obtained.
    /// The field is compiled out on the targets listed below.
    #[cfg(not(any(
        target_os = "solaris",
        target_os = "illumos",
        target_os = "haiku",
        target_os = "vxworks",
        target_os = "aix",
        target_os = "nto",
        target_os = "vita",
    )))]
    file_type: Option<FileType>,
    /// The wrapped standard-library entry, shared so it can be moved into
    /// blocking closures.
    std: Arc<std::fs::DirEntry>,
}
213 | |
214 | impl DirEntry { |
215 | /// Returns the full path to the file that this entry represents. |
216 | /// |
217 | /// The full path is created by joining the original path to `read_dir` |
218 | /// with the filename of this entry. |
219 | /// |
220 | /// # Examples |
221 | /// |
222 | /// ```no_run |
223 | /// use tokio::fs; |
224 | /// |
225 | /// # async fn dox() -> std::io::Result<()> { |
226 | /// let mut entries = fs::read_dir("." ).await?; |
227 | /// |
228 | /// while let Some(entry) = entries.next_entry().await? { |
229 | /// println!("{:?}" , entry.path()); |
230 | /// } |
231 | /// # Ok(()) |
232 | /// # } |
233 | /// ``` |
234 | /// |
235 | /// This prints output like: |
236 | /// |
237 | /// ```text |
238 | /// "./whatever.txt" |
239 | /// "./foo.html" |
240 | /// "./hello_world.rs" |
241 | /// ``` |
242 | /// |
243 | /// The exact text, of course, depends on what files you have in `.`. |
244 | pub fn path(&self) -> PathBuf { |
245 | self.std.path() |
246 | } |
247 | |
248 | /// Returns the bare file name of this directory entry without any other |
249 | /// leading path component. |
250 | /// |
251 | /// # Examples |
252 | /// |
253 | /// ``` |
254 | /// use tokio::fs; |
255 | /// |
256 | /// # async fn dox() -> std::io::Result<()> { |
257 | /// let mut entries = fs::read_dir("." ).await?; |
258 | /// |
259 | /// while let Some(entry) = entries.next_entry().await? { |
260 | /// println!("{:?}" , entry.file_name()); |
261 | /// } |
262 | /// # Ok(()) |
263 | /// # } |
264 | /// ``` |
265 | pub fn file_name(&self) -> OsString { |
266 | self.std.file_name() |
267 | } |
268 | |
269 | /// Returns the metadata for the file that this entry points at. |
270 | /// |
271 | /// This function will not traverse symlinks if this entry points at a |
272 | /// symlink. |
273 | /// |
274 | /// # Platform-specific behavior |
275 | /// |
276 | /// On Windows this function is cheap to call (no extra system calls |
277 | /// needed), but on Unix platforms this function is the equivalent of |
278 | /// calling `symlink_metadata` on the path. |
279 | /// |
280 | /// # Examples |
281 | /// |
282 | /// ``` |
283 | /// use tokio::fs; |
284 | /// |
285 | /// # async fn dox() -> std::io::Result<()> { |
286 | /// let mut entries = fs::read_dir("." ).await?; |
287 | /// |
288 | /// while let Some(entry) = entries.next_entry().await? { |
289 | /// if let Ok(metadata) = entry.metadata().await { |
290 | /// // Now let's show our entry's permissions! |
291 | /// println!("{:?}: {:?}" , entry.path(), metadata.permissions()); |
292 | /// } else { |
293 | /// println!("Couldn't get file type for {:?}" , entry.path()); |
294 | /// } |
295 | /// } |
296 | /// # Ok(()) |
297 | /// # } |
298 | /// ``` |
299 | pub async fn metadata(&self) -> io::Result<Metadata> { |
300 | let std = self.std.clone(); |
301 | asyncify(move || std.metadata()).await |
302 | } |
303 | |
304 | /// Returns the file type for the file that this entry points at. |
305 | /// |
306 | /// This function will not traverse symlinks if this entry points at a |
307 | /// symlink. |
308 | /// |
309 | /// # Platform-specific behavior |
310 | /// |
311 | /// On Windows and most Unix platforms this function is free (no extra |
312 | /// system calls needed), but some Unix platforms may require the equivalent |
313 | /// call to `symlink_metadata` to learn about the target file type. |
314 | /// |
315 | /// # Examples |
316 | /// |
317 | /// ``` |
318 | /// use tokio::fs; |
319 | /// |
320 | /// # async fn dox() -> std::io::Result<()> { |
321 | /// let mut entries = fs::read_dir("." ).await?; |
322 | /// |
323 | /// while let Some(entry) = entries.next_entry().await? { |
324 | /// if let Ok(file_type) = entry.file_type().await { |
325 | /// // Now let's show our entry's file type! |
326 | /// println!("{:?}: {:?}" , entry.path(), file_type); |
327 | /// } else { |
328 | /// println!("Couldn't get file type for {:?}" , entry.path()); |
329 | /// } |
330 | /// } |
331 | /// # Ok(()) |
332 | /// # } |
333 | /// ``` |
334 | pub async fn file_type(&self) -> io::Result<FileType> { |
335 | #[cfg (not(any( |
336 | target_os = "solaris" , |
337 | target_os = "illumos" , |
338 | target_os = "haiku" , |
339 | target_os = "vxworks" , |
340 | target_os = "aix" , |
341 | target_os = "nto" , |
342 | target_os = "vita" , |
343 | )))] |
344 | if let Some(file_type) = self.file_type { |
345 | return Ok(file_type); |
346 | } |
347 | |
348 | let std = self.std.clone(); |
349 | asyncify(move || std.file_type()).await |
350 | } |
351 | |
352 | /// Returns a reference to the underlying `std::fs::DirEntry`. |
353 | #[cfg (unix)] |
354 | pub(super) fn as_inner(&self) -> &std::fs::DirEntry { |
355 | &self.std |
356 | } |
357 | } |
358 | |