1 use crate::runtime::blocking::NoopSchedule;
2 use crate::runtime::queue;
3 use crate::runtime::stats::WorkerStatsBatcher;
4 use crate::runtime::task::Inject;
5
6 use loom::thread;
7
/// Loom model of a single owner and a single stealer sharing one local queue.
///
/// The owner pushes six tasks in total (two rounds of three) and pops between
/// pushes, while a second thread makes three steal attempts. The test asserts
/// that the tasks popped by the owner, drained from the inject queue, and
/// obtained by the stealer add up to exactly six.
#[test]
fn basic() {
    loom::model(|| {
        let (steal, mut local) = queue::local();
        let inject = Inject::new();

        let stealer = thread::spawn(move || {
            let mut stats = WorkerStatsBatcher::new(0);
            let (_, mut dest) = queue::local();
            let mut stolen = 0;

            for _ in 0..3 {
                // A successful steal is counted as one task on its own ...
                if steal.steal_into(&mut dest, &mut stats).is_some() {
                    stolen += 1;
                }

                // ... in addition to everything that can be popped from the
                // destination queue afterwards.
                stolen += std::iter::from_fn(|| dest.pop()).count();
            }

            stolen
        });

        let mut seen = 0;

        for _round in 0..2 {
            // Queue two tasks, then try to take one back.
            for _ in 0..2 {
                let (noop, _) = super::unowned(async {});
                local.push_back(noop, &inject);
            }

            if local.pop().is_some() {
                seen += 1;
            }

            // Queue a third task before emptying the local queue.
            let (noop, _) = super::unowned(async {});
            local.push_back(noop, &inject);

            seen += std::iter::from_fn(|| local.pop()).count();
        }

        // Tasks found in the inject queue are counted as well.
        seen += std::iter::from_fn(|| inject.pop()).count();

        let total = seen + stealer.join().unwrap();

        assert_eq!(6, total);
    });
}
62
/// Loom model in which the owner keeps pushing while another thread makes a
/// single steal attempt.
///
/// Seven tasks are pushed in total (one up front, then six more). The test
/// asserts that the owner's pops, the stealer's haul, and the final drains of
/// the local and inject queues account for all seven.
///
/// NOTE(review): the test name suggests the six-task burst is meant to
/// overflow the local queue into `inject` under loom — confirm against the
/// queue's capacity in loom builds.
#[test]
fn steal_overflow() {
    loom::model(|| {
        let (steal, mut local) = queue::local();
        let inject = Inject::new();

        let stealer = thread::spawn(move || {
            let mut stats = WorkerStatsBatcher::new(0);
            let (_, mut dest) = queue::local();
            let mut stolen = 0;

            // One steal attempt; a `Some` result counts as a task by itself.
            if steal.steal_into(&mut dest, &mut stats).is_some() {
                stolen += 1;
            }

            // Count whatever ended up in the destination queue.
            stolen += std::iter::from_fn(|| dest.pop()).count();

            stolen
        });

        let mut seen = 0;

        // One push followed by one pop attempt.
        let (noop, _) = super::unowned(async {});
        local.push_back(noop, &inject);

        if local.pop().is_some() {
            seen += 1;
        }

        // Six further pushes, with no pops in between.
        for _ in 0..6 {
            let (noop, _) = super::unowned(async {});
            local.push_back(noop, &inject);
        }

        // Wait for the stealer before draining what is left.
        seen += stealer.join().unwrap();

        seen += std::iter::from_fn(|| local.pop()).count();
        seen += std::iter::from_fn(|| inject.pop()).count();

        assert_eq!(7, seen);
    });
}
113
/// Loom model with two threads stealing from the same queue while the owner
/// drains it.
///
/// `NUM_TASKS` tasks are pushed before either stealer is spawned. The test
/// asserts that the owner's pops, the inject-queue drain, and both stealers'
/// results sum to `NUM_TASKS`.
#[test]
fn multi_stealer() {
    const NUM_TASKS: usize = 5;

    /// Makes one steal attempt into a fresh queue and returns how many tasks
    /// were obtained: zero if the attempt yields nothing, otherwise one for
    /// the returned task plus everything popped from the fresh queue.
    fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
        let mut stats = WorkerStatsBatcher::new(0);
        let (_, mut dest) = queue::local();

        if steal.steal_into(&mut dest, &mut stats).is_none() {
            // Nothing was stolen.
            return 0;
        }

        1 + std::iter::from_fn(|| dest.pop()).count()
    }

    loom::model(|| {
        let (steal, mut local) = queue::local();
        let inject = Inject::new();

        // Fill the queue before any stealer exists.
        for _ in 0..NUM_TASKS {
            let (noop, _) = super::unowned(async {});
            local.push_back(noop, &inject);
        }

        // The first stealer works on a clone of the handle; the second one
        // takes the original.
        let first = {
            let steal = steal.clone();
            thread::spawn(move || steal_tasks(steal))
        };

        let second = thread::spawn(move || steal_tasks(steal));

        let mut seen = 0;

        seen += std::iter::from_fn(|| local.pop()).count();
        seen += std::iter::from_fn(|| inject.pop()).count();

        seen += first.join().unwrap();
        seen += second.join().unwrap();

        assert_eq!(seen, NUM_TASKS);
    });
}
168
/// Loom model in which a queue is stolen from while it is itself stealing
/// from another queue.
///
/// Two queues are each loaded with four tasks. A spawned thread steals from
/// the first queue while the main thread drains that same queue and then
/// steals into it from the second. The test makes no count assertion; it only
/// runs the interleavings under loom and drains everything at the end.
#[test]
fn chained_steal() {
    loom::model(|| {
        let mut stats = WorkerStatsBatcher::new(0);
        let (steal_a, mut queue_a) = queue::local();
        let (steal_b, mut queue_b) = queue::local();
        let inject = Inject::new();

        // Load four tasks into each queue, alternating between the two.
        for _ in 0..4 {
            let (noop, _) = super::unowned(async {});
            queue_a.push_back(noop, &inject);

            let (noop, _) = super::unowned(async {});
            queue_b.push_back(noop, &inject);
        }

        // A second thread steals from the queue this thread owns.
        let thief = thread::spawn(move || {
            let mut stats = WorkerStatsBatcher::new(0);
            let (_, mut dest) = queue::local();
            steal_a.steal_into(&mut dest, &mut stats);

            std::iter::from_fn(|| dest.pop()).for_each(drop);
        });

        // Empty our own queue, then try to refill it from the other one.
        std::iter::from_fn(|| queue_a.pop()).for_each(drop);

        steal_b.steal_into(&mut queue_a, &mut stats);

        thief.join().unwrap();

        // Drain every queue before the model iteration ends.
        std::iter::from_fn(|| queue_a.pop()).for_each(drop);
        std::iter::from_fn(|| queue_b.pop()).for_each(drop);
        std::iter::from_fn(|| inject.pop()).for_each(drop);
    });
}
207