use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::query::*;
use mesa_rust_gen::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

// we assert that these are a contiguous range of numbers so we won't have to use HashMaps
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);

pub type EventSig = Box<dyn FnOnce(&Arc<Queue>, &QueueContext) -> CLResult<()> + Send + Sync>;

pub enum EventTimes {
    Queued = CL_PROFILING_COMMAND_QUEUED as isize,
    Submit = CL_PROFILING_COMMAND_SUBMIT as isize,
    Start = CL_PROFILING_COMMAND_START as isize,
    End = CL_PROFILING_COMMAND_END as isize,
}

#[derive(Default)]
struct EventMutState {
    status: cl_int,
    cbs: [Vec<EventCB>; 3],
    work: Option<EventSig>,
    time_queued: cl_ulong,
    time_submit: cl_ulong,
    time_start: cl_ulong,
    time_end: cl_ulong,
}

pub struct Event {
    pub base: CLObjectBase<CL_INVALID_EVENT>,
    pub context: Arc<Context>,
    pub queue: Option<Arc<Queue>>,
    pub cmd_type: cl_command_type,
    pub deps: Vec<Arc<Event>>,
    state: Mutex<EventMutState>,
    cv: Condvar,
}

impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);

impl Event {
    pub fn new(
        queue: &Arc<Queue>,
        cmd_type: cl_command_type,
        deps: Vec<Arc<Event>>,
        work: EventSig,
    ) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: queue.context.clone(),
            queue: Some(queue.clone()),
            cmd_type,
            deps,
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                work: Some(work),
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }
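    /// Creates a user event (the backing object for `clCreateUserEvent`). Per
    /// the spec, user events start out in the `CL_SUBMITTED` state and are
    /// driven entirely by the application.
    ///
    /// Rough usage sketch (the `context` binding is hypothetical; not a doc-test):
    ///
    /// ```ignore
    /// let ev = Event::new_user(context);
    /// // ... hand `ev` out as a dependency of other commands ...
    /// ev.set_user_status(CL_COMPLETE as cl_int); // unblocks dependent work
    /// ```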
    pub fn new_user(context: Arc<Context>) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context,
            queue: None,
            cmd_type: CL_COMMAND_USER,
            deps: Vec::new(),
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    fn state(&self) -> MutexGuard<EventMutState> {
        self.state.lock().unwrap()
    }

    pub fn status(&self) -> cl_int {
        self.state().status
    }

    fn set_status(&self, lock: &mut MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        // fire any callbacks registered for the state we just reached
        if [CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&(new as u32)) {
            if let Some(cbs) = lock.cbs.get_mut(new as usize) {
                cbs.drain(..).for_each(|cb| cb.call(self, new));
            }
        }
    }

    pub fn set_user_status(&self, status: cl_int) {
        let mut lock = self.state();
        self.set_status(&mut lock, status);
    }

    pub fn is_error(&self) -> bool {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

    pub fn set_time(&self, which: EventTimes, value: cl_ulong) {
        let mut lock = self.state();
        match which {
            EventTimes::Queued => lock.time_queued = value,
            EventTimes::Submit => lock.time_submit = value,
            EventTimes::Start => lock.time_start = value,
            EventTimes::End => lock.time_end = value,
        }
    }

    pub fn get_time(&self, which: EventTimes) -> cl_ulong {
        let lock = self.state();

        match which {
            EventTimes::Queued => lock.time_queued,
            EventTimes::Submit => lock.time_submit,
            EventTimes::Start => lock.time_start,
            EventTimes::End => lock.time_end,
        }
    }

    pub fn add_cb(&self, state: cl_int, cb: EventCB) {
        let mut lock = self.state();
        let status = lock.status;

        // call the callback directly if the requested state was already reached
        // (status values count down from CL_QUEUED to CL_COMPLETE)
        if state >= status {
            drop(lock);
            cb.call(self, state);
        } else {
            lock.cbs.get_mut(state as usize).unwrap().push(cb);
        }
    }

    pub(super) fn signal(&self) {
        let mut lock = self.state();

        self.set_status(&mut lock, CL_RUNNING as cl_int);
        self.set_status(&mut lock, CL_COMPLETE as cl_int);
    }
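    /// Blocks until the event has left the `CL_QUEUED`/`CL_SUBMITTED`/`CL_RUNNING`
    /// states, i.e. until it is `CL_COMPLETE` or carries a negative error code,
    /// and returns that final status. The condvar wait times out every second so
    /// the status is re-checked periodically.
    ///
    /// Rough caller-side sketch (the `event` binding is hypothetical; not a doc-test):
    ///
    /// ```ignore
    /// let status = event.wait();
    /// if status < 0 {
    ///     // the enqueued work failed with this CL error code
    /// }
    /// ```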
    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        lock.status
    }

    // We always assume that the work here simply submits stuff to the hardware, even if it's just
    // doing sw emulation or nothing at all.
    // If anything requests waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &QueueContext) {
        let mut lock = self.state();
        let status = lock.status;
        let queue = self.queue.as_ref().unwrap();
        let profiling_enabled = queue.is_profiling_enabled();
        if status == CL_QUEUED as cl_int {
            if profiling_enabled {
                // We already hold the lock, so we can't call set_time on the event.
                lock.time_submit = queue.device.screen().get_timestamp();
            }
            let mut query_start = None;
            let mut query_end = None;
            let new = lock.work.take().map_or(
                // if there is no work
                CL_SUBMITTED as cl_int,
                |w| {
                    if profiling_enabled {
                        query_start =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }

                    let res = w(queue, ctx).err().map_or(
                        // on success the work is now submitted
                        CL_SUBMITTED as cl_int,
                        // on failure the (negative) error code becomes the status
                        |e| e,
                    );
                    if profiling_enabled {
                        query_end =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }
                    res
                },
            );

            if profiling_enabled {
                lock.time_start = query_start.unwrap().read_blocked();
                lock.time_end = query_end.unwrap().read_blocked();
            }
            self.set_status(&mut lock, new);
        }
    }

    fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) {
        // events at CL_SUBMITTED or below were already flushed out
        if self.status() <= CL_SUBMITTED as i32 {
            return;
        }

        // only scan the dependencies if it's a new one
        if result.insert(self) {
            for e in &self.deps {
                e.deep_unflushed_deps_impl(result);
            }
        }
    }

    /// does a deep search and returns a list of all dependencies including `events` which haven't
    /// been flushed out yet
    pub fn deep_unflushed_deps(events: &[Arc<Event>]) -> HashSet<&Event> {
        let mut result = HashSet::new();

        for e in events {
            e.deep_unflushed_deps_impl(&mut result);
        }

        result
    }

    /// does a deep search and returns a list of all queues which haven't been flushed yet
    pub fn deep_unflushed_queues(events: &[Arc<Event>]) -> HashSet<Arc<Queue>> {
        Event::deep_unflushed_deps(events)
            .iter()
            .filter_map(|e| e.queue.clone())
            .collect()
    }
}

// TODO worker thread per device
// Condvar to wait on new events to work on
// notify condvar when flushing queue events to worker
// attach fence to flushed events on context->flush
// store "newest" event for in-order queues per queue
// reordering/graph building done in worker
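// Illustrative only: a typical caller of the deep_unflushed_* helpers is an
// event-wait path that has to flush every involved queue before blocking.
// Rough sketch (assumes a `Queue::flush(wait: bool)` helper; hypothetical code):
//
//     for q in Event::deep_unflushed_queues(&events) {
//         q.flush(false)?;
//     }
//     for e in &events {
//         e.wait();
//     }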