1 | //! HTTP/2 client connections |
2 | |
3 | use std::error::Error; |
4 | use std::fmt; |
5 | use std::future::Future; |
6 | use std::marker::PhantomData; |
7 | use std::pin::Pin; |
8 | use std::sync::Arc; |
9 | use std::task::{Context, Poll}; |
10 | use std::time::Duration; |
11 | |
12 | use crate::rt::{Read, Write}; |
13 | use futures_util::ready; |
14 | use http::{Request, Response}; |
15 | |
16 | use super::super::dispatch::{self, TrySendError}; |
17 | use crate::body::{Body, Incoming as IncomingBody}; |
18 | use crate::common::time::Time; |
19 | use crate::proto; |
20 | use crate::rt::bounds::Http2ClientConnExec; |
21 | use crate::rt::Timer; |
22 | |
23 | /// The sender side of an established connection. |
24 | pub struct SendRequest<B> { |
25 | dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>, |
26 | } |
27 | |
28 | impl<B> Clone for SendRequest<B> { |
29 | fn clone(&self) -> SendRequest<B> { |
30 | SendRequest { |
31 | dispatch: self.dispatch.clone(), |
32 | } |
33 | } |
34 | } |
35 | |
36 | /// A future that processes all HTTP state for the IO object. |
37 | /// |
38 | /// In most cases, this should just be spawned into an executor, so that it |
39 | /// can process incoming and outgoing messages, notice hangups, and the like. |
40 | /// |
41 | /// Instances of this type are typically created via the [`handshake`] function |
42 | #[must_use = "futures do nothing unless polled" ] |
43 | pub struct Connection<T, B, E> |
44 | where |
45 | T: Read + Write + Unpin, |
46 | B: Body + 'static, |
47 | E: Http2ClientConnExec<B, T> + Unpin, |
48 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
49 | { |
50 | inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>), |
51 | } |
52 | |
53 | /// A builder to configure an HTTP connection. |
54 | /// |
55 | /// After setting options, the builder is used to create a handshake future. |
56 | /// |
57 | /// **Note**: The default values of options are *not considered stable*. They |
58 | /// are subject to change at any time. |
59 | #[derive (Clone, Debug)] |
60 | pub struct Builder<Ex> { |
61 | pub(super) exec: Ex, |
62 | pub(super) timer: Time, |
63 | h2_builder: proto::h2::client::Config, |
64 | } |
65 | |
66 | /// Returns a handshake future over some IO. |
67 | /// |
68 | /// This is a shortcut for `Builder::new(exec).handshake(io)`. |
69 | /// See [`client::conn`](crate::client::conn) for more. |
70 | pub async fn handshake<E, T, B>( |
71 | exec: E, |
72 | io: T, |
73 | ) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)> |
74 | where |
75 | T: Read + Write + Unpin, |
76 | B: Body + 'static, |
77 | B::Data: Send, |
78 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
79 | E: Http2ClientConnExec<B, T> + Unpin + Clone, |
80 | { |
81 | Builder::new(exec).handshake(io).await |
82 | } |
83 | |
84 | // ===== impl SendRequest |
85 | |
86 | impl<B> SendRequest<B> { |
87 | /// Polls to determine whether this sender can be used yet for a request. |
88 | /// |
89 | /// If the associated connection is closed, this returns an Error. |
90 | pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
91 | if self.is_closed() { |
92 | Poll::Ready(Err(crate::Error::new_closed())) |
93 | } else { |
94 | Poll::Ready(Ok(())) |
95 | } |
96 | } |
97 | |
98 | /// Waits until the dispatcher is ready |
99 | /// |
100 | /// If the associated connection is closed, this returns an Error. |
101 | pub async fn ready(&mut self) -> crate::Result<()> { |
102 | futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await |
103 | } |
104 | |
105 | /// Checks if the connection is currently ready to send a request. |
106 | /// |
107 | /// # Note |
108 | /// |
109 | /// This is mostly a hint. Due to inherent latency of networks, it is |
110 | /// possible that even after checking this is ready, sending a request |
111 | /// may still fail because the connection was closed in the meantime. |
112 | pub fn is_ready(&self) -> bool { |
113 | self.dispatch.is_ready() |
114 | } |
115 | |
116 | /// Checks if the connection side has been closed. |
117 | pub fn is_closed(&self) -> bool { |
118 | self.dispatch.is_closed() |
119 | } |
120 | } |
121 | |
122 | impl<B> SendRequest<B> |
123 | where |
124 | B: Body + 'static, |
125 | { |
126 | /// Sends a `Request` on the associated connection. |
127 | /// |
128 | /// Returns a future that if successful, yields the `Response`. |
129 | /// |
130 | /// `req` must have a `Host` header. |
131 | /// |
132 | /// Absolute-form `Uri`s are not required. If received, they will be serialized |
133 | /// as-is. |
134 | pub fn send_request( |
135 | &mut self, |
136 | req: Request<B>, |
137 | ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> { |
138 | let sent = self.dispatch.send(req); |
139 | |
140 | async move { |
141 | match sent { |
142 | Ok(rx) => match rx.await { |
143 | Ok(Ok(resp)) => Ok(resp), |
144 | Ok(Err(err)) => Err(err), |
145 | // this is definite bug if it happens, but it shouldn't happen! |
146 | Err(_canceled) => panic!("dispatch dropped without returning error" ), |
147 | }, |
148 | Err(_req) => { |
149 | debug!("connection was not ready" ); |
150 | |
151 | Err(crate::Error::new_canceled().with("connection was not ready" )) |
152 | } |
153 | } |
154 | } |
155 | } |
156 | |
157 | /// Sends a `Request` on the associated connection. |
158 | /// |
159 | /// Returns a future that if successful, yields the `Response`. |
160 | /// |
161 | /// # Error |
162 | /// |
163 | /// If there was an error before trying to serialize the request to the |
164 | /// connection, the message will be returned as part of this error. |
165 | pub fn try_send_request( |
166 | &mut self, |
167 | req: Request<B>, |
168 | ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> { |
169 | let sent = self.dispatch.try_send(req); |
170 | async move { |
171 | match sent { |
172 | Ok(rx) => match rx.await { |
173 | Ok(Ok(res)) => Ok(res), |
174 | Ok(Err(err)) => Err(err), |
175 | // this is definite bug if it happens, but it shouldn't happen! |
176 | Err(_) => panic!("dispatch dropped without returning error" ), |
177 | }, |
178 | Err(req) => { |
179 | debug!("connection was not ready" ); |
180 | let error = crate::Error::new_canceled().with("connection was not ready" ); |
181 | Err(TrySendError { |
182 | error, |
183 | message: Some(req), |
184 | }) |
185 | } |
186 | } |
187 | } |
188 | } |
189 | } |
190 | |
191 | impl<B> fmt::Debug for SendRequest<B> { |
192 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
193 | f.debug_struct(name:"SendRequest" ).finish() |
194 | } |
195 | } |
196 | |
197 | // ===== impl Connection |
198 | |
199 | impl<T, B, E> Connection<T, B, E> |
200 | where |
201 | T: Read + Write + Unpin + 'static, |
202 | B: Body + Unpin + 'static, |
203 | B::Data: Send, |
204 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
205 | E: Http2ClientConnExec<B, T> + Unpin, |
206 | { |
207 | /// Returns whether the [extended CONNECT protocol][1] is enabled or not. |
208 | /// |
209 | /// This setting is configured by the server peer by sending the |
210 | /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. |
211 | /// This method returns the currently acknowledged value received from the |
212 | /// remote. |
213 | /// |
214 | /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 |
215 | /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 |
216 | pub fn is_extended_connect_protocol_enabled(&self) -> bool { |
217 | self.inner.1.is_extended_connect_protocol_enabled() |
218 | } |
219 | } |
220 | |
221 | impl<T, B, E> fmt::Debug for Connection<T, B, E> |
222 | where |
223 | T: Read + Write + fmt::Debug + 'static + Unpin, |
224 | B: Body + 'static, |
225 | E: Http2ClientConnExec<B, T> + Unpin, |
226 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
227 | { |
228 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
229 | f.debug_struct(name:"Connection" ).finish() |
230 | } |
231 | } |
232 | |
233 | impl<T, B, E> Future for Connection<T, B, E> |
234 | where |
235 | T: Read + Write + Unpin + 'static, |
236 | B: Body + 'static + Unpin, |
237 | B::Data: Send, |
238 | E: Unpin, |
239 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
240 | E: Http2ClientConnExec<B, T> + Unpin, |
241 | { |
242 | type Output = crate::Result<()>; |
243 | |
244 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
245 | match ready!(Pin::new(&mut self.inner.1).poll(cx))? { |
246 | proto::Dispatched::Shutdown => Poll::Ready(Ok(())), |
247 | #[cfg (feature = "http1" )] |
248 | proto::Dispatched::Upgrade(_pending: Pending) => unreachable!("http2 cannot upgrade" ), |
249 | } |
250 | } |
251 | } |
252 | |
253 | // ===== impl Builder |
254 | |
255 | impl<Ex> Builder<Ex> |
256 | where |
257 | Ex: Clone, |
258 | { |
259 | /// Creates a new connection builder. |
260 | #[inline ] |
261 | pub fn new(exec: Ex) -> Builder<Ex> { |
262 | Builder { |
263 | exec, |
264 | timer: Time::Empty, |
265 | h2_builder: Default::default(), |
266 | } |
267 | } |
268 | |
269 | /// Provide a timer to execute background HTTP2 tasks. |
270 | pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex> |
271 | where |
272 | M: Timer + Send + Sync + 'static, |
273 | { |
274 | self.timer = Time::Timer(Arc::new(timer)); |
275 | self |
276 | } |
277 | |
278 | /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 |
279 | /// stream-level flow control. |
280 | /// |
281 | /// Passing `None` will do nothing. |
282 | /// |
283 | /// If not set, hyper will use a default. |
284 | /// |
285 | /// [spec]: https://httpwg.org/specs/rfc9113.html#SETTINGS_INITIAL_WINDOW_SIZE |
286 | pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { |
287 | if let Some(sz) = sz.into() { |
288 | self.h2_builder.adaptive_window = false; |
289 | self.h2_builder.initial_stream_window_size = sz; |
290 | } |
291 | self |
292 | } |
293 | |
294 | /// Sets the max connection-level flow control for HTTP2 |
295 | /// |
296 | /// Passing `None` will do nothing. |
297 | /// |
298 | /// If not set, hyper will use a default. |
299 | pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { |
300 | if let Some(sz) = sz.into() { |
301 | self.h2_builder.adaptive_window = false; |
302 | self.h2_builder.initial_conn_window_size = sz; |
303 | } |
304 | self |
305 | } |
306 | |
307 | /// Sets the initial maximum of locally initiated (send) streams. |
308 | /// |
309 | /// This value will be overwritten by the value included in the initial |
310 | /// SETTINGS frame received from the peer as part of a [connection preface]. |
311 | /// |
312 | /// Passing `None` will do nothing. |
313 | /// |
314 | /// If not set, hyper will use a default. |
315 | /// |
316 | /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface |
317 | pub fn initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self { |
318 | if let Some(initial) = initial.into() { |
319 | self.h2_builder.initial_max_send_streams = initial; |
320 | } |
321 | self |
322 | } |
323 | |
324 | /// Sets whether to use an adaptive flow control. |
325 | /// |
326 | /// Enabling this will override the limits set in |
327 | /// `initial_stream_window_size` and |
328 | /// `initial_connection_window_size`. |
329 | pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { |
330 | use proto::h2::SPEC_WINDOW_SIZE; |
331 | |
332 | self.h2_builder.adaptive_window = enabled; |
333 | if enabled { |
334 | self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; |
335 | self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; |
336 | } |
337 | self |
338 | } |
339 | |
340 | /// Sets the maximum frame size to use for HTTP2. |
341 | /// |
342 | /// Default is currently 16KB, but can change. |
343 | pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { |
344 | self.h2_builder.max_frame_size = sz.into(); |
345 | self |
346 | } |
347 | |
348 | /// Sets the max size of received header frames. |
349 | /// |
350 | /// Default is currently 16KB, but can change. |
351 | pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { |
352 | self.h2_builder.max_header_list_size = max; |
353 | self |
354 | } |
355 | |
356 | /// Sets the header table size. |
357 | /// |
358 | /// This setting informs the peer of the maximum size of the header compression |
359 | /// table used to encode header blocks, in octets. The encoder may select any value |
360 | /// equal to or less than the header table size specified by the sender. |
361 | /// |
362 | /// The default value of crate `h2` is 4,096. |
363 | pub fn header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self { |
364 | self.h2_builder.header_table_size = size.into(); |
365 | self |
366 | } |
367 | |
368 | /// Sets the maximum number of concurrent streams. |
369 | /// |
370 | /// The maximum concurrent streams setting only controls the maximum number |
371 | /// of streams that can be initiated by the remote peer. In other words, |
372 | /// when this setting is set to 100, this does not limit the number of |
373 | /// concurrent streams that can be created by the caller. |
374 | /// |
375 | /// It is recommended that this value be no smaller than 100, so as to not |
376 | /// unnecessarily limit parallelism. However, any value is legal, including |
377 | /// 0. If `max` is set to 0, then the remote will not be permitted to |
378 | /// initiate streams. |
379 | /// |
380 | /// Note that streams in the reserved state, i.e., push promises that have |
381 | /// been reserved but the stream has not started, do not count against this |
382 | /// setting. |
383 | /// |
384 | /// Also note that if the remote *does* exceed the value set here, it is not |
385 | /// a protocol level error. Instead, the `h2` library will immediately reset |
386 | /// the stream. |
387 | /// |
388 | /// See [Section 5.1.2] in the HTTP/2 spec for more details. |
389 | /// |
390 | /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 |
391 | pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self { |
392 | self.h2_builder.max_concurrent_streams = max.into(); |
393 | self |
394 | } |
395 | |
396 | /// Sets an interval for HTTP2 Ping frames should be sent to keep a |
397 | /// connection alive. |
398 | /// |
399 | /// Pass `None` to disable HTTP2 keep-alive. |
400 | /// |
401 | /// Default is currently disabled. |
402 | pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self { |
403 | self.h2_builder.keep_alive_interval = interval.into(); |
404 | self |
405 | } |
406 | |
407 | /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. |
408 | /// |
409 | /// If the ping is not acknowledged within the timeout, the connection will |
410 | /// be closed. Does nothing if `keep_alive_interval` is disabled. |
411 | /// |
412 | /// Default is 20 seconds. |
413 | pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { |
414 | self.h2_builder.keep_alive_timeout = timeout; |
415 | self |
416 | } |
417 | |
418 | /// Sets whether HTTP2 keep-alive should apply while the connection is idle. |
419 | /// |
420 | /// If disabled, keep-alive pings are only sent while there are open |
421 | /// request/responses streams. If enabled, pings are also sent when no |
422 | /// streams are active. Does nothing if `keep_alive_interval` is |
423 | /// disabled. |
424 | /// |
425 | /// Default is `false`. |
426 | pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { |
427 | self.h2_builder.keep_alive_while_idle = enabled; |
428 | self |
429 | } |
430 | |
431 | /// Sets the maximum number of HTTP2 concurrent locally reset streams. |
432 | /// |
433 | /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more |
434 | /// details. |
435 | /// |
436 | /// The default value is determined by the `h2` crate. |
437 | /// |
438 | /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams |
439 | pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { |
440 | self.h2_builder.max_concurrent_reset_streams = Some(max); |
441 | self |
442 | } |
443 | |
444 | /// Set the maximum write buffer size for each HTTP/2 stream. |
445 | /// |
446 | /// Default is currently 1MB, but may change. |
447 | /// |
448 | /// # Panics |
449 | /// |
450 | /// The value must be no larger than `u32::MAX`. |
451 | pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { |
452 | assert!(max <= u32::MAX as usize); |
453 | self.h2_builder.max_send_buffer_size = max; |
454 | self |
455 | } |
456 | |
457 | /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent. |
458 | /// |
459 | /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2). |
460 | /// As of v0.4.0, it is 20. |
461 | /// |
462 | /// See <https://github.com/hyperium/hyper/issues/2877> for more information. |
463 | pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self { |
464 | self.h2_builder.max_pending_accept_reset_streams = max.into(); |
465 | self |
466 | } |
467 | |
468 | /// Constructs a connection with the configured options and IO. |
469 | /// See [`client::conn`](crate::client::conn) for more. |
470 | /// |
471 | /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will |
472 | /// do nothing. |
473 | pub fn handshake<T, B>( |
474 | &self, |
475 | io: T, |
476 | ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>> |
477 | where |
478 | T: Read + Write + Unpin, |
479 | B: Body + 'static, |
480 | B::Data: Send, |
481 | B::Error: Into<Box<dyn Error + Send + Sync>>, |
482 | Ex: Http2ClientConnExec<B, T> + Unpin, |
483 | { |
484 | let opts = self.clone(); |
485 | |
486 | async move { |
487 | trace!("client handshake HTTP/2" ); |
488 | |
489 | let (tx, rx) = dispatch::channel(); |
490 | let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer) |
491 | .await?; |
492 | Ok(( |
493 | SendRequest { |
494 | dispatch: tx.unbound(), |
495 | }, |
496 | Connection { |
497 | inner: (PhantomData, h2), |
498 | }, |
499 | )) |
500 | } |
501 | } |
502 | } |
503 | |
504 | #[cfg (test)] |
505 | mod tests { |
506 | |
507 | #[tokio::test ] |
508 | #[ignore ] // only compilation is checked |
509 | async fn send_sync_executor_of_non_send_futures() { |
510 | #[derive (Clone)] |
511 | struct LocalTokioExecutor; |
512 | |
513 | impl<F> crate::rt::Executor<F> for LocalTokioExecutor |
514 | where |
515 | F: std::future::Future + 'static, // not requiring `Send` |
516 | { |
517 | fn execute(&self, fut: F) { |
518 | // This will spawn into the currently running `LocalSet`. |
519 | tokio::task::spawn_local(fut); |
520 | } |
521 | } |
522 | |
523 | #[allow (unused)] |
524 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { |
525 | let (_sender, conn) = crate::client::conn::http2::handshake::< |
526 | _, |
527 | _, |
528 | http_body_util::Empty<bytes::Bytes>, |
529 | >(LocalTokioExecutor, io) |
530 | .await |
531 | .unwrap(); |
532 | |
533 | tokio::task::spawn_local(async move { |
534 | conn.await.unwrap(); |
535 | }); |
536 | } |
537 | } |
538 | |
539 | #[tokio::test ] |
540 | #[ignore ] // only compilation is checked |
541 | async fn not_send_not_sync_executor_of_not_send_futures() { |
542 | #[derive (Clone)] |
543 | struct LocalTokioExecutor { |
544 | _x: std::marker::PhantomData<std::rc::Rc<()>>, |
545 | } |
546 | |
547 | impl<F> crate::rt::Executor<F> for LocalTokioExecutor |
548 | where |
549 | F: std::future::Future + 'static, // not requiring `Send` |
550 | { |
551 | fn execute(&self, fut: F) { |
552 | // This will spawn into the currently running `LocalSet`. |
553 | tokio::task::spawn_local(fut); |
554 | } |
555 | } |
556 | |
557 | #[allow (unused)] |
558 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { |
559 | let (_sender, conn) = |
560 | crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( |
561 | LocalTokioExecutor { |
562 | _x: Default::default(), |
563 | }, |
564 | io, |
565 | ) |
566 | .await |
567 | .unwrap(); |
568 | |
569 | tokio::task::spawn_local(async move { |
570 | conn.await.unwrap(); |
571 | }); |
572 | } |
573 | } |
574 | |
575 | #[tokio::test ] |
576 | #[ignore ] // only compilation is checked |
577 | async fn send_not_sync_executor_of_not_send_futures() { |
578 | #[derive (Clone)] |
579 | struct LocalTokioExecutor { |
580 | _x: std::marker::PhantomData<std::cell::Cell<()>>, |
581 | } |
582 | |
583 | impl<F> crate::rt::Executor<F> for LocalTokioExecutor |
584 | where |
585 | F: std::future::Future + 'static, // not requiring `Send` |
586 | { |
587 | fn execute(&self, fut: F) { |
588 | // This will spawn into the currently running `LocalSet`. |
589 | tokio::task::spawn_local(fut); |
590 | } |
591 | } |
592 | |
593 | #[allow (unused)] |
594 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) { |
595 | let (_sender, conn) = |
596 | crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( |
597 | LocalTokioExecutor { |
598 | _x: Default::default(), |
599 | }, |
600 | io, |
601 | ) |
602 | .await |
603 | .unwrap(); |
604 | |
605 | tokio::task::spawn_local(async move { |
606 | conn.await.unwrap(); |
607 | }); |
608 | } |
609 | } |
610 | |
611 | #[tokio::test ] |
612 | #[ignore ] // only compilation is checked |
613 | async fn send_sync_executor_of_send_futures() { |
614 | #[derive (Clone)] |
615 | struct TokioExecutor; |
616 | |
617 | impl<F> crate::rt::Executor<F> for TokioExecutor |
618 | where |
619 | F: std::future::Future + 'static + Send, |
620 | F::Output: Send + 'static, |
621 | { |
622 | fn execute(&self, fut: F) { |
623 | tokio::task::spawn(fut); |
624 | } |
625 | } |
626 | |
627 | #[allow (unused)] |
628 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { |
629 | let (_sender, conn) = crate::client::conn::http2::handshake::< |
630 | _, |
631 | _, |
632 | http_body_util::Empty<bytes::Bytes>, |
633 | >(TokioExecutor, io) |
634 | .await |
635 | .unwrap(); |
636 | |
637 | tokio::task::spawn(async move { |
638 | conn.await.unwrap(); |
639 | }); |
640 | } |
641 | } |
642 | |
643 | #[tokio::test ] |
644 | #[ignore ] // only compilation is checked |
645 | async fn not_send_not_sync_executor_of_send_futures() { |
646 | #[derive (Clone)] |
647 | struct TokioExecutor { |
648 | // !Send, !Sync |
649 | _x: std::marker::PhantomData<std::rc::Rc<()>>, |
650 | } |
651 | |
652 | impl<F> crate::rt::Executor<F> for TokioExecutor |
653 | where |
654 | F: std::future::Future + 'static + Send, |
655 | F::Output: Send + 'static, |
656 | { |
657 | fn execute(&self, fut: F) { |
658 | tokio::task::spawn(fut); |
659 | } |
660 | } |
661 | |
662 | #[allow (unused)] |
663 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { |
664 | let (_sender, conn) = |
665 | crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( |
666 | TokioExecutor { |
667 | _x: Default::default(), |
668 | }, |
669 | io, |
670 | ) |
671 | .await |
672 | .unwrap(); |
673 | |
674 | tokio::task::spawn_local(async move { |
675 | // can't use spawn here because when executor is !Send |
676 | conn.await.unwrap(); |
677 | }); |
678 | } |
679 | } |
680 | |
681 | #[tokio::test ] |
682 | #[ignore ] // only compilation is checked |
683 | async fn send_not_sync_executor_of_send_futures() { |
684 | #[derive (Clone)] |
685 | struct TokioExecutor { |
686 | // !Sync |
687 | _x: std::marker::PhantomData<std::cell::Cell<()>>, |
688 | } |
689 | |
690 | impl<F> crate::rt::Executor<F> for TokioExecutor |
691 | where |
692 | F: std::future::Future + 'static + Send, |
693 | F::Output: Send + 'static, |
694 | { |
695 | fn execute(&self, fut: F) { |
696 | tokio::task::spawn(fut); |
697 | } |
698 | } |
699 | |
700 | #[allow (unused)] |
701 | async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) { |
702 | let (_sender, conn) = |
703 | crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>( |
704 | TokioExecutor { |
705 | _x: Default::default(), |
706 | }, |
707 | io, |
708 | ) |
709 | .await |
710 | .unwrap(); |
711 | |
712 | tokio::task::spawn_local(async move { |
713 | // can't use spawn here because when executor is !Send |
714 | conn.await.unwrap(); |
715 | }); |
716 | } |
717 | } |
718 | } |
719 | |