1#![doc(html_root_url = "https://docs.rs/want/0.3.1")]
2#![deny(warnings)]
3#![deny(missing_docs)]
4#![deny(missing_debug_implementations)]
5
6//! A Futures channel-like utility to signal when a value is wanted.
7//!
8//! Futures are supposed to be lazy, and only starting work if `Future::poll`
9//! is called. The same is true of `Stream`s, but when using a channel as
10//! a `Stream`, it can be hard to know if the receiver is ready for the next
11//! value.
12//!
13//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15//! work to be produced? Just because there is room in the channel buffer
16//! doesn't mean the work would be used by the receiver.
17//!
18//! This is where something like `want` comes in. Added to a channel, you can
19//! make sure that the `tx` only creates the message and sends it when the `rx`
20//! has `poll()` for it, and the buffer was empty.
21//!
22//! # Example
23//!
24//! ```nightly
25//! # //#![feature(async_await)]
26//! extern crate want;
27//!
28//! # fn spawn<T>(_t: T) {}
29//! # fn we_still_want_message() -> bool { true }
30//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31//! # struct Tx;
32//! # impl Tx { fn send<T>(&mut self, _: T) {} }
33//! # struct Rx;
34//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35//!
36//! // Some message that is expensive to produce.
37//! struct Expensive;
38//!
39//! // Some futures-aware MPSC channel...
40//! let (mut tx, mut rx) = mpsc_channel();
41//!
42//! // And our `want` channel!
43//! let (mut gv, mut tk) = want::new();
44//!
45//!
46//! // Our receiving task...
47//! spawn(async move {
48//! // Maybe something comes up that prevents us from ever
49//! // using the expensive message.
50//! //
51//! // Without `want`, the "send" task may have started to
52//! // produce the expensive message even though we wouldn't
53//! // be able to use it.
54//! if !we_still_want_message() {
55//! return;
56//! }
57//!
58//! // But we can use it! So tell the `want` channel.
59//! tk.want();
60//!
61//! match rx.recv().await {
62//! Some(_msg) => println!("got a message"),
63//! None => println!("DONE"),
64//! }
65//! });
66//!
67//! // Our sending task
68//! spawn(async move {
69//! // It's expensive to create a new message, so we wait until the
70//! // receiving end truly *wants* the message.
71//! if let Err(_closed) = gv.want().await {
72//! // Looks like they will never want it...
73//! return;
74//! }
75//!
76//! // They want it, let's go!
77//! tx.send(Expensive);
78//! });
79//!
80//! # fn main() {}
81//! ```
82
83use std::fmt;
84use std::future::Future;
85use std::mem;
86use std::pin::Pin;
87use std::sync::Arc;
88use std::sync::atomic::AtomicUsize;
89// SeqCst is the only ordering used to ensure accessing the state and
90// TryLock are never re-ordered.
91use std::sync::atomic::Ordering::SeqCst;
92use std::task::{self, Poll, Waker};
93
94
95use try_lock::TryLock;
96
97/// Create a new `want` channel.
98pub fn new() -> (Giver, Taker) {
99 let inner: Arc = Arc::new(data:Inner {
100 state: AtomicUsize::new(State::Idle.into()),
101 task: TryLock::new(val:None),
102 });
103 let inner2: Arc = inner.clone();
104 (
105 Giver {
106 inner,
107 },
108 Taker {
109 inner: inner2,
110 },
111 )
112}
113
114/// An entity that gives a value when wanted.
115pub struct Giver {
116 inner: Arc<Inner>,
117}
118
119/// An entity that wants a value.
120pub struct Taker {
121 inner: Arc<Inner>,
122}
123
124/// A cloneable `Giver`.
125///
126/// It differs from `Giver` in that you cannot poll for `want`. It's only
127/// usable as a cancellation watcher.
128#[derive(Clone)]
129pub struct SharedGiver {
130 inner: Arc<Inner>,
131}
132
133/// The `Taker` has canceled its interest in a value.
134pub struct Closed {
135 _inner: (),
136}
137
138#[derive(Clone, Copy, Debug)]
139enum State {
140 Idle,
141 Want,
142 Give,
143 Closed,
144}
145
146impl From<State> for usize {
147 fn from(s: State) -> usize {
148 match s {
149 State::Idle => 0,
150 State::Want => 1,
151 State::Give => 2,
152 State::Closed => 3,
153 }
154 }
155}
156
157impl From<usize> for State {
158 fn from(num: usize) -> State {
159 match num {
160 0 => State::Idle,
161 1 => State::Want,
162 2 => State::Give,
163 3 => State::Closed,
164 _ => unreachable!("unknown state: {}", num),
165 }
166 }
167}
168
169struct Inner {
170 state: AtomicUsize,
171 task: TryLock<Option<Waker>>,
172}
173
174// ===== impl Giver ======
175
176impl Giver {
177 /// Returns a `Future` that fulfills when the `Taker` has done some action.
178 pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
179 Want(self)
180 }
181
182 /// Poll whether the `Taker` has registered interest in another value.
183 ///
184 /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
185 /// - If the `Taker` has not called `want()` since last poll, this
186 /// returns `Async::NotReady`, and parks the current task to be notified
187 /// when the `Taker` does call `want()`.
188 /// - If the `Taker` has canceled (or dropped), this returns `Closed`.
189 ///
190 /// After knowing that the Taker is wanting, the state can be reset by
191 /// calling [`give`](Giver::give).
192 pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
193 loop {
194 let state = self.inner.state.load(SeqCst).into();
195 match state {
196 State::Want => {
197 return Poll::Ready(Ok(()));
198 },
199 State::Closed => {
200 return Poll::Ready(Err(Closed { _inner: () }));
201 },
202 State::Idle | State::Give => {
203 // Taker doesn't want anything yet, so park.
204 if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
205
206 // While we have the lock, try to set to GIVE.
207 let old = self.inner.state.compare_exchange(
208 state.into(),
209 State::Give.into(),
210 SeqCst,
211 SeqCst,
212 );
213 // If it's still the first state (Idle or Give), park current task.
214 if old == Ok(state.into()) {
215 let park = locked.as_ref()
216 .map(|w| !w.will_wake(cx.waker()))
217 .unwrap_or(true);
218 if park {
219 let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
220 drop(locked);
221 if let Some(prev_task) = old {
222 // there was an old task parked here.
223 // it might be waiting to be notified,
224 // so poke it before dropping.
225 prev_task.wake();
226 };
227 }
228 return Poll::Pending;
229 }
230 // Otherwise, something happened! Go around the loop again.
231 } else {
232 // if we couldn't take the lock, then a Taker has it.
233 // The *ONLY* reason is because it is in the process of notifying us
234 // of its want.
235 //
236 // We need to loop again to see what state it was changed to.
237 }
238 },
239 }
240 }
241 }
242
243 /// Mark the state as idle, if the Taker currently is wanting.
244 ///
245 /// Returns true if Taker was wanting, false otherwise.
246 #[inline]
247 pub fn give(&self) -> bool {
248 // only set to IDLE if it is still Want
249 let old = self.inner.state.compare_exchange(
250 State::Want.into(),
251 State::Idle.into(),
252 SeqCst,
253 SeqCst);
254 old == Ok(State::Want.into())
255 }
256
257 /// Check if the `Taker` has called `want()` without parking a task.
258 ///
259 /// This is safe to call outside of a futures task context, but other
260 /// means of being notified is left to the user.
261 #[inline]
262 pub fn is_wanting(&self) -> bool {
263 self.inner.state.load(SeqCst) == State::Want.into()
264 }
265
266
267 /// Check if the `Taker` has canceled interest without parking a task.
268 #[inline]
269 pub fn is_canceled(&self) -> bool {
270 self.inner.state.load(SeqCst) == State::Closed.into()
271 }
272
273 /// Converts this into a `SharedGiver`.
274 #[inline]
275 pub fn shared(self) -> SharedGiver {
276 SharedGiver {
277 inner: self.inner,
278 }
279 }
280}
281
282impl fmt::Debug for Giver {
283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284 f&mut DebugStruct<'_, '_>.debug_struct("Giver")
285 .field(name:"state", &self.inner.state())
286 .finish()
287 }
288}
289
290// ===== impl SharedGiver ======
291
292impl SharedGiver {
293 /// Check if the `Taker` has called `want()` without parking a task.
294 ///
295 /// This is safe to call outside of a futures task context, but other
296 /// means of being notified is left to the user.
297 #[inline]
298 pub fn is_wanting(&self) -> bool {
299 self.inner.state.load(order:SeqCst) == State::Want.into()
300 }
301
302
303 /// Check if the `Taker` has canceled interest without parking a task.
304 #[inline]
305 pub fn is_canceled(&self) -> bool {
306 self.inner.state.load(order:SeqCst) == State::Closed.into()
307 }
308}
309
310impl fmt::Debug for SharedGiver {
311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312 f&mut DebugStruct<'_, '_>.debug_struct("SharedGiver")
313 .field(name:"state", &self.inner.state())
314 .finish()
315 }
316}
317
318// ===== impl Taker ======
319
320impl Taker {
321 /// Signal to the `Giver` that the want is canceled.
322 ///
323 /// This is useful to tell that the channel is closed if you cannot
324 /// drop the value yet.
325 #[inline]
326 pub fn cancel(&mut self) {
327 self.signal(State::Closed)
328 }
329
330 /// Signal to the `Giver` that a value is wanted.
331 #[inline]
332 pub fn want(&mut self) {
333 debug_assert!(
334 self.inner.state.load(SeqCst) != State::Closed.into(),
335 "want called after cancel"
336 );
337 self.signal(State::Want)
338 }
339
340 #[inline]
341 fn signal(&mut self, state: State) {
342 let old_state = self.inner.state.swap(state.into(), SeqCst).into();
343 match old_state {
344 State::Idle | State::Want | State::Closed => (),
345 State::Give => {
346 loop {
347 if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
348 if let Some(task) = locked.take() {
349 drop(locked);
350 task.wake();
351 }
352 return;
353 } else {
354 // if we couldn't take the lock, then a Giver has it.
355 // The *ONLY* reason is because it is in the process of parking.
356 //
357 // We need to loop and take the lock so we can notify this task.
358 }
359 }
360 },
361 }
362 }
363}
364
365impl Drop for Taker {
366 #[inline]
367 fn drop(&mut self) {
368 self.signal(State::Closed);
369 }
370}
371
372impl fmt::Debug for Taker {
373 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374 f&mut DebugStruct<'_, '_>.debug_struct("Taker")
375 .field(name:"state", &self.inner.state())
376 .finish()
377 }
378}
379
380// ===== impl Closed ======
381
382impl fmt::Debug for Closed {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 fDebugStruct<'_, '_>.debug_struct(name:"Closed")
385 .finish()
386 }
387}
388
389// ===== impl Inner ======
390
391impl Inner {
392 #[inline]
393 fn state(&self) -> State {
394 self.state.load(order:SeqCst).into()
395 }
396}
397
398// ===== impl PollFn ======
399
400struct Want<'a>(&'a mut Giver);
401
402
403impl Future for Want<'_> {
404 type Output = Result<(), Closed>;
405
406 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
407 self.0.poll_want(cx)
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use std::thread;
414 use tokio_sync::oneshot;
415 use super::*;
416
417 fn block_on<F: Future>(f: F) -> F::Output {
418 tokio_executor::enter()
419 .expect("block_on enter")
420 .block_on(f)
421 }
422
423 #[test]
424 fn want_ready() {
425 let (mut gv, mut tk) = new();
426
427 tk.want();
428
429 block_on(gv.want()).unwrap();
430 }
431
432 #[test]
433 fn want_notify_0() {
434 let (mut gv, mut tk) = new();
435 let (tx, rx) = oneshot::channel();
436
437 thread::spawn(move || {
438 tk.want();
439 // use a oneshot to keep this thread alive
440 // until other thread was notified of want
441 block_on(rx).expect("rx");
442 });
443
444 block_on(gv.want()).expect("want");
445
446 assert!(gv.is_wanting(), "still wanting after poll_want success");
447 assert!(gv.give(), "give is true when wanting");
448
449 assert!(!gv.is_wanting(), "no longer wanting after give");
450 assert!(!gv.is_canceled(), "give doesn't cancel");
451
452 assert!(!gv.give(), "give is false if not wanting");
453
454 tx.send(()).expect("tx");
455 }
456
457 /*
458 /// This tests that if the Giver moves tasks after parking,
459 /// it will still wake up the correct task.
460 #[test]
461 fn want_notify_moving_tasks() {
462 use std::sync::Arc;
463 use futures::executor::{spawn, Notify, NotifyHandle};
464
465 struct WantNotify;
466
467 impl Notify for WantNotify {
468 fn notify(&self, _id: usize) {
469 }
470 }
471
472 fn n() -> NotifyHandle {
473 Arc::new(WantNotify).into()
474 }
475
476 let (mut gv, mut tk) = new();
477
478 let mut s = spawn(poll_fn(move || {
479 gv.poll_want()
480 }));
481
482 // Register with t1 as the task::current()
483 let t1 = n();
484 assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
485
486 thread::spawn(move || {
487 thread::sleep(::std::time::Duration::from_millis(100));
488 tk.want();
489 });
490
491 // And now, move to a ThreadNotify task.
492 s.into_inner().wait().expect("poll_want");
493 }
494 */
495
496 #[test]
497 fn cancel() {
498 // explicit
499 let (mut gv, mut tk) = new();
500
501 assert!(!gv.is_canceled());
502
503 tk.cancel();
504
505 assert!(gv.is_canceled());
506 block_on(gv.want()).unwrap_err();
507
508 // implicit
509 let (mut gv, tk) = new();
510
511 assert!(!gv.is_canceled());
512
513 drop(tk);
514
515 assert!(gv.is_canceled());
516 block_on(gv.want()).unwrap_err();
517
518 // notifies
519 let (mut gv, tk) = new();
520
521 thread::spawn(move || {
522 let _tk = tk;
523 // and dropped
524 });
525
526 block_on(gv.want()).unwrap_err();
527 }
528
529 /*
530 #[test]
531 fn stress() {
532 let nthreads = 5;
533 let nwants = 100;
534
535 for _ in 0..nthreads {
536 let (mut gv, mut tk) = new();
537 let (mut tx, mut rx) = mpsc::channel(0);
538
539 // rx thread
540 thread::spawn(move || {
541 let mut cnt = 0;
542 poll_fn(move || {
543 while cnt < nwants {
544 let n = match rx.poll().expect("rx poll") {
545 Async::Ready(n) => n.expect("rx opt"),
546 Async::NotReady => {
547 tk.want();
548 return Ok(Async::NotReady);
549 },
550 };
551 assert_eq!(cnt, n);
552 cnt += 1;
553 }
554 Ok::<_, ()>(Async::Ready(()))
555 }).wait().expect("rx wait");
556 });
557
558 // tx thread
559 thread::spawn(move || {
560 let mut cnt = 0;
561 let nsent = poll_fn(move || {
562 loop {
563 while let Ok(()) = tx.try_send(cnt) {
564 cnt += 1;
565 }
566 match gv.poll_want() {
567 Ok(Async::Ready(_)) => (),
568 Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
569 Err(_) => return Ok(Async::Ready(cnt)),
570 }
571 }
572 }).wait().expect("tx wait");
573
574 assert_eq!(nsent, nwants);
575 }).join().expect("thread join");
576 }
577 }
578 */
579}
580