use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::query::*;
use mesa_rust_gen::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

// we assert that these form a contiguous range of numbers so we won't have to use HashMaps
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);

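/// Deferred work of an event: executed with the owning queue and a queue
/// context once the event's command gets processed (see [`Event::call`]).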
pub type EventSig = Box<dyn FnOnce(&Arc<Queue>, &QueueContext) -> CLResult<()> + Send + Sync>;

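// variant values match the corresponding CL_PROFILING_COMMAND_* constants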
pub enum EventTimes {
    Queued = CL_PROFILING_COMMAND_QUEUED as isize,
    Submit = CL_PROFILING_COMMAND_SUBMIT as isize,
    Start = CL_PROFILING_COMMAND_START as isize,
    End = CL_PROFILING_COMMAND_END as isize,
}

#[derive(Default)]
struct EventMutState {
    status: cl_int,
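    // pending callbacks for CL_COMPLETE, CL_RUNNING and CL_SUBMITTED, indexed
    // by the status value (see the static_asserts above)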
    cbs: [Vec<EventCB>; 3],
    work: Option<EventSig>,
    time_queued: cl_ulong,
    time_submit: cl_ulong,
    time_start: cl_ulong,
    time_end: cl_ulong,
}

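/// An OpenCL event: tracks the execution status of one enqueued command (or
/// a user event) along with its dependencies, profiling timestamps and
/// completion callbacks.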
pub struct Event {
    pub base: CLObjectBase<CL_INVALID_EVENT>,
    pub context: Arc<Context>,
    pub queue: Option<Arc<Queue>>,
    pub cmd_type: cl_command_type,
    pub deps: Vec<Arc<Event>>,
    state: Mutex<EventMutState>,
    cv: Condvar,
}

impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);

impl Event {
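    /// Creates an event backing an enqueued command. A minimal sketch of the
    /// call, assuming an existing `queue` and a no-op work closure:
    ///
    /// ```ignore
    /// let ev = Event::new(&queue, CL_COMMAND_NDRANGE_KERNEL, Vec::new(),
    ///                     Box::new(|_queue, _ctx| Ok(())));
    /// ```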
    pub fn new(
        queue: &Arc<Queue>,
        cmd_type: cl_command_type,
        deps: Vec<Arc<Event>>,
        work: EventSig,
    ) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: queue.context.clone(),
            queue: Some(queue.clone()),
            cmd_type,
            deps,
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                work: Some(work),
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

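    /// Creates a user event (`clCreateUserEvent`). User events start out as
    /// `CL_SUBMITTED` and are moved along by the application. A minimal
    /// sketch, assuming an existing `context`:
    ///
    /// ```ignore
    /// let ev = Event::new_user(Arc::clone(&context));
    /// // hand `ev` out as a dependency ...
    /// ev.set_user_status(CL_COMPLETE as cl_int);
    /// ```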
    pub fn new_user(context: Arc<Context>) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context,
            queue: None,
            cmd_type: CL_COMMAND_USER,
            deps: Vec::new(),
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    fn state(&self) -> MutexGuard<EventMutState> {
        self.state.lock().unwrap()
    }

    pub fn status(&self) -> cl_int {
        self.state().status
    }

    fn set_status(&self, mut lock: MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        // errors are treated like CL_COMPLETE
        let cb_max = if new < 0 { CL_COMPLETE } else { new as u32 };

        // there are only callbacks for those states
        if ![CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&cb_max) {
            return;
        }

        let mut cbs = Vec::new();
        // Collect all cbs we need to call first. Technically it's not required to call them in
        // order, but let's be nice to applications as it costs us nothing.
        for idx in (cb_max..=CL_SUBMITTED).rev() {
            cbs.extend(
                // use mem::take as each callback must be called exactly once
                mem::take(&mut lock.cbs[idx as usize])
                    .into_iter()
                    // we need to remember the status each cb was registered with
                    .map(|cb| (idx as cl_int, cb)),
            );
        }

        // applications might want to access the event in the callback, so drop the lock before
        // calling into the callbacks.
        drop(lock);

        for (idx, cb) in cbs {
            // from the CL spec:
            //
            // event_command_status is equal to the command_exec_callback_type used while
            // registering the callback. [...] If the callback is called as the result of the
            // command associated with event being abnormally terminated, an appropriate error code
            // for the error that caused the termination will be passed to event_command_status
            // instead.
            let status = if new < 0 { new } else { idx };
            cb.call(self, status);
        }
    }

    pub fn set_user_status(&self, status: cl_int) {
        self.set_status(self.state(), status);
    }

    pub fn is_error(&self) -> bool {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

    pub fn set_time(&self, which: EventTimes, value: cl_ulong) {
        let mut lock = self.state();
        match which {
            EventTimes::Queued => lock.time_queued = value,
            EventTimes::Submit => lock.time_submit = value,
            EventTimes::Start => lock.time_start = value,
            EventTimes::End => lock.time_end = value,
        }
    }

    pub fn get_time(&self, which: EventTimes) -> cl_ulong {
        let lock = self.state();

        match which {
            EventTimes::Queued => lock.time_queued,
            EventTimes::Submit => lock.time_submit,
            EventTimes::Start => lock.time_start,
            EventTimes::End => lock.time_end,
        }
    }

    pub fn add_cb(&self, state: cl_int, cb: EventCB) {
        let mut lock = self.state();
        let status = lock.status;

        // call the cb immediately if the status was already reached (status values decrease
        // from CL_QUEUED down to CL_COMPLETE)
        if state >= status {
            drop(lock);
            // as in set_status: pass the error code instead if the command failed
            let status = if status < 0 { status } else { state };
            cb.call(self, status);
        } else {
            lock.cbs.get_mut(state as usize).unwrap().push(cb);
        }
    }

    pub(super) fn signal(&self) {
        let state = self.state();
        // we don't want to call signal on errored events, but if that still happens, handle it
        // gracefully
        debug_assert_eq!(state.status, CL_SUBMITTED as cl_int);
        if state.status < 0 {
            return;
        }
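        // we don't track when execution really starts, so step through CL_RUNNING and
        // CL_COMPLETE back to back once the submitted work is done.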
        self.set_status(state, CL_RUNNING as cl_int);
        self.set_status(self.state(), CL_COMPLETE as cl_int);
    }

    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;

            if let Some(queue) = &self.queue {
                // if the queue worker thread exited abnormally we have to stop waiting on the
                // cv here, otherwise we'd spin endlessly.
                if queue.is_dead() {
                    return CL_OUT_OF_HOST_MEMORY;
                }
            }
        }
        lock.status
    }

    // We always assume that work here simply submits stuff to the hardware, even if it's just
    // doing sw emulation or nothing at all.
    // If anything requests waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &QueueContext) -> cl_int {
        let mut lock = self.state();
        let mut status = lock.status;
        let queue = self.queue.as_ref().unwrap();
        let profiling_enabled = queue.is_profiling_enabled();
        if status == CL_QUEUED as cl_int {
            if profiling_enabled {
                // we already hold the lock, so we can't call set_time on the event
                lock.time_submit = queue.device.screen().get_timestamp();
            }
            let mut query_start = None;
            let mut query_end = None;
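            // when profiling, the work gets bracketed by two GPU timestamp queries which
            // are read back as the start and end times below.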
            status = lock.work.take().map_or(
                // if there is no work we can go to CL_SUBMITTED right away
                CL_SUBMITTED as cl_int,
                |w| {
                    if profiling_enabled {
                        query_start =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }

                    let res = w(queue, ctx).err().map_or(
                        // stay at CL_SUBMITTED if the work ran without an error
                        CL_SUBMITTED as cl_int,
                        // otherwise propagate the error code
                        |e| e,
                    );
                    if profiling_enabled {
                        query_end =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }
                    res
                },
            );

            if profiling_enabled {
                lock.time_start = query_start.unwrap().read_blocked();
                lock.time_end = query_end.unwrap().read_blocked();
            }
            self.set_status(lock, status);
        }
        status
    }

    fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) {
        if self.status() <= CL_SUBMITTED as cl_int {
            return;
        }

        // only scan the dependencies if we haven't seen this event yet
        if result.insert(self) {
            for e in &self.deps {
                e.deep_unflushed_deps_impl(result);
            }
        }
    }

    /// Does a deep search and returns the set of all not-yet-flushed dependencies, including
    /// the `events` themselves.
    pub fn deep_unflushed_deps(events: &[Arc<Event>]) -> HashSet<&Event> {
        let mut result = HashSet::new();

        for e in events {
            e.deep_unflushed_deps_impl(&mut result);
        }

        result
    }

    /// Does a deep search and returns the set of all queues with not-yet-flushed events.
    pub fn deep_unflushed_queues(events: &[Arc<Event>]) -> HashSet<Arc<Queue>> {
        Event::deep_unflushed_deps(events)
            .iter()
            .filter_map(|e| e.queue.clone())
            .collect()
    }
}

impl Drop for Event {
    // implement Drop in order to prevent stack overflows on long dependency chains.
    //
    // This exploits the fact that `Arc::into_inner` only succeeds when there is exactly one
    // strong reference, so we turn a recursive drop chain into an iterative drop list for
    // events that have no other references.
    fn drop(&mut self) {
        if self.deps.is_empty() {
            return;
        }

        let mut deps_list = vec![mem::take(&mut self.deps)];
        while let Some(deps) = deps_list.pop() {
            for dep in deps {
                if let Some(mut dep) = Arc::into_inner(dep) {
                    deps_list.push(mem::take(&mut dep.deps));
                }
            }
        }
    }
}

// TODO worker thread per device
// Condvar to wait on new events to work on
// notify condvar when flushing queue events to worker
// attach fence to flushed events on context->flush
// store "newest" event for in-order queues per queue
333 // reordering/graph building done in worker
334