1 | #![cfg (feature = "full" )] |
2 | #![cfg (all(windows))] |
3 | |
4 | use std::io; |
5 | use std::time::Duration; |
6 | use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
7 | use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions}; |
8 | use tokio::time; |
9 | use windows_sys::Win32::Foundation::{ERROR_NO_DATA, ERROR_PIPE_BUSY}; |
10 | |
11 | #[tokio::test ] |
12 | async fn test_named_pipe_client_drop() -> io::Result<()> { |
13 | const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop" ; |
14 | |
15 | let mut server = ServerOptions::new().create(PIPE_NAME)?; |
16 | |
17 | let client = ClientOptions::new().open(PIPE_NAME)?; |
18 | |
19 | server.connect().await?; |
20 | drop(client); |
21 | |
22 | // instance will be broken because client is gone |
23 | match server.write_all(b"ping" ).await { |
24 | Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => (), |
25 | x => panic!("{:?}" , x), |
26 | } |
27 | |
28 | Ok(()) |
29 | } |
30 | |
31 | #[tokio::test ] |
32 | async fn test_named_pipe_single_client() -> io::Result<()> { |
33 | use tokio::io::{AsyncBufReadExt as _, BufReader}; |
34 | |
35 | const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client" ; |
36 | |
37 | let server = ServerOptions::new().create(PIPE_NAME)?; |
38 | |
39 | let server = tokio::spawn(async move { |
40 | // Note: we wait for a client to connect. |
41 | server.connect().await?; |
42 | |
43 | let mut server = BufReader::new(server); |
44 | |
45 | let mut buf = String::new(); |
46 | server.read_line(&mut buf).await?; |
47 | server.write_all(b"pong \n" ).await?; |
48 | Ok::<_, io::Error>(buf) |
49 | }); |
50 | |
51 | let client = tokio::spawn(async move { |
52 | let client = ClientOptions::new().open(PIPE_NAME)?; |
53 | |
54 | let mut client = BufReader::new(client); |
55 | |
56 | let mut buf = String::new(); |
57 | client.write_all(b"ping \n" ).await?; |
58 | client.read_line(&mut buf).await?; |
59 | Ok::<_, io::Error>(buf) |
60 | }); |
61 | |
62 | let (server, client) = tokio::try_join!(server, client)?; |
63 | |
64 | assert_eq!(server?, "ping \n" ); |
65 | assert_eq!(client?, "pong \n" ); |
66 | |
67 | Ok(()) |
68 | } |
69 | |
70 | #[tokio::test ] |
71 | async fn test_named_pipe_multi_client() -> io::Result<()> { |
72 | use tokio::io::{AsyncBufReadExt as _, BufReader}; |
73 | |
74 | const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client" ; |
75 | const N: usize = 10; |
76 | |
77 | // The first server needs to be constructed early so that clients can |
78 | // be correctly connected. Otherwise calling .wait will cause the client to |
79 | // error. |
80 | let mut server = ServerOptions::new().create(PIPE_NAME)?; |
81 | |
82 | let server = tokio::spawn(async move { |
83 | for _ in 0..N { |
84 | // Wait for client to connect. |
85 | server.connect().await?; |
86 | let mut inner = BufReader::new(server); |
87 | |
88 | // Construct the next server to be connected before sending the one |
89 | // we already have of onto a task. This ensures that the server |
90 | // isn't closed (after it's done in the task) before a new one is |
91 | // available. Otherwise the client might error with |
92 | // `io::ErrorKind::NotFound`. |
93 | server = ServerOptions::new().create(PIPE_NAME)?; |
94 | |
95 | let _ = tokio::spawn(async move { |
96 | let mut buf = String::new(); |
97 | inner.read_line(&mut buf).await?; |
98 | inner.write_all(b"pong \n" ).await?; |
99 | inner.flush().await?; |
100 | Ok::<_, io::Error>(()) |
101 | }); |
102 | } |
103 | |
104 | Ok::<_, io::Error>(()) |
105 | }); |
106 | |
107 | let mut clients = Vec::new(); |
108 | |
109 | for _ in 0..N { |
110 | clients.push(tokio::spawn(async move { |
111 | // This showcases a generic connect loop. |
112 | // |
113 | // We immediately try to create a client, if it's not found or the |
114 | // pipe is busy we use the specialized wait function on the client |
115 | // builder. |
116 | let client = loop { |
117 | match ClientOptions::new().open(PIPE_NAME) { |
118 | Ok(client) => break client, |
119 | Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), |
120 | Err(e) if e.kind() == io::ErrorKind::NotFound => (), |
121 | Err(e) => return Err(e), |
122 | } |
123 | |
124 | // Wait for a named pipe to become available. |
125 | time::sleep(Duration::from_millis(10)).await; |
126 | }; |
127 | |
128 | let mut client = BufReader::new(client); |
129 | |
130 | let mut buf = String::new(); |
131 | client.write_all(b"ping \n" ).await?; |
132 | client.flush().await?; |
133 | client.read_line(&mut buf).await?; |
134 | Ok::<_, io::Error>(buf) |
135 | })); |
136 | } |
137 | |
138 | for client in clients { |
139 | let result = client.await?; |
140 | assert_eq!(result?, "pong \n" ); |
141 | } |
142 | |
143 | server.await??; |
144 | Ok(()) |
145 | } |
146 | |
147 | #[tokio::test ] |
148 | async fn test_named_pipe_multi_client_ready() -> io::Result<()> { |
149 | use tokio::io::Interest; |
150 | |
151 | const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready" ; |
152 | const N: usize = 10; |
153 | |
154 | // The first server needs to be constructed early so that clients can |
155 | // be correctly connected. Otherwise calling .wait will cause the client to |
156 | // error. |
157 | let mut server = ServerOptions::new().create(PIPE_NAME)?; |
158 | |
159 | let server = tokio::spawn(async move { |
160 | for _ in 0..N { |
161 | // Wait for client to connect. |
162 | server.connect().await?; |
163 | |
164 | let inner_server = server; |
165 | |
166 | // Construct the next server to be connected before sending the one |
167 | // we already have of onto a task. This ensures that the server |
168 | // isn't closed (after it's done in the task) before a new one is |
169 | // available. Otherwise the client might error with |
170 | // `io::ErrorKind::NotFound`. |
171 | server = ServerOptions::new().create(PIPE_NAME)?; |
172 | |
173 | let _ = tokio::spawn(async move { |
174 | let server = inner_server; |
175 | |
176 | { |
177 | let mut read_buf = [0u8; 5]; |
178 | let mut read_buf_cursor = 0; |
179 | |
180 | loop { |
181 | server.readable().await?; |
182 | |
183 | let buf = &mut read_buf[read_buf_cursor..]; |
184 | |
185 | match server.try_read(buf) { |
186 | Ok(n) => { |
187 | read_buf_cursor += n; |
188 | |
189 | if read_buf_cursor == read_buf.len() { |
190 | break; |
191 | } |
192 | } |
193 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
194 | continue; |
195 | } |
196 | Err(e) => { |
197 | return Err(e); |
198 | } |
199 | } |
200 | } |
201 | }; |
202 | |
203 | { |
204 | let write_buf = b"pong \n" ; |
205 | let mut write_buf_cursor = 0; |
206 | |
207 | loop { |
208 | server.writable().await?; |
209 | let buf = &write_buf[write_buf_cursor..]; |
210 | |
211 | match server.try_write(buf) { |
212 | Ok(n) => { |
213 | write_buf_cursor += n; |
214 | |
215 | if write_buf_cursor == write_buf.len() { |
216 | break; |
217 | } |
218 | } |
219 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
220 | continue; |
221 | } |
222 | Err(e) => { |
223 | return Err(e); |
224 | } |
225 | } |
226 | } |
227 | } |
228 | |
229 | Ok::<_, io::Error>(()) |
230 | }); |
231 | } |
232 | |
233 | Ok::<_, io::Error>(()) |
234 | }); |
235 | |
236 | let mut clients = Vec::new(); |
237 | |
238 | for _ in 0..N { |
239 | clients.push(tokio::spawn(async move { |
240 | // This showcases a generic connect loop. |
241 | // |
242 | // We immediately try to create a client, if it's not found or the |
243 | // pipe is busy we use the specialized wait function on the client |
244 | // builder. |
245 | let client = loop { |
246 | match ClientOptions::new().open(PIPE_NAME) { |
247 | Ok(client) => break client, |
248 | Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), |
249 | Err(e) if e.kind() == io::ErrorKind::NotFound => (), |
250 | Err(e) => return Err(e), |
251 | } |
252 | |
253 | // Wait for a named pipe to become available. |
254 | time::sleep(Duration::from_millis(10)).await; |
255 | }; |
256 | |
257 | let mut read_buf = [0u8; 5]; |
258 | let mut read_buf_cursor = 0; |
259 | let write_buf = b"ping \n" ; |
260 | let mut write_buf_cursor = 0; |
261 | |
262 | loop { |
263 | let mut interest = Interest::READABLE; |
264 | if write_buf_cursor < write_buf.len() { |
265 | interest |= Interest::WRITABLE; |
266 | } |
267 | |
268 | let ready = client.ready(interest).await?; |
269 | |
270 | if ready.is_readable() { |
271 | let buf = &mut read_buf[read_buf_cursor..]; |
272 | |
273 | match client.try_read(buf) { |
274 | Ok(n) => { |
275 | read_buf_cursor += n; |
276 | |
277 | if read_buf_cursor == read_buf.len() { |
278 | break; |
279 | } |
280 | } |
281 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
282 | continue; |
283 | } |
284 | Err(e) => { |
285 | return Err(e); |
286 | } |
287 | } |
288 | } |
289 | |
290 | if ready.is_writable() { |
291 | let buf = &write_buf[write_buf_cursor..]; |
292 | |
293 | if buf.is_empty() { |
294 | continue; |
295 | } |
296 | |
297 | match client.try_write(buf) { |
298 | Ok(n) => { |
299 | write_buf_cursor += n; |
300 | } |
301 | Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
302 | continue; |
303 | } |
304 | Err(e) => { |
305 | return Err(e); |
306 | } |
307 | } |
308 | } |
309 | } |
310 | |
311 | let buf = String::from_utf8_lossy(&read_buf).into_owned(); |
312 | |
313 | Ok::<_, io::Error>(buf) |
314 | })); |
315 | } |
316 | |
317 | for client in clients { |
318 | let result = client.await?; |
319 | assert_eq!(result?, "pong \n" ); |
320 | } |
321 | |
322 | server.await??; |
323 | Ok(()) |
324 | } |
325 | |
326 | // This tests that message mode works as expected. |
327 | #[tokio::test ] |
328 | async fn test_named_pipe_mode_message() -> io::Result<()> { |
329 | // it's easy to accidentally get a seemingly working test here because byte pipes |
330 | // often return contents at write boundaries. to make sure we're doing the right thing we |
331 | // explicitly test that it doesn't work in byte mode. |
332 | _named_pipe_mode_message(PipeMode::Message).await?; |
333 | _named_pipe_mode_message(PipeMode::Byte).await |
334 | } |
335 | |
336 | async fn _named_pipe_mode_message(mode: PipeMode) -> io::Result<()> { |
337 | let pipe_name = format!( |
338 | r"\\.\pipe\test-named-pipe-mode-message-{}" , |
339 | matches!(mode, PipeMode::Message) |
340 | ); |
341 | let mut buf = [0u8; 32]; |
342 | |
343 | let mut server = ServerOptions::new() |
344 | .first_pipe_instance(true) |
345 | .pipe_mode(mode) |
346 | .create(&pipe_name)?; |
347 | |
348 | let mut client = ClientOptions::new().pipe_mode(mode).open(&pipe_name)?; |
349 | |
350 | server.connect().await?; |
351 | |
352 | // this needs a few iterations, presumably Windows waits for a few calls before merging buffers |
353 | for _ in 0..10 { |
354 | client.write_all(b"hello" ).await?; |
355 | server.write_all(b"world" ).await?; |
356 | } |
357 | for _ in 0..10 { |
358 | let n = server.read(&mut buf).await?; |
359 | if buf[..n] != b"hello" [..] { |
360 | assert!(matches!(mode, PipeMode::Byte)); |
361 | return Ok(()); |
362 | } |
363 | let n = client.read(&mut buf).await?; |
364 | if buf[..n] != b"world" [..] { |
365 | assert!(matches!(mode, PipeMode::Byte)); |
366 | return Ok(()); |
367 | } |
368 | } |
369 | // byte mode should have errored before. |
370 | assert!(matches!(mode, PipeMode::Message)); |
371 | Ok(()) |
372 | } |
373 | |
374 | // This tests `NamedPipeServer::connect` with various access settings. |
375 | #[tokio::test ] |
376 | async fn test_named_pipe_access() -> io::Result<()> { |
377 | const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access" ; |
378 | |
379 | for (inb, outb) in [(true, true), (true, false), (false, true)] { |
380 | let (tx, rx) = tokio::sync::oneshot::channel(); |
381 | let server = tokio::spawn(async move { |
382 | let s = ServerOptions::new() |
383 | .access_inbound(inb) |
384 | .access_outbound(outb) |
385 | .create(PIPE_NAME)?; |
386 | let mut connect_fut = tokio_test::task::spawn(s.connect()); |
387 | assert!(connect_fut.poll().is_pending()); |
388 | tx.send(()).unwrap(); |
389 | connect_fut.await |
390 | }); |
391 | |
392 | // Wait for the server to call connect. |
393 | rx.await.unwrap(); |
394 | let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?; |
395 | |
396 | server.await??; |
397 | } |
398 | Ok(()) |
399 | } |
400 | |