#![doc(html_root_url = "https://docs.rs/want/0.3.1")]
#![deny(warnings)]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]

//! A Futures channel-like utility to signal when a value is wanted.
//!
//! Futures are supposed to be lazy, and only start work when `Future::poll`
//! is called. The same is true of `Stream`s, but when using a channel as
//! a `Stream`, it can be hard to know if the receiver is ready for the next
//! value.
//!
//! Put another way, given a `(tx, rx)` from `futures::channel::mpsc::channel()`,
//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
//! work to be produced? Just because there is room in the channel buffer
//! doesn't mean the work would be used by the receiver.
//!
//! This is where something like `want` comes in. Added to a channel, you can
//! make sure that the `tx` only creates the message and sends it when the `rx`
//! has `poll()`ed for it, and the buffer is empty.
//!
//! # Example
//!
//! ```nightly
//! # //#![feature(async_await)]
//! extern crate want;
//!
//! # fn spawn<T>(_t: T) {}
//! # fn we_still_want_message() -> bool { true }
//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
//! # struct Tx;
//! # impl Tx { fn send<T>(&mut self, _: T) {} }
//! # struct Rx;
//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
//!
//! // Some message that is expensive to produce.
//! struct Expensive;
//!
//! // Some futures-aware MPSC channel...
//! let (mut tx, mut rx) = mpsc_channel();
//!
//! // And our `want` channel!
//! let (mut gv, mut tk) = want::new();
//!
//!
//! // Our receiving task...
//! spawn(async move {
//!     // Maybe something comes up that prevents us from ever
//!     // using the expensive message.
//!     //
//!     // Without `want`, the "send" task may have started to
//!     // produce the expensive message even though we wouldn't
//!     // be able to use it.
//!     if !we_still_want_message() {
//!         return;
//!     }
//!
//!     // But we can use it! So tell the `want` channel.
//!     tk.want();
//!
//!     match rx.recv().await {
//!         Some(_msg) => println!("got a message"),
//!         None => println!("DONE"),
//!     }
//! });
//!
//! // Our sending task
//! spawn(async move {
//!     // It's expensive to create a new message, so we wait until the
//!     // receiving end truly *wants* the message.
//!     if let Err(_closed) = gv.want().await {
//!         // Looks like they will never want it...
//!         return;
//!     }
//!
//!     // They want it, let's go!
//!     tx.send(Expensive);
//! });
//!
//! # fn main() {}
//! ```

use std::fmt;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
// SeqCst is the only ordering used to ensure accessing the state and
// TryLock are never re-ordered.
use std::sync::atomic::Ordering::SeqCst;
use std::task::{self, Poll, Waker};


use try_lock::TryLock;

/// Create a new `want` channel.
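///
/// # Example
///
/// A minimal sketch of wiring the two halves together; the synchronous
/// methods shown here need no executor:
///
/// ```
/// let (giver, mut taker) = want::new();
///
/// // The `Taker` signals interest...
/// taker.want();
///
/// // ...and the `Giver` can observe it.
/// assert!(giver.is_wanting());
/// ```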
pub fn new() -> (Giver, Taker) {
    let inner = Arc::new(Inner {
        state: AtomicUsize::new(State::Idle.into()),
        task: TryLock::new(None),
    });
    let inner2 = inner.clone();
    (
        Giver {
            inner,
        },
        Taker {
            inner: inner2,
        },
    )
}

/// An entity that gives a value when wanted.
pub struct Giver {
    inner: Arc<Inner>,
}

/// An entity that wants a value.
pub struct Taker {
    inner: Arc<Inner>,
}

/// A cloneable `Giver`.
///
/// It differs from `Giver` in that you cannot poll for `want`. It's only
/// usable as a cancellation watcher.
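///
/// A minimal sketch of obtaining one (note that `shared()` consumes the
/// `Giver`); clones observe the same underlying state:
///
/// ```
/// let (giver, mut taker) = want::new();
/// let shared = giver.shared();
/// let shared2 = shared.clone();
///
/// taker.cancel();
///
/// // Every clone sees the cancellation.
/// assert!(shared.is_canceled());
/// assert!(shared2.is_canceled());
/// ```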
#[derive(Clone)]
pub struct SharedGiver {
    inner: Arc<Inner>,
}

/// The `Taker` has canceled its interest in a value.
pub struct Closed {
    _inner: (),
}

#[derive(Clone, Copy, Debug)]
enum State {
    Idle,
    Want,
    Give,
    Closed,
}

impl From<State> for usize {
    fn from(s: State) -> usize {
        match s {
            State::Idle => 0,
            State::Want => 1,
            State::Give => 2,
            State::Closed => 3,
        }
    }
}

impl From<usize> for State {
    fn from(num: usize) -> State {
        match num {
            0 => State::Idle,
            1 => State::Want,
            2 => State::Give,
            3 => State::Closed,
            _ => unreachable!("unknown state: {}", num),
        }
    }
}

struct Inner {
    state: AtomicUsize,
    task: TryLock<Option<Waker>>,
}

// ===== impl Giver ======

impl Giver {
    /// Returns a `Future` that fulfills when the `Taker` has done some action.
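    ///
    /// A short sketch of how the returned future is typically used; this
    /// assumes it is awaited inside some async task (executor not shown):
    ///
    /// ```
    /// # async fn demo(giver: &mut want::Giver) {
    /// if giver.want().await.is_ok() {
    ///     // The `Taker` wants a value; produce and send it now.
    /// }
    /// # }
    /// ```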
    pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
        Want(self)
    }

    /// Poll whether the `Taker` has registered interest in another value.
    ///
    /// - If the `Taker` has called `want()`, this returns `Poll::Ready(Ok(()))`.
    /// - If the `Taker` has not called `want()` since the last poll, this
    ///   returns `Poll::Pending`, and parks the current task to be woken
    ///   when the `Taker` does call `want()`.
    /// - If the `Taker` has canceled (or dropped), this returns
    ///   `Poll::Ready(Err(Closed))`.
    ///
    /// After knowing that the Taker is wanting, the state can be reset by
    /// calling [`give`](Giver::give).
    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
        loop {
            let state = self.inner.state.load(SeqCst).into();
            match state {
                State::Want => {
                    return Poll::Ready(Ok(()));
                },
                State::Closed => {
                    return Poll::Ready(Err(Closed { _inner: () }));
                },
                State::Idle | State::Give => {
                    // Taker doesn't want anything yet, so park.
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {

                        // While we have the lock, try to set to GIVE.
                        let old = self.inner.state.compare_exchange(
                            state.into(),
                            State::Give.into(),
                            SeqCst,
                            SeqCst,
                        );
                        // If it's still the first state (Idle or Give), park current task.
                        if old == Ok(state.into()) {
                            let park = locked.as_ref()
                                .map(|w| !w.will_wake(cx.waker()))
                                .unwrap_or(true);
                            if park {
                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
                                drop(locked);
                                if let Some(prev_task) = old {
                                    // there was an old task parked here.
                                    // it might be waiting to be notified,
                                    // so poke it before dropping.
                                    prev_task.wake();
                                }
                            }
                            return Poll::Pending;
                        }
                        // Otherwise, something happened! Go around the loop again.
                    } else {
                        // if we couldn't take the lock, then a Taker has it.
                        // The *ONLY* reason is because it is in the process of notifying us
                        // of its want.
                        //
                        // We need to loop again to see what state it was changed to.
                    }
                },
            }
        }
    }

    /// Mark the state as idle, if the `Taker` is currently wanting.
    ///
    /// Returns `true` if the `Taker` was wanting, `false` otherwise.
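    ///
    /// A minimal sketch of the want/give cycle (only the synchronous parts):
    ///
    /// ```
    /// let (giver, mut taker) = want::new();
    ///
    /// taker.want();
    /// assert!(giver.is_wanting());
    ///
    /// // `give` resets the state back to idle...
    /// assert!(giver.give());
    /// assert!(!giver.is_wanting());
    ///
    /// // ...and returns `false` when the `Taker` wasn't wanting.
    /// assert!(!giver.give());
    /// ```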
    #[inline]
    pub fn give(&self) -> bool {
        // only set to IDLE if it is still Want
        let old = self.inner.state.compare_exchange(
            State::Want.into(),
            State::Idle.into(),
            SeqCst,
            SeqCst);
        old == Ok(State::Want.into())
    }

    /// Check if the `Taker` has called `want()` without parking a task.
    ///
    /// This is safe to call outside of a futures task context, but other
    /// means of being notified are left to the user.
    #[inline]
    pub fn is_wanting(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Want.into()
    }


    /// Check if the `Taker` has canceled interest without parking a task.
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Closed.into()
    }

    /// Converts this into a `SharedGiver`.
    #[inline]
    pub fn shared(self) -> SharedGiver {
        SharedGiver {
            inner: self.inner,
        }
    }
}

impl fmt::Debug for Giver {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Giver")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl SharedGiver ======

impl SharedGiver {
    /// Check if the `Taker` has called `want()` without parking a task.
    ///
    /// This is safe to call outside of a futures task context, but other
    /// means of being notified are left to the user.
    #[inline]
    pub fn is_wanting(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Want.into()
    }


    /// Check if the `Taker` has canceled interest without parking a task.
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.inner.state.load(SeqCst) == State::Closed.into()
    }
}

impl fmt::Debug for SharedGiver {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SharedGiver")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl Taker ======

impl Taker {
    /// Signal to the `Giver` that the want is canceled.
    ///
    /// This is useful to tell that the channel is closed if you cannot
    /// drop the value yet.
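    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let (giver, mut taker) = want::new();
    ///
    /// taker.cancel();
    ///
    /// assert!(giver.is_canceled());
    /// ```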
    #[inline]
    pub fn cancel(&mut self) {
        self.signal(State::Closed)
    }

    /// Signal to the `Giver` that a value is wanted.
    #[inline]
    pub fn want(&mut self) {
        debug_assert!(
            self.inner.state.load(SeqCst) != State::Closed.into(),
            "want called after cancel"
        );
        self.signal(State::Want)
    }

    #[inline]
    fn signal(&mut self, state: State) {
        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
        match old_state {
            State::Idle | State::Want | State::Closed => (),
            State::Give => {
                loop {
                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
                        if let Some(task) = locked.take() {
                            drop(locked);
                            task.wake();
                        }
                        return;
                    } else {
                        // if we couldn't take the lock, then a Giver has it.
                        // The *ONLY* reason is because it is in the process of parking.
                        //
                        // We need to loop and take the lock so we can notify this task.
                    }
                }
            },
        }
    }
}

impl Drop for Taker {
    #[inline]
    fn drop(&mut self) {
        self.signal(State::Closed);
    }
}

impl fmt::Debug for Taker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Taker")
            .field("state", &self.inner.state())
            .finish()
    }
}

// ===== impl Closed ======

impl fmt::Debug for Closed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Closed")
            .finish()
    }
}

// ===== impl Inner ======

impl Inner {
    #[inline]
    fn state(&self) -> State {
        self.state.load(SeqCst).into()
    }
}

// ===== impl Want ======

struct Want<'a>(&'a mut Giver);


impl Future for Want<'_> {
    type Output = Result<(), Closed>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.0.poll_want(cx)
    }
}

#[cfg(test)]
mod tests {
    use std::thread;
    use tokio_sync::oneshot;
    use super::*;

    fn block_on<F: Future>(f: F) -> F::Output {
        tokio_executor::enter()
            .expect("block_on enter")
            .block_on(f)
    }

    #[test]
    fn want_ready() {
        let (mut gv, mut tk) = new();

        tk.want();

        block_on(gv.want()).unwrap();
    }

    #[test]
    fn want_notify_0() {
        let (mut gv, mut tk) = new();
        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            tk.want();
            // use a oneshot to keep this thread alive
            // until other thread was notified of want
            block_on(rx).expect("rx");
        });

        block_on(gv.want()).expect("want");

        assert!(gv.is_wanting(), "still wanting after poll_want success");
        assert!(gv.give(), "give is true when wanting");

        assert!(!gv.is_wanting(), "no longer wanting after give");
        assert!(!gv.is_canceled(), "give doesn't cancel");

        assert!(!gv.give(), "give is false if not wanting");

        tx.send(()).expect("tx");
    }

    /*
    /// This tests that if the Giver moves tasks after parking,
    /// it will still wake up the correct task.
    #[test]
    fn want_notify_moving_tasks() {
        use std::sync::Arc;
        use futures::executor::{spawn, Notify, NotifyHandle};

        struct WantNotify;

        impl Notify for WantNotify {
            fn notify(&self, _id: usize) {
            }
        }

        fn n() -> NotifyHandle {
            Arc::new(WantNotify).into()
        }

        let (mut gv, mut tk) = new();

        let mut s = spawn(poll_fn(move || {
            gv.poll_want()
        }));

        // Register with t1 as the task::current()
        let t1 = n();
        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());

        thread::spawn(move || {
            thread::sleep(::std::time::Duration::from_millis(100));
            tk.want();
        });

        // And now, move to a ThreadNotify task.
        s.into_inner().wait().expect("poll_want");
    }
    */

    #[test]
    fn cancel() {
        // explicit
        let (mut gv, mut tk) = new();

        assert!(!gv.is_canceled());

        tk.cancel();

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // implicit
        let (mut gv, tk) = new();

        assert!(!gv.is_canceled());

        drop(tk);

        assert!(gv.is_canceled());
        block_on(gv.want()).unwrap_err();

        // notifies
        let (mut gv, tk) = new();

        thread::spawn(move || {
            let _tk = tk;
            // and dropped
        });

        block_on(gv.want()).unwrap_err();
    }
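
    // Sanity check (mirroring the `SharedGiver` doc example): a `SharedGiver`
    // observes a cancellation signaled through the `Taker`.
    #[test]
    fn shared_giver_canceled() {
        let (gv, mut tk) = new();
        let shared = gv.shared();

        assert!(!shared.is_canceled());

        tk.cancel();

        assert!(shared.is_canceled());
    }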

    /*
    #[test]
    fn stress() {
        let nthreads = 5;
        let nwants = 100;

        for _ in 0..nthreads {
            let (mut gv, mut tk) = new();
            let (mut tx, mut rx) = mpsc::channel(0);

            // rx thread
            thread::spawn(move || {
                let mut cnt = 0;
                poll_fn(move || {
                    while cnt < nwants {
                        let n = match rx.poll().expect("rx poll") {
                            Async::Ready(n) => n.expect("rx opt"),
                            Async::NotReady => {
                                tk.want();
                                return Ok(Async::NotReady);
                            },
                        };
                        assert_eq!(cnt, n);
                        cnt += 1;
                    }
                    Ok::<_, ()>(Async::Ready(()))
                }).wait().expect("rx wait");
            });

            // tx thread
            thread::spawn(move || {
                let mut cnt = 0;
                let nsent = poll_fn(move || {
                    loop {
                        while let Ok(()) = tx.try_send(cnt) {
                            cnt += 1;
                        }
                        match gv.poll_want() {
                            Ok(Async::Ready(_)) => (),
                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
                            Err(_) => return Ok(Async::Ready(cnt)),
                        }
                    }
                }).wait().expect("tx wait");

                assert_eq!(nsent, nwants);
            }).join().expect("thread join");
        }
    }
    */
}