1 | //! Various utility types and functions that are generally used with Tower. |
2 | |
3 | mod and_then; |
4 | mod boxed; |
5 | mod boxed_clone; |
6 | mod boxed_clone_sync; |
7 | mod call_all; |
8 | mod either; |
9 | |
10 | mod future_service; |
11 | mod map_err; |
12 | mod map_request; |
13 | mod map_response; |
14 | mod map_result; |
15 | |
16 | mod map_future; |
17 | mod oneshot; |
18 | mod optional; |
19 | mod ready; |
20 | mod service_fn; |
21 | mod then; |
22 | |
23 | pub mod rng; |
24 | |
25 | pub use self::{ |
26 | and_then::{AndThen, AndThenLayer}, |
27 | boxed::{ |
28 | BoxCloneServiceLayer, BoxCloneSyncServiceLayer, BoxLayer, BoxService, UnsyncBoxService, |
29 | }, |
30 | boxed_clone::BoxCloneService, |
31 | boxed_clone_sync::BoxCloneSyncService, |
32 | either::Either, |
33 | future_service::{future_service, FutureService}, |
34 | map_err::{MapErr, MapErrLayer}, |
35 | map_future::{MapFuture, MapFutureLayer}, |
36 | map_request::{MapRequest, MapRequestLayer}, |
37 | map_response::{MapResponse, MapResponseLayer}, |
38 | map_result::{MapResult, MapResultLayer}, |
39 | oneshot::Oneshot, |
40 | optional::Optional, |
41 | ready::{Ready, ReadyOneshot}, |
42 | service_fn::{service_fn, ServiceFn}, |
43 | then::{Then, ThenLayer}, |
44 | }; |
45 | |
46 | pub use self::call_all::{CallAll, CallAllUnordered}; |
47 | use std::future::Future; |
48 | |
49 | use crate::layer::util::Identity; |
50 | |
51 | pub mod error { |
52 | //! Error types |
53 | |
54 | pub use super::optional::error as optional; |
55 | } |
56 | |
57 | pub mod future { |
58 | //! Future types |
59 | |
60 | pub use super::and_then::AndThenFuture; |
61 | pub use super::either::EitherResponseFuture; |
62 | pub use super::map_err::MapErrFuture; |
63 | pub use super::map_response::MapResponseFuture; |
64 | pub use super::map_result::MapResultFuture; |
65 | pub use super::optional::future as optional; |
66 | pub use super::then::ThenFuture; |
67 | } |
68 | |
69 | /// An extension trait for `Service`s that provides a variety of convenient |
70 | /// adapters |
71 | pub trait ServiceExt<Request>: tower_service::Service<Request> { |
72 | /// Yields a mutable reference to the service when it is ready to accept a request. |
73 | fn ready(&mut self) -> Ready<'_, Self, Request> |
74 | where |
75 | Self: Sized, |
76 | { |
77 | Ready::new(self) |
78 | } |
79 | |
80 | /// Yields the service when it is ready to accept a request. |
81 | fn ready_oneshot(self) -> ReadyOneshot<Self, Request> |
82 | where |
83 | Self: Sized, |
84 | { |
85 | ReadyOneshot::new(self) |
86 | } |
87 | |
88 | /// Consume this `Service`, calling it with the provided request once it is ready. |
89 | fn oneshot(self, req: Request) -> Oneshot<Self, Request> |
90 | where |
91 | Self: Sized, |
92 | { |
93 | Oneshot::new(self, req) |
94 | } |
95 | |
96 | /// Process all requests from the given [`Stream`], and produce a [`Stream`] of their responses. |
97 | /// |
98 | /// This is essentially [`Stream<Item = Request>`][stream] + `Self` => [`Stream<Item = |
99 | /// Response>`][stream]. See the documentation for [`CallAll`] for |
100 | /// details. |
101 | /// |
102 | /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html |
103 | /// [stream]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html |
104 | fn call_all<S>(self, reqs: S) -> CallAll<Self, S> |
105 | where |
106 | Self: Sized, |
107 | S: futures_core::Stream<Item = Request>, |
108 | { |
109 | CallAll::new(self, reqs) |
110 | } |
111 | |
112 | /// Executes a new future after this service's future resolves. This does |
113 | /// not alter the behaviour of the [`poll_ready`] method. |
114 | /// |
115 | /// This method can be used to change the [`Response`] type of the service |
116 | /// into a different type. You can use this method to chain along a computation once the |
117 | /// service's response has been resolved. |
118 | /// |
119 | /// [`Response`]: crate::Service::Response |
120 | /// [`poll_ready`]: crate::Service::poll_ready |
121 | /// |
122 | /// # Example |
123 | /// ``` |
124 | /// # use std::task::{Poll, Context}; |
125 | /// # use tower::{Service, ServiceExt}; |
126 | /// # |
127 | /// # struct DatabaseService; |
128 | /// # impl DatabaseService { |
129 | /// # fn new(address: &str) -> Self { |
130 | /// # DatabaseService |
131 | /// # } |
132 | /// # } |
133 | /// # |
134 | /// # struct Record { |
135 | /// # pub name: String, |
136 | /// # pub age: u16 |
137 | /// # } |
138 | /// # |
139 | /// # impl Service<u32> for DatabaseService { |
140 | /// # type Response = Record; |
141 | /// # type Error = u8; |
142 | /// # type Future = futures_util::future::Ready<Result<Record, u8>>; |
143 | /// # |
144 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
145 | /// # Poll::Ready(Ok(())) |
146 | /// # } |
147 | /// # |
148 | /// # fn call(&mut self, request: u32) -> Self::Future { |
149 | /// # futures_util::future::ready(Ok(Record { name: "Jack" .into(), age: 32 })) |
150 | /// # } |
151 | /// # } |
152 | /// # |
153 | /// # async fn avatar_lookup(name: String) -> Result<Vec<u8>, u8> { Ok(vec![]) } |
154 | /// # |
155 | /// # fn main() { |
156 | /// # async { |
157 | /// // A service returning Result<Record, _> |
158 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
159 | /// |
160 | /// // Map the response into a new response |
161 | /// let mut new_service = service.and_then(|record: Record| async move { |
162 | /// let name = record.name; |
163 | /// avatar_lookup(name).await |
164 | /// }); |
165 | /// |
166 | /// // Call the new service |
167 | /// let id = 13; |
168 | /// let avatar = new_service.call(id).await.unwrap(); |
169 | /// # }; |
170 | /// # } |
171 | /// ``` |
172 | fn and_then<F>(self, f: F) -> AndThen<Self, F> |
173 | where |
174 | Self: Sized, |
175 | F: Clone, |
176 | { |
177 | AndThen::new(self, f) |
178 | } |
179 | |
180 | /// Maps this service's response value to a different value. This does not |
181 | /// alter the behaviour of the [`poll_ready`] method. |
182 | /// |
183 | /// This method can be used to change the [`Response`] type of the service |
184 | /// into a different type. It is similar to the [`Result::map`] |
185 | /// method. You can use this method to chain along a computation once the |
186 | /// service's response has been resolved. |
187 | /// |
188 | /// [`Response`]: crate::Service::Response |
189 | /// [`poll_ready`]: crate::Service::poll_ready |
190 | /// |
191 | /// # Example |
192 | /// ``` |
193 | /// # use std::task::{Poll, Context}; |
194 | /// # use tower::{Service, ServiceExt}; |
195 | /// # |
196 | /// # struct DatabaseService; |
197 | /// # impl DatabaseService { |
198 | /// # fn new(address: &str) -> Self { |
199 | /// # DatabaseService |
200 | /// # } |
201 | /// # } |
202 | /// # |
203 | /// # struct Record { |
204 | /// # pub name: String, |
205 | /// # pub age: u16 |
206 | /// # } |
207 | /// # |
208 | /// # impl Service<u32> for DatabaseService { |
209 | /// # type Response = Record; |
210 | /// # type Error = u8; |
211 | /// # type Future = futures_util::future::Ready<Result<Record, u8>>; |
212 | /// # |
213 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
214 | /// # Poll::Ready(Ok(())) |
215 | /// # } |
216 | /// # |
217 | /// # fn call(&mut self, request: u32) -> Self::Future { |
218 | /// # futures_util::future::ready(Ok(Record { name: "Jack" .into(), age: 32 })) |
219 | /// # } |
220 | /// # } |
221 | /// # |
222 | /// # fn main() { |
223 | /// # async { |
224 | /// // A service returning Result<Record, _> |
225 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
226 | /// |
227 | /// // Map the response into a new response |
228 | /// let mut new_service = service.map_response(|record| record.name); |
229 | /// |
230 | /// // Call the new service |
231 | /// let id = 13; |
232 | /// let name = new_service |
233 | /// .ready() |
234 | /// .await? |
235 | /// .call(id) |
236 | /// .await?; |
237 | /// # Ok::<(), u8>(()) |
238 | /// # }; |
239 | /// # } |
240 | /// ``` |
241 | fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F> |
242 | where |
243 | Self: Sized, |
244 | F: FnOnce(Self::Response) -> Response + Clone, |
245 | { |
246 | MapResponse::new(self, f) |
247 | } |
248 | |
249 | /// Maps this service's error value to a different value. This does not |
250 | /// alter the behaviour of the [`poll_ready`] method. |
251 | /// |
252 | /// This method can be used to change the [`Error`] type of the service |
253 | /// into a different type. It is similar to the [`Result::map_err`] method. |
254 | /// |
255 | /// [`Error`]: crate::Service::Error |
256 | /// [`poll_ready`]: crate::Service::poll_ready |
257 | /// |
258 | /// # Example |
259 | /// ``` |
260 | /// # use std::task::{Poll, Context}; |
261 | /// # use tower::{Service, ServiceExt}; |
262 | /// # |
263 | /// # struct DatabaseService; |
264 | /// # impl DatabaseService { |
265 | /// # fn new(address: &str) -> Self { |
266 | /// # DatabaseService |
267 | /// # } |
268 | /// # } |
269 | /// # |
270 | /// # struct Error { |
271 | /// # pub code: u32, |
272 | /// # pub message: String |
273 | /// # } |
274 | /// # |
275 | /// # impl Service<u32> for DatabaseService { |
276 | /// # type Response = String; |
277 | /// # type Error = Error; |
278 | /// # type Future = futures_util::future::Ready<Result<String, Error>>; |
279 | /// # |
280 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
281 | /// # Poll::Ready(Ok(())) |
282 | /// # } |
283 | /// # |
284 | /// # fn call(&mut self, request: u32) -> Self::Future { |
285 | /// # futures_util::future::ready(Ok(String::new())) |
286 | /// # } |
287 | /// # } |
288 | /// # |
289 | /// # fn main() { |
290 | /// # async { |
291 | /// // A service returning Result<_, Error> |
292 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
293 | /// |
294 | /// // Map the error to a new error |
295 | /// let mut new_service = service.map_err(|err| err.code); |
296 | /// |
297 | /// // Call the new service |
298 | /// let id = 13; |
299 | /// let code = new_service |
300 | /// .ready() |
301 | /// .await? |
302 | /// .call(id) |
303 | /// .await |
304 | /// .unwrap_err(); |
305 | /// # Ok::<(), u32>(()) |
306 | /// # }; |
307 | /// # } |
308 | /// ``` |
309 | fn map_err<F, Error>(self, f: F) -> MapErr<Self, F> |
310 | where |
311 | Self: Sized, |
312 | F: FnOnce(Self::Error) -> Error + Clone, |
313 | { |
314 | MapErr::new(self, f) |
315 | } |
316 | |
317 | /// Maps this service's result type (`Result<Self::Response, Self::Error>`) |
318 | /// to a different value, regardless of whether the future succeeds or |
319 | /// fails. |
320 | /// |
321 | /// This is similar to the [`map_response`] and [`map_err`] combinators, |
322 | /// except that the *same* function is invoked when the service's future |
323 | /// completes, whether it completes successfully or fails. This function |
324 | /// takes the [`Result`] returned by the service's future, and returns a |
325 | /// [`Result`]. |
326 | /// |
327 | /// Like the standard library's [`Result::and_then`], this method can be |
328 | /// used to implement control flow based on `Result` values. For example, it |
329 | /// may be used to implement error recovery, by turning some [`Err`] |
330 | /// responses from the service into [`Ok`] responses. Similarly, some |
331 | /// successful responses from the service could be rejected, by returning an |
332 | /// [`Err`] conditionally, depending on the value inside the [`Ok`]. Finally, |
333 | /// this method can also be used to implement behaviors that must run when a |
334 | /// service's future completes, regardless of whether it succeeded or failed. |
335 | /// |
336 | /// This method can be used to change the [`Response`] type of the service |
337 | /// into a different type. It can also be used to change the [`Error`] type |
338 | /// of the service. However, because the [`map_result`] function is not applied |
339 | /// to the errors returned by the service's [`poll_ready`] method, it must |
340 | /// be possible to convert the service's [`Error`] type into the error type |
341 | /// returned by the [`map_result`] function. This is trivial when the function |
342 | /// returns the same error type as the service, but in other cases, it can |
343 | /// be useful to use [`BoxError`] to erase differing error types. |
344 | /// |
345 | /// # Examples |
346 | /// |
347 | /// Recovering from certain errors: |
348 | /// |
349 | /// ``` |
350 | /// # use std::task::{Poll, Context}; |
351 | /// # use tower::{Service, ServiceExt}; |
352 | /// # |
353 | /// # struct DatabaseService; |
354 | /// # impl DatabaseService { |
355 | /// # fn new(address: &str) -> Self { |
356 | /// # DatabaseService |
357 | /// # } |
358 | /// # } |
359 | /// # |
360 | /// # struct Record { |
361 | /// # pub name: String, |
362 | /// # pub age: u16 |
363 | /// # } |
364 | /// # #[derive(Debug)] |
365 | /// # enum DbError { |
366 | /// # Parse(std::num::ParseIntError), |
367 | /// # NoRecordsFound, |
368 | /// # } |
369 | /// # |
370 | /// # impl Service<u32> for DatabaseService { |
371 | /// # type Response = Vec<Record>; |
372 | /// # type Error = DbError; |
373 | /// # type Future = futures_util::future::Ready<Result<Vec<Record>, DbError>>; |
374 | /// # |
375 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
376 | /// # Poll::Ready(Ok(())) |
377 | /// # } |
378 | /// # |
379 | /// # fn call(&mut self, request: u32) -> Self::Future { |
380 | /// # futures_util::future::ready(Ok(vec![Record { name: "Jack" .into(), age: 32 }])) |
381 | /// # } |
382 | /// # } |
383 | /// # |
384 | /// # fn main() { |
385 | /// # async { |
386 | /// // A service returning Result<Vec<Record>, DbError> |
387 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
388 | /// |
389 | /// // If the database returns no records for the query, we just want an empty `Vec`. |
390 | /// let mut new_service = service.map_result(|result| match result { |
391 | /// // If the error indicates that no records matched the query, return an empty |
392 | /// // `Vec` instead. |
393 | /// Err(DbError::NoRecordsFound) => Ok(Vec::new()), |
394 | /// // Propagate all other responses (`Ok` and `Err`) unchanged |
395 | /// x => x, |
396 | /// }); |
397 | /// |
398 | /// // Call the new service |
399 | /// let id = 13; |
400 | /// let name = new_service |
401 | /// .ready() |
402 | /// .await? |
403 | /// .call(id) |
404 | /// .await?; |
405 | /// # Ok::<(), DbError>(()) |
406 | /// # }; |
407 | /// # } |
408 | /// ``` |
409 | /// |
410 | /// Rejecting some `Ok` responses: |
411 | /// |
412 | /// ``` |
413 | /// # use std::task::{Poll, Context}; |
414 | /// # use tower::{Service, ServiceExt}; |
415 | /// # |
416 | /// # struct DatabaseService; |
417 | /// # impl DatabaseService { |
418 | /// # fn new(address: &str) -> Self { |
419 | /// # DatabaseService |
420 | /// # } |
421 | /// # } |
422 | /// # |
423 | /// # struct Record { |
424 | /// # pub name: String, |
425 | /// # pub age: u16 |
426 | /// # } |
427 | /// # type DbError = String; |
428 | /// # type AppError = String; |
429 | /// # |
430 | /// # impl Service<u32> for DatabaseService { |
431 | /// # type Response = Record; |
432 | /// # type Error = DbError; |
433 | /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; |
434 | /// # |
435 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
436 | /// # Poll::Ready(Ok(())) |
437 | /// # } |
438 | /// # |
439 | /// # fn call(&mut self, request: u32) -> Self::Future { |
440 | /// # futures_util::future::ready(Ok(Record { name: "Jack" .into(), age: 32 })) |
441 | /// # } |
442 | /// # } |
443 | /// # |
444 | /// # fn main() { |
445 | /// # async { |
446 | /// use tower::BoxError; |
447 | /// |
448 | /// // A service returning Result<Record, DbError> |
449 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
450 | /// |
451 | /// // If the user is zero years old, return an error. |
452 | /// let mut new_service = service.map_result(|result| { |
453 | /// let record = result?; |
454 | /// |
455 | /// if record.age == 0 { |
456 | /// // Users must have been born to use our app! |
457 | /// let app_error = AppError::from("users cannot be 0 years old!" ); |
458 | /// |
459 | /// // Box the error to erase its type (as it can be an `AppError` |
460 | /// // *or* the inner service's `DbError`). |
461 | /// return Err(BoxError::from(app_error)); |
462 | /// } |
463 | /// |
464 | /// // Otherwise, return the record. |
465 | /// Ok(record) |
466 | /// }); |
467 | /// |
468 | /// // Call the new service |
469 | /// let id = 13; |
470 | /// let record = new_service |
471 | /// .ready() |
472 | /// .await? |
473 | /// .call(id) |
474 | /// .await?; |
475 | /// # Ok::<(), BoxError>(()) |
476 | /// # }; |
477 | /// # } |
478 | /// ``` |
479 | /// |
480 | /// Performing an action that must be run for both successes and failures: |
481 | /// |
482 | /// ``` |
483 | /// # use std::convert::TryFrom; |
484 | /// # use std::task::{Poll, Context}; |
485 | /// # use tower::{Service, ServiceExt}; |
486 | /// # |
487 | /// # struct DatabaseService; |
488 | /// # impl DatabaseService { |
489 | /// # fn new(address: &str) -> Self { |
490 | /// # DatabaseService |
491 | /// # } |
492 | /// # } |
493 | /// # |
494 | /// # impl Service<u32> for DatabaseService { |
495 | /// # type Response = String; |
496 | /// # type Error = u8; |
497 | /// # type Future = futures_util::future::Ready<Result<String, u8>>; |
498 | /// # |
499 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
500 | /// # Poll::Ready(Ok(())) |
501 | /// # } |
502 | /// # |
503 | /// # fn call(&mut self, request: u32) -> Self::Future { |
504 | /// # futures_util::future::ready(Ok(String::new())) |
505 | /// # } |
506 | /// # } |
507 | /// # |
508 | /// # fn main() { |
509 | /// # async { |
510 | /// // A service returning Result<Record, DbError> |
511 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
512 | /// |
513 | /// // Print a message whenever a query completes. |
514 | /// let mut new_service = service.map_result(|result| { |
515 | /// println!("query completed; success={}" , result.is_ok()); |
516 | /// result |
517 | /// }); |
518 | /// |
519 | /// // Call the new service |
520 | /// let id = 13; |
521 | /// let response = new_service |
522 | /// .ready() |
523 | /// .await? |
524 | /// .call(id) |
525 | /// .await; |
526 | /// # response |
527 | /// # }; |
528 | /// # } |
529 | /// ``` |
530 | /// |
531 | /// [`map_response`]: ServiceExt::map_response |
532 | /// [`map_err`]: ServiceExt::map_err |
533 | /// [`map_result`]: ServiceExt::map_result |
534 | /// [`Error`]: crate::Service::Error |
535 | /// [`Response`]: crate::Service::Response |
536 | /// [`poll_ready`]: crate::Service::poll_ready |
537 | /// [`BoxError`]: crate::BoxError |
538 | fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F> |
539 | where |
540 | Self: Sized, |
541 | Error: From<Self::Error>, |
542 | F: FnOnce(Result<Self::Response, Self::Error>) -> Result<Response, Error> + Clone, |
543 | { |
544 | MapResult::new(self, f) |
545 | } |
546 | |
547 | /// Composes a function *in front of* the service. |
548 | /// |
549 | /// This adapter produces a new service that passes each value through the |
550 | /// given function `f` before sending it to `self`. |
551 | /// |
552 | /// # Example |
553 | /// ``` |
554 | /// # use std::convert::TryFrom; |
555 | /// # use std::task::{Poll, Context}; |
556 | /// # use tower::{Service, ServiceExt}; |
557 | /// # |
558 | /// # struct DatabaseService; |
559 | /// # impl DatabaseService { |
560 | /// # fn new(address: &str) -> Self { |
561 | /// # DatabaseService |
562 | /// # } |
563 | /// # } |
564 | /// # |
565 | /// # impl Service<String> for DatabaseService { |
566 | /// # type Response = String; |
567 | /// # type Error = u8; |
568 | /// # type Future = futures_util::future::Ready<Result<String, u8>>; |
569 | /// # |
570 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
571 | /// # Poll::Ready(Ok(())) |
572 | /// # } |
573 | /// # |
574 | /// # fn call(&mut self, request: String) -> Self::Future { |
575 | /// # futures_util::future::ready(Ok(String::new())) |
576 | /// # } |
577 | /// # } |
578 | /// # |
579 | /// # fn main() { |
580 | /// # async { |
581 | /// // A service taking a String as a request |
582 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
583 | /// |
584 | /// // Map the request to a new request |
585 | /// let mut new_service = service.map_request(|id: u32| id.to_string()); |
586 | /// |
587 | /// // Call the new service |
588 | /// let id = 13; |
589 | /// let response = new_service |
590 | /// .ready() |
591 | /// .await? |
592 | /// .call(id) |
593 | /// .await; |
594 | /// # response |
595 | /// # }; |
596 | /// # } |
597 | /// ``` |
598 | fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F> |
599 | where |
600 | Self: Sized, |
601 | F: FnMut(NewRequest) -> Request, |
602 | { |
603 | MapRequest::new(self, f) |
604 | } |
605 | |
606 | /// Composes this service with a [`Filter`] that conditionally accepts or |
607 | /// rejects requests based on a [predicate]. |
608 | /// |
609 | /// This adapter produces a new service that passes each value through the |
610 | /// given predicate `filter` before sending it to `self`. |
611 | /// |
612 | /// # Example |
613 | /// ``` |
614 | /// # use std::convert::TryFrom; |
615 | /// # use std::task::{Poll, Context}; |
616 | /// # use tower::{Service, ServiceExt}; |
617 | /// # |
618 | /// # struct DatabaseService; |
619 | /// # impl DatabaseService { |
620 | /// # fn new(address: &str) -> Self { |
621 | /// # DatabaseService |
622 | /// # } |
623 | /// # } |
624 | /// # |
625 | /// # #[derive(Debug)] enum DbError { |
626 | /// # Parse(std::num::ParseIntError) |
627 | /// # } |
628 | /// # |
629 | /// # impl std::fmt::Display for DbError { |
630 | /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) } |
631 | /// # } |
632 | /// # impl std::error::Error for DbError {} |
633 | /// # impl Service<u32> for DatabaseService { |
634 | /// # type Response = String; |
635 | /// # type Error = DbError; |
636 | /// # type Future = futures_util::future::Ready<Result<String, DbError>>; |
637 | /// # |
638 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
639 | /// # Poll::Ready(Ok(())) |
640 | /// # } |
641 | /// # |
642 | /// # fn call(&mut self, request: u32) -> Self::Future { |
643 | /// # futures_util::future::ready(Ok(String::new())) |
644 | /// # } |
645 | /// # } |
646 | /// # |
647 | /// # fn main() { |
648 | /// # async { |
649 | /// // A service taking a u32 as a request and returning Result<_, DbError> |
650 | /// let service = DatabaseService::new("127.0.0.1:8080"); |
651 | /// |
652 | /// // Fallibly map the request to a new request |
653 | /// let mut new_service = service |
654 | /// .filter(|id_str: &str| id_str.parse().map_err(DbError::Parse)); |
655 | /// |
656 | /// // Call the new service |
657 | /// let id = "13"; |
658 | /// let response = new_service |
659 | /// .ready() |
660 | /// .await? |
661 | /// .call(id) |
662 | /// .await; |
663 | /// # response |
664 | /// # }; |
665 | /// # } |
666 | /// ``` |
667 | /// |
668 | /// [`Filter`]: crate::filter::Filter |
669 | /// [predicate]: crate::filter::Predicate |
#[cfg(feature = "filter")]
fn filter<F, NewRequest>(self, filter: F) -> crate::filter::Filter<Self, F>
where
    Self: Sized,
    F: crate::filter::Predicate<NewRequest>,
{
    // Function-scope import so the constructor call reads without the full
    // path; the method itself is only compiled with the `filter` feature.
    use crate::filter::Filter;

    Filter::new(self, filter)
}
678 | |
679 | /// Composes this service with an [`AsyncFilter`] that conditionally accepts or |
680 | /// rejects requests based on an [async predicate]. |
681 | /// |
682 | /// This adapter produces a new service that passes each value through the |
683 | /// given asynchronous predicate `filter` before sending it to `self`. |
684 | /// |
685 | /// # Example |
686 | /// ``` |
687 | /// # use std::convert::TryFrom; |
688 | /// # use std::task::{Poll, Context}; |
689 | /// # use tower::{Service, ServiceExt}; |
690 | /// # |
691 | /// # #[derive(Clone)] struct DatabaseService; |
692 | /// # impl DatabaseService { |
693 | /// # fn new(address: &str) -> Self { |
694 | /// # DatabaseService |
695 | /// # } |
696 | /// # } |
697 | /// # #[derive(Debug)] |
698 | /// # enum DbError { |
699 | /// # Rejected |
700 | /// # } |
701 | /// # impl std::fmt::Display for DbError { |
702 | /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) } |
703 | /// # } |
704 | /// # impl std::error::Error for DbError {} |
705 | /// # |
706 | /// # impl Service<u32> for DatabaseService { |
707 | /// # type Response = String; |
708 | /// # type Error = DbError; |
709 | /// # type Future = futures_util::future::Ready<Result<String, DbError>>; |
710 | /// # |
711 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
712 | /// # Poll::Ready(Ok(())) |
713 | /// # } |
714 | /// # |
715 | /// # fn call(&mut self, request: u32) -> Self::Future { |
716 | /// # futures_util::future::ready(Ok(String::new())) |
717 | /// # } |
718 | /// # } |
719 | /// # |
720 | /// # fn main() { |
721 | /// # async { |
722 | /// // A service taking a u32 as a request and returning Result<_, DbError> |
723 | /// let service = DatabaseService::new("127.0.0.1:8080"); |
724 | /// |
725 | /// /// Returns `true` if we should query the database for an ID. |
726 | /// async fn should_query(id: u32) -> bool { |
727 | /// // ... |
728 | /// # true |
729 | /// } |
730 | /// |
731 | /// // Filter requests based on `should_query`. |
732 | /// let mut new_service = service |
733 | /// .filter_async(|id: u32| async move { |
734 | /// if should_query(id).await { |
735 | /// return Ok(id); |
736 | /// } |
737 | /// |
738 | /// Err(DbError::Rejected) |
739 | /// }); |
740 | /// |
741 | /// // Call the new service |
742 | /// let id = 13; |
743 | /// # let id: u32 = id; |
744 | /// let response = new_service |
745 | /// .ready() |
746 | /// .await? |
747 | /// .call(id) |
748 | /// .await; |
749 | /// # response |
750 | /// # }; |
751 | /// # } |
752 | /// ``` |
753 | /// |
754 | /// [`AsyncFilter`]: crate::filter::AsyncFilter |
755 | /// [async predicate]: crate::filter::AsyncPredicate |
#[cfg(feature = "filter")]
fn filter_async<F, NewRequest>(self, filter: F) -> crate::filter::AsyncFilter<Self, F>
where
    Self: Sized,
    F: crate::filter::AsyncPredicate<NewRequest>,
{
    // Function-scope import so the constructor call reads without the full
    // path; the method itself is only compiled with the `filter` feature.
    use crate::filter::AsyncFilter;

    AsyncFilter::new(self, filter)
}
764 | |
765 | /// Composes an asynchronous function *after* this service. |
766 | /// |
767 | /// This takes a function or closure returning a future, and returns a new |
768 | /// `Service` that chains that function after this service's [`Future`]. The |
769 | /// new `Service`'s future will consist of this service's future, followed |
770 | /// by the future returned by calling the chained function with the future's |
771 | /// [`Output`] type. The chained function is called regardless of whether |
772 | /// this service's future completes with a successful response or with an |
773 | /// error. |
774 | /// |
775 | /// This method can be thought of as an equivalent to the [`futures` |
776 | /// crate]'s [`FutureExt::then`] combinator, but acting on `Service`s that |
777 | /// _return_ futures, rather than on an individual future. Similarly to that |
778 | /// combinator, [`ServiceExt::then`] can be used to implement asynchronous |
779 | /// error recovery, by calling some asynchronous function with errors |
780 | /// returned by this service. Alternatively, it may also be used to call a |
781 | /// fallible async function with the successful response of this service. |
782 | /// |
783 | /// This method can be used to change the [`Response`] type of the service |
784 | /// into a different type. It can also be used to change the [`Error`] type |
785 | /// of the service. However, because the `then` function is not applied |
786 | /// to the errors returned by the service's [`poll_ready`] method, it must |
787 | /// be possible to convert the service's [`Error`] type into the error type |
788 | /// returned by the `then` future. This is trivial when the function |
789 | /// returns the same error type as the service, but in other cases, it can |
790 | /// be useful to use [`BoxError`] to erase differing error types. |
791 | /// |
792 | /// # Examples |
793 | /// |
794 | /// ``` |
795 | /// # use std::task::{Poll, Context}; |
796 | /// # use tower::{Service, ServiceExt}; |
797 | /// # |
798 | /// # struct DatabaseService; |
799 | /// # impl DatabaseService { |
800 | /// # fn new(address: &str) -> Self { |
801 | /// # DatabaseService |
802 | /// # } |
803 | /// # } |
804 | /// # |
805 | /// # type Record = (); |
806 | /// # type DbError = (); |
807 | /// # |
808 | /// # impl Service<u32> for DatabaseService { |
809 | /// # type Response = Record; |
810 | /// # type Error = DbError; |
811 | /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; |
812 | /// # |
813 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
814 | /// # Poll::Ready(Ok(())) |
815 | /// # } |
816 | /// # |
817 | /// # fn call(&mut self, request: u32) -> Self::Future { |
818 | /// # futures_util::future::ready(Ok(())) |
819 | /// # } |
820 | /// # } |
821 | /// # |
822 | /// # fn main() { |
823 | /// // A service returning Result<Record, DbError> |
824 | /// let service = DatabaseService::new("127.0.0.1:8080" ); |
825 | /// |
826 | /// // An async function that attempts to recover from errors returned by the |
827 | /// // database. |
828 | /// async fn recover_from_error(error: DbError) -> Result<Record, DbError> { |
829 | /// // ... |
830 | /// # Ok(()) |
831 | /// } |
832 | /// # async { |
833 | /// |
834 | /// // If the database service returns an error, attempt to recover by |
835 | /// // calling `recover_from_error`. Otherwise, return the successful response. |
836 | /// let mut new_service = service.then(|result| async move { |
837 | /// match result { |
838 | /// Ok(record) => Ok(record), |
839 | /// Err(e) => recover_from_error(e).await, |
840 | /// } |
841 | /// }); |
842 | /// |
843 | /// // Call the new service |
844 | /// let id = 13; |
845 | /// let record = new_service |
846 | /// .ready() |
847 | /// .await? |
848 | /// .call(id) |
849 | /// .await?; |
850 | /// # Ok::<(), DbError>(()) |
851 | /// # }; |
852 | /// # } |
853 | /// ``` |
854 | /// |
855 | /// [`Future`]: crate::Service::Future |
856 | /// [`Output`]: std::future::Future::Output |
857 | /// [`futures` crate]: https://docs.rs/futures |
858 | /// [`FutureExt::then`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.then |
859 | /// [`Error`]: crate::Service::Error |
860 | /// [`Response`]: crate::Service::Response |
861 | /// [`poll_ready`]: crate::Service::poll_ready |
862 | /// [`BoxError`]: crate::BoxError |
863 | fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F> |
864 | where |
865 | Self: Sized, |
866 | Error: From<Self::Error>, |
867 | F: FnOnce(Result<Self::Response, Self::Error>) -> Fut + Clone, |
868 | Fut: Future<Output = Result<Response, Error>>, |
869 | { |
870 | Then::new(self, f) |
871 | } |
872 | |
873 | /// Composes a function that transforms futures produced by the service. |
874 | /// |
875 | /// This takes a function or closure returning a future computed from the future returned by |
876 | /// the service's [`call`] method, as opposed to the responses produced by the future. |
877 | /// |
878 | /// # Examples |
879 | /// |
880 | /// ``` |
881 | /// # use std::task::{Poll, Context}; |
882 | /// # use tower::{Service, ServiceExt, BoxError}; |
883 | /// # |
884 | /// # struct DatabaseService; |
885 | /// # impl DatabaseService { |
886 | /// # fn new(address: &str) -> Self { |
887 | /// # DatabaseService |
888 | /// # } |
889 | /// # } |
890 | /// # |
891 | /// # type Record = (); |
892 | /// # type DbError = crate::BoxError; |
893 | /// # |
894 | /// # impl Service<u32> for DatabaseService { |
895 | /// # type Response = Record; |
896 | /// # type Error = DbError; |
897 | /// # type Future = futures_util::future::Ready<Result<Record, DbError>>; |
898 | /// # |
899 | /// # fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
900 | /// # Poll::Ready(Ok(())) |
901 | /// # } |
902 | /// # |
903 | /// # fn call(&mut self, request: u32) -> Self::Future { |
904 | /// # futures_util::future::ready(Ok(())) |
905 | /// # } |
906 | /// # } |
907 | /// # |
908 | /// # fn main() { |
909 | /// use std::time::Duration; |
910 | /// use tokio::time::timeout; |
911 | /// |
912 | /// // A service returning Result<Record, DbError> |
/// let service = DatabaseService::new("127.0.0.1:8080");
914 | /// # async { |
915 | /// |
916 | /// let mut new_service = service.map_future(|future| async move { |
917 | /// let res = timeout(Duration::from_secs(1), future).await?; |
918 | /// Ok::<_, BoxError>(res) |
919 | /// }); |
920 | /// |
921 | /// // Call the new service |
922 | /// let id = 13; |
923 | /// let record = new_service |
924 | /// .ready() |
925 | /// .await? |
926 | /// .call(id) |
927 | /// .await?; |
928 | /// # Ok::<(), BoxError>(()) |
929 | /// # }; |
930 | /// # } |
931 | /// ``` |
932 | /// |
/// Note that normally you wouldn't implement timeouts like this; you would use [`Timeout`] instead.
934 | /// |
935 | /// [`call`]: crate::Service::call |
936 | /// [`Timeout`]: crate::timeout::Timeout |
937 | fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F> |
938 | where |
939 | Self: Sized, |
940 | F: FnMut(Self::Future) -> Fut, |
941 | Error: From<Self::Error>, |
942 | Fut: Future<Output = Result<Response, Error>>, |
943 | { |
944 | MapFuture::new(self, f) |
945 | } |
946 | |
947 | /// Convert the service into a [`Service`] + [`Send`] trait object. |
948 | /// |
949 | /// See [`BoxService`] for more details. |
950 | /// |
951 | /// If `Self` implements the [`Clone`] trait, the [`boxed_clone`] method |
952 | /// can be used instead, to produce a boxed service which will also |
953 | /// implement [`Clone`]. |
954 | /// |
955 | /// # Example |
956 | /// |
957 | /// ``` |
958 | /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxService}; |
959 | /// # |
960 | /// # struct Request; |
961 | /// # struct Response; |
962 | /// # impl Response { |
963 | /// # fn new() -> Self { Self } |
964 | /// # } |
965 | /// |
966 | /// let service = service_fn(|req: Request| async { |
967 | /// Ok::<_, BoxError>(Response::new()) |
968 | /// }); |
969 | /// |
970 | /// let service: BoxService<Request, Response, BoxError> = service |
971 | /// .map_request(|req| { |
/// println!("received request");
973 | /// req |
974 | /// }) |
975 | /// .map_response(|res| { |
/// println!("response produced");
977 | /// res |
978 | /// }) |
979 | /// .boxed(); |
980 | /// # let service = assert_service(service); |
981 | /// # fn assert_service<S, R>(svc: S) -> S |
982 | /// # where S: Service<R> { svc } |
983 | /// ``` |
984 | /// |
985 | /// [`Service`]: crate::Service |
986 | /// [`boxed_clone`]: Self::boxed_clone |
987 | fn boxed(self) -> BoxService<Request, Self::Response, Self::Error> |
988 | where |
989 | Self: Sized + Send + 'static, |
990 | Self::Future: Send + 'static, |
991 | { |
992 | BoxService::new(self) |
993 | } |
994 | |
995 | /// Convert the service into a [`Service`] + [`Clone`] + [`Send`] trait object. |
996 | /// |
997 | /// This is similar to the [`boxed`] method, but it requires that `Self` implement |
998 | /// [`Clone`], and the returned boxed service implements [`Clone`]. |
999 | /// See [`BoxCloneService`] for more details. |
1000 | /// |
1001 | /// # Example |
1002 | /// |
1003 | /// ``` |
1004 | /// use tower::{Service, ServiceExt, BoxError, service_fn, util::BoxCloneService}; |
1005 | /// # |
1006 | /// # struct Request; |
1007 | /// # struct Response; |
1008 | /// # impl Response { |
1009 | /// # fn new() -> Self { Self } |
1010 | /// # } |
1011 | /// |
1012 | /// let service = service_fn(|req: Request| async { |
1013 | /// Ok::<_, BoxError>(Response::new()) |
1014 | /// }); |
1015 | /// |
1016 | /// let service: BoxCloneService<Request, Response, BoxError> = service |
1017 | /// .map_request(|req| { |
/// println!("received request");
1019 | /// req |
1020 | /// }) |
1021 | /// .map_response(|res| { |
/// println!("response produced");
1023 | /// res |
1024 | /// }) |
1025 | /// .boxed_clone(); |
1026 | /// |
1027 | /// // The boxed service can still be cloned. |
1028 | /// service.clone(); |
1029 | /// # let service = assert_service(service); |
1030 | /// # fn assert_service<S, R>(svc: S) -> S |
1031 | /// # where S: Service<R> { svc } |
1032 | /// ``` |
1033 | /// |
1034 | /// [`Service`]: crate::Service |
1035 | /// [`boxed`]: Self::boxed |
1036 | fn boxed_clone(self) -> BoxCloneService<Request, Self::Response, Self::Error> |
1037 | where |
1038 | Self: Clone + Sized + Send + 'static, |
1039 | Self::Future: Send + 'static, |
1040 | { |
1041 | BoxCloneService::new(self) |
1042 | } |
1043 | } |
1044 | |
1045 | impl<T: ?Sized, Request> ServiceExt<Request> for T where T: tower_service::Service<Request> {} |
1046 | |
1047 | /// Convert an `Option<Layer>` into a [`Layer`]. |
1048 | /// |
1049 | /// ``` |
1050 | /// # use std::time::Duration; |
1051 | /// # use tower::Service; |
1052 | /// # use tower::builder::ServiceBuilder; |
1053 | /// use tower::util::option_layer; |
1054 | /// # use tower::timeout::TimeoutLayer; |
1055 | /// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send { |
1056 | /// # let timeout = Some(Duration::new(10, 0)); |
1057 | /// // Layer to apply a timeout if configured |
1058 | /// let maybe_timeout = option_layer(timeout.map(TimeoutLayer::new)); |
1059 | /// |
1060 | /// ServiceBuilder::new() |
1061 | /// .layer(maybe_timeout) |
1062 | /// .service(svc); |
1063 | /// # } |
1064 | /// ``` |
1065 | /// |
1066 | /// [`Layer`]: crate::layer::Layer |
1067 | pub fn option_layer<L>(layer: Option<L>) -> Either<L, Identity> { |
1068 | if let Some(layer: L) = layer { |
1069 | Either::Left(layer) |
1070 | } else { |
1071 | Either::Right(Identity::new()) |
1072 | } |
1073 | } |
1074 | |