use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::query::*;
use mesa_rust_gen::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

// We assert that these are a contiguous range of numbers so we don't have to use HashMaps.
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);

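/// The work payload of an event: a closure executed with the event's queue and
/// queue context, reporting failure through its `CLResult`.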
pub type EventSig = Box<dyn FnOnce(&Arc<Queue>, &QueueContext) -> CLResult<()> + Send + Sync>;

pub enum EventTimes {
    Queued = CL_PROFILING_COMMAND_QUEUED as isize,
    Submit = CL_PROFILING_COMMAND_SUBMIT as isize,
    Start = CL_PROFILING_COMMAND_START as isize,
    End = CL_PROFILING_COMMAND_END as isize,
}

#[derive(Default)]
struct EventMutState {
    status: cl_int,
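    // one list of callbacks per trigger state: CL_COMPLETE, CL_RUNNING and
    // CL_SUBMITTED, indexed by the status value (see the static_asserts above)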
    cbs: [Vec<EventCB>; 3],
    work: Option<EventSig>,
    time_queued: cl_ulong,
    time_submit: cl_ulong,
    time_start: cl_ulong,
    time_end: cl_ulong,
}

pub struct Event {
    pub base: CLObjectBase<CL_INVALID_EVENT>,
    pub context: Arc<Context>,
    pub queue: Option<Arc<Queue>>,
    pub cmd_type: cl_command_type,
    pub deps: Vec<Arc<Event>>,
    state: Mutex<EventMutState>,
    cv: Condvar,
}

impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);

impl Event {
    pub fn new(
        queue: &Arc<Queue>,
        cmd_type: cl_command_type,
        deps: Vec<Arc<Event>>,
        work: EventSig,
    ) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: queue.context.clone(),
            queue: Some(queue.clone()),
            cmd_type,
            deps,
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                work: Some(work),
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

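    /// Creates a user event (`CL_COMMAND_USER`). User events aren't bound to a
    /// queue and start out in the `CL_SUBMITTED` state.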
    pub fn new_user(context: Arc<Context>) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context,
            queue: None,
            cmd_type: CL_COMMAND_USER,
            deps: Vec::new(),
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    fn state(&self) -> MutexGuard<EventMutState> {
        self.state.lock().unwrap()
    }

    pub fn status(&self) -> cl_int {
        self.state().status
    }

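    /// Updates the status while the state lock is held. Waiters are woken up once
    /// the event completed or errored out, and callbacks registered for the new
    /// status are fired.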
    fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
            if let Some(cbs) = lock.cbs.get_mut(new as usize) {
                cbs.drain(..).for_each(|cb| cb.call(self, new));
            }
        }
    }

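    /// Sets the execution status of a user event, cf. `clSetUserEventStatus`.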
    pub fn set_user_status(&self, status: cl_int) {
        let mut lock = self.state();
        self.set_status(&mut lock, status);
    }

    pub fn is_error(&self) -> bool {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

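    /// Records a profiling timestamp for the given profiling counter.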
    pub fn set_time(&self, which: EventTimes, value: cl_ulong) {
        let mut lock = self.state();
        match which {
            EventTimes::Queued => lock.time_queued = value,
            EventTimes::Submit => lock.time_submit = value,
            EventTimes::Start => lock.time_start = value,
            EventTimes::End => lock.time_end = value,
        }
    }

    pub fn get_time(&self, which: EventTimes) -> cl_ulong {
        let lock = self.state();

        match which {
            EventTimes::Queued => lock.time_queued,
            EventTimes::Submit => lock.time_submit,
            EventTimes::Start => lock.time_start,
            EventTimes::End => lock.time_end,
        }
    }

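    /// Registers a callback to be fired once `state` is reached. If the event
    /// already reached (or passed) that state, the callback fires right away.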
    pub fn add_cb(&self, state: cl_int, cb: EventCB) {
        let mut lock = self.state();
        let status = lock.status;

        // call cb if the status was already reached
        if state >= status {
            drop(lock);
            cb.call(self, state);
        } else {
            lock.cbs.get_mut(state as usize).unwrap().push(cb);
        }
    }

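    /// Transitions the event through `CL_RUNNING` to `CL_COMPLETE`, firing
    /// callbacks for both states.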
    pub(super) fn signal(&self) {
        let mut lock = self.state();

        self.set_status(&mut lock, CL_RUNNING as cl_int);
        self.set_status(&mut lock, CL_COMPLETE as cl_int);
    }

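    /// Blocks until the event completes or errors out and returns the final
    /// status. The wait wakes up periodically to recheck the status.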
    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        lock.status
    }

    // We always assume that work here simply submits stuff to the hardware, even if it's just
    // doing sw emulation or nothing at all.
    // If anything requests waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &QueueContext) {
        let mut lock = self.state();
        let status = lock.status;
        let queue = self.queue.as_ref().unwrap();
        let profiling_enabled = queue.is_profiling_enabled();
        if status == CL_QUEUED as cl_int {
            if profiling_enabled {
                // we already hold the lock, so we can't call set_time on the event
                lock.time_submit = queue.device.screen().get_timestamp();
            }
            let mut query_start = None;
            let mut query_end = None;
            let new = lock.work.take().map_or(
                // if there is no work
                CL_SUBMITTED as cl_int,
                |w| {
                    if profiling_enabled {
                        query_start =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }

                    let res = w(queue, ctx).err().map_or(
                        // on success the event was submitted; otherwise the
                        // (negative) error code becomes the new status
                        CL_SUBMITTED as cl_int,
                        |e| e,
                    );
                    if profiling_enabled {
                        query_end =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }
                    res
                },
            );

            if profiling_enabled {
                lock.time_start = query_start.unwrap().read_blocked();
                lock.time_end = query_end.unwrap().read_blocked();
            }
            self.set_status(&mut lock, new);
        }
    }

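    /// Recursively collects all transitive dependencies which haven't been flushed
    /// out yet, i.e. events still in the `CL_QUEUED` state.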
    fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) {
        if self.status() <= CL_SUBMITTED as i32 {
            return;
        }

        // only scan dependencies if it's a new one
        if result.insert(self) {
            for e in &self.deps {
                e.deep_unflushed_deps_impl(result);
            }
        }
    }

    /// Does a deep search and returns a list of all dependencies, including `events`, which
    /// haven't been flushed out yet.
    pub fn deep_unflushed_deps(events: &[Arc<Event>]) -> HashSet<&Event> {
        let mut result = HashSet::new();

        for e in events {
            e.deep_unflushed_deps_impl(&mut result);
        }

        result
    }

    /// Does a deep search and returns a list of all queues which haven't been flushed yet.
    pub fn deep_unflushed_queues(events: &[Arc<Event>]) -> HashSet<Arc<Queue>> {
        Event::deep_unflushed_deps(events)
            .iter()
            .filter_map(|e| e.queue.clone())
            .collect()
    }
}

// TODO worker thread per device
// Condvar to wait on new events to work on
// notify condvar when flushing queue events to worker
// attach fence to flushed events on context->flush
// store "newest" event for in-order queues per queue
// reordering/graph building done in worker