1#![warn(rust_2018_idioms)]
2#![cfg(feature = "sync")]
3
4#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5use wasm_bindgen_test::wasm_bindgen_test as test;
6#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8
9#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10use tokio::test as maybe_tokio_test;
11
12use std::task::Poll;
13
14use futures::future::FutureExt;
15
16use tokio::sync::{RwLock, RwLockWriteGuard};
17use tokio_test::task::spawn;
18use tokio_test::{assert_pending, assert_ready};
19
20#[test]
21fn into_inner() {
22 let rwlock = RwLock::new(42);
23 assert_eq!(rwlock.into_inner(), 42);
24}
25
26// multiple reads should be Ready
27#[test]
28fn read_shared() {
29 let rwlock = RwLock::new(100);
30
31 let mut t1 = spawn(rwlock.read());
32 let _g1 = assert_ready!(t1.poll());
33 let mut t2 = spawn(rwlock.read());
34 let _g2 = assert_ready!(t2.poll());
35}
36
37// When there is an active shared owner, exclusive access should not be possible
38#[test]
39fn write_shared_pending() {
40 let rwlock = RwLock::new(100);
41 let mut t1 = spawn(rwlock.read());
42
43 let _g1 = assert_ready!(t1.poll());
44 let mut t2 = spawn(rwlock.write());
45 assert_pending!(t2.poll());
46}
47
48// When there is an active exclusive owner, subsequent exclusive access should not be possible
49#[test]
50fn read_exclusive_pending() {
51 let rwlock = RwLock::new(100);
52 let mut t1 = spawn(rwlock.write());
53
54 let _g1 = assert_ready!(t1.poll());
55 let mut t2 = spawn(rwlock.read());
56 assert_pending!(t2.poll());
57}
58
59// If the max shared access is reached and subsequent shared access is pending
60// should be made available when one of the shared accesses is dropped
61#[test]
62fn exhaust_reading() {
63 let rwlock = RwLock::with_max_readers(100, 1024);
64 let mut reads = Vec::new();
65 loop {
66 let mut t = spawn(rwlock.read());
67 match t.poll() {
68 Poll::Ready(guard) => reads.push(guard),
69 Poll::Pending => break,
70 }
71 }
72
73 let mut t1 = spawn(rwlock.read());
74 assert_pending!(t1.poll());
75 let g2 = reads.pop().unwrap();
76 drop(g2);
77 assert!(t1.is_woken());
78 let _g1 = assert_ready!(t1.poll());
79}
80
81// When there is an active exclusive owner, subsequent exclusive access should not be possible
82#[test]
83fn write_exclusive_pending() {
84 let rwlock = RwLock::new(100);
85 let mut t1 = spawn(rwlock.write());
86
87 let _g1 = assert_ready!(t1.poll());
88 let mut t2 = spawn(rwlock.write());
89 assert_pending!(t2.poll());
90}
91
92// When there is an active shared owner, exclusive access should be possible after shared is dropped
93#[test]
94fn write_shared_drop() {
95 let rwlock = RwLock::new(100);
96 let mut t1 = spawn(rwlock.read());
97
98 let g1 = assert_ready!(t1.poll());
99 let mut t2 = spawn(rwlock.write());
100 assert_pending!(t2.poll());
101 drop(g1);
102 assert!(t2.is_woken());
103 let _g2 = assert_ready!(t2.poll());
104}
105
106// when there is an active shared owner, and exclusive access is triggered,
107// subsequent shared access should not be possible as write gathers all the available semaphore permits
108#[test]
109fn write_read_shared_pending() {
110 let rwlock = RwLock::new(100);
111 let mut t1 = spawn(rwlock.read());
112 let _g1 = assert_ready!(t1.poll());
113
114 let mut t2 = spawn(rwlock.read());
115 let _g2 = assert_ready!(t2.poll());
116
117 let mut t3 = spawn(rwlock.write());
118 assert_pending!(t3.poll());
119
120 let mut t4 = spawn(rwlock.read());
121 assert_pending!(t4.poll());
122}
123
124// when there is an active shared owner, and exclusive access is triggered,
125// reading should be possible after pending exclusive access is dropped
126#[test]
127fn write_read_shared_drop_pending() {
128 let rwlock = RwLock::new(100);
129 let mut t1 = spawn(rwlock.read());
130 let _g1 = assert_ready!(t1.poll());
131
132 let mut t2 = spawn(rwlock.write());
133 assert_pending!(t2.poll());
134
135 let mut t3 = spawn(rwlock.read());
136 assert_pending!(t3.poll());
137 drop(t2);
138
139 assert!(t3.is_woken());
140 let _t3 = assert_ready!(t3.poll());
141}
142
143// Acquire an RwLock nonexclusively by a single task
144#[maybe_tokio_test]
145async fn read_uncontested() {
146 let rwlock = RwLock::new(100);
147 let result = *rwlock.read().await;
148
149 assert_eq!(result, 100);
150}
151
152// Acquire an uncontested RwLock in exclusive mode
153#[maybe_tokio_test]
154async fn write_uncontested() {
155 let rwlock = RwLock::new(100);
156 let mut result = rwlock.write().await;
157 *result += 50;
158 assert_eq!(*result, 150);
159}
160
161// RwLocks should be acquired in the order that their Futures are waited upon.
162#[maybe_tokio_test]
163async fn write_order() {
164 let rwlock = RwLock::<Vec<u32>>::new(vec![]);
165 let fut2 = rwlock.write().map(|mut guard| guard.push(2));
166 let fut1 = rwlock.write().map(|mut guard| guard.push(1));
167 fut1.await;
168 fut2.await;
169
170 let g = rwlock.read().await;
171 assert_eq!(*g, vec![1, 2]);
172}
173
174// A single RwLock is contested by tasks in multiple threads
175#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
176#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
177async fn multithreaded() {
178 use futures::stream::{self, StreamExt};
179 use std::sync::Arc;
180 use tokio::sync::Barrier;
181
182 let barrier = Arc::new(Barrier::new(5));
183 let rwlock = Arc::new(RwLock::<u32>::new(0));
184 let rwclone1 = rwlock.clone();
185 let rwclone2 = rwlock.clone();
186 let rwclone3 = rwlock.clone();
187 let rwclone4 = rwlock.clone();
188
189 let b1 = barrier.clone();
190 tokio::spawn(async move {
191 stream::iter(0..1000)
192 .for_each(move |_| {
193 let rwlock = rwclone1.clone();
194 async move {
195 let mut guard = rwlock.write().await;
196 *guard += 2;
197 }
198 })
199 .await;
200 b1.wait().await;
201 });
202
203 let b2 = barrier.clone();
204 tokio::spawn(async move {
205 stream::iter(0..1000)
206 .for_each(move |_| {
207 let rwlock = rwclone2.clone();
208 async move {
209 let mut guard = rwlock.write().await;
210 *guard += 3;
211 }
212 })
213 .await;
214 b2.wait().await;
215 });
216
217 let b3 = barrier.clone();
218 tokio::spawn(async move {
219 stream::iter(0..1000)
220 .for_each(move |_| {
221 let rwlock = rwclone3.clone();
222 async move {
223 let mut guard = rwlock.write().await;
224 *guard += 5;
225 }
226 })
227 .await;
228 b3.wait().await;
229 });
230
231 let b4 = barrier.clone();
232 tokio::spawn(async move {
233 stream::iter(0..1000)
234 .for_each(move |_| {
235 let rwlock = rwclone4.clone();
236 async move {
237 let mut guard = rwlock.write().await;
238 *guard += 7;
239 }
240 })
241 .await;
242 b4.wait().await;
243 });
244
245 barrier.wait().await;
246 let g = rwlock.read().await;
247 assert_eq!(*g, 17_000);
248}
249
250#[maybe_tokio_test]
251async fn try_write() {
252 let lock = RwLock::new(0);
253 let read_guard = lock.read().await;
254 assert!(lock.try_write().is_err());
255 drop(read_guard);
256 assert!(lock.try_write().is_ok());
257}
258
259#[test]
260fn try_read_try_write() {
261 let lock: RwLock<usize> = RwLock::new(15);
262
263 {
264 let rg1 = lock.try_read().unwrap();
265 assert_eq!(*rg1, 15);
266
267 assert!(lock.try_write().is_err());
268
269 let rg2 = lock.try_read().unwrap();
270 assert_eq!(*rg2, 15)
271 }
272
273 {
274 let mut wg = lock.try_write().unwrap();
275 *wg = 1515;
276
277 assert!(lock.try_read().is_err())
278 }
279
280 assert_eq!(*lock.try_read().unwrap(), 1515);
281}
282
283#[maybe_tokio_test]
284async fn downgrade_map() {
285 let lock = RwLock::new(0);
286 let write_guard = lock.write().await;
287 let mut read_t = spawn(lock.read());
288
289 // We can't create a read when a write exists
290 assert_pending!(read_t.poll());
291
292 // During the call to `f`, `read_t` doesn't have access yet.
293 let read_guard1 = RwLockWriteGuard::downgrade_map(write_guard, |v| {
294 assert_pending!(read_t.poll());
295 v
296 });
297
298 // After the downgrade, `read_t` got the lock
299 let read_guard2 = assert_ready!(read_t.poll());
300
301 // Ensure they're equal, as we return the original value
302 assert_eq!(&*read_guard1 as *const _, &*read_guard2 as *const _);
303}
304
305#[maybe_tokio_test]
306async fn try_downgrade_map() {
307 let lock = RwLock::new(0);
308 let write_guard = lock.write().await;
309 let mut read_t = spawn(lock.read());
310
311 // We can't create a read when a write exists
312 assert_pending!(read_t.poll());
313
314 // During the call to `f`, `read_t` doesn't have access yet.
315 let write_guard = RwLockWriteGuard::try_downgrade_map(write_guard, |_| {
316 assert_pending!(read_t.poll());
317 None::<&()>
318 })
319 .expect_err("downgrade didn't fail");
320
321 // After `f` returns `None`, `read_t` doesn't have access
322 assert_pending!(read_t.poll());
323
324 // After `f` returns `Some`, `read_t` does have access
325 let read_guard1 = RwLockWriteGuard::try_downgrade_map(write_guard, |v| Some(v))
326 .expect("downgrade didn't succeed");
327 let read_guard2 = assert_ready!(read_t.poll());
328
329 // Ensure they're equal, as we return the original value
330 assert_eq!(&*read_guard1 as *const _, &*read_guard2 as *const _);
331}
332