use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust_gen::*;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::cmp;
use std::mem;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;

/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
pub struct QueueContext {
    // need to use ManuallyDrop so we can recycle the context without cloning
    ctx: ManuallyDrop<PipeContext>,
    pub dev: &'static Device,
    use_stream: bool,
}

impl QueueContext {
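    /// Creates a context for `device`, failing with `CL_OUT_OF_HOST_MEMORY` if the driver can't
    /// allocate one.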
    fn new_for(device: &'static Device) -> CLResult<Self> {
        let ctx = device.create_context().ok_or(CL_OUT_OF_HOST_MEMORY)?;

        Ok(Self {
            ctx: ManuallyDrop::new(ctx),
            dev: device,
            use_stream: device.prefers_real_buffer_in_cb0(),
        })
    }

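    /// Binds `data` as constant buffer 0. Devices preferring a real buffer in cb0 get a streaming
    /// upload, which returns `CL_OUT_OF_RESOURCES` on failure; empty `data` is a no-op.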
    pub fn update_cb0(&self, data: &[u8]) -> CLResult<()> {
        // only update if we actually bind data
        if !data.is_empty() {
            if self.use_stream {
                if !self.ctx.set_constant_buffer_stream(0, data) {
                    return Err(CL_OUT_OF_RESOURCES);
                }
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
        Ok(())
    }
}

// This should go away once we have moved all state tracking into QueueContext
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

impl Drop for QueueContext {
    fn drop(&mut self) {
        let ctx = unsafe { ManuallyDrop::take(&mut self.ctx) };
        ctx.set_constant_buffer(0, &[]);
        self.dev.recycle_context(ctx);
    }
}

struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was stabilized in 1.72; until then, put it into our Mutex.
    // see https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Properties<cl_queue_properties>,
    state: Mutex<QueueState>,
    thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

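/// Flushes the pipe context and waits for it, then drains `evs`: if the GPU context was reset
/// while executing, all events are put into the `CL_OUT_OF_RESOURCES` error state, otherwise they
/// are signaled. Returns the resulting status.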
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) -> cl_int {
    if !evs.is_empty() {
        pipe.flush().wait();
        if pipe.device_reset_status() != pipe_reset_status::PIPE_NO_RESET {
            // if the context was reset while executing, simply put all events into an error state.
            evs.drain(..)
                .for_each(|e| e.set_user_status(CL_OUT_OF_RESOURCES));
            return CL_OUT_OF_RESOURCES;
        } else {
            evs.drain(..).for_each(|e| e.signal());
        }
    }

    CL_SUCCESS as cl_int
}

impl Queue {
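    /// Creates a new command queue and spawns its worker thread. Pending events handed over via
    /// [Self::flush] are sent to that thread through a channel and executed there in submission
    /// order.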
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Properties<cl_queue_properties>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure. Any other failure reason
        // should be detected earlier (e.g. by checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context,
            device,
            props,
            props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || {
                    // Track the error of all executed events. This is only needed for in-order
                    // queues, so for out-of-order queues we'll need to update this.
                    // Also, the OpenCL specification gives us enough freedom to do whatever we want
                    // in case of any event running into an error while executing:
                    //
                    //   Unsuccessful completion results in abnormal termination of the command
                    //   which is indicated by setting the event status to a negative value. In this
                    //   case, the command-queue associated with the abnormally terminated command
                    //   and all other command-queues in the same context may no longer be available
                    //   and their behavior is implementation-defined.
                    //
                    // TODO: use pipe_context::set_device_reset_callback to get notified about gone
                    //       GPU contexts
                    let mut last_err = CL_SUCCESS as cl_int;
                    loop {
                        let r = rx_t.recv();
                        if r.is_err() {
                            break;
                        }

                        let new_events = r.unwrap();
                        let mut flushed = Vec::new();

                        for e in new_events {
                            // If we hit any deps from another queue, flush so we don't risk a
                            // deadlock.
                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
                                let dep_err = flush_events(&mut flushed, &ctx);
                                last_err = cmp::min(last_err, dep_err);
                            }

                            // check if any dependency has an error
                            for dep in &e.deps {
                                // We have to wait on user events or events from other queues.
                                let dep_err = if dep.is_user() || dep.queue != e.queue {
                                    dep.wait()
                                } else {
                                    dep.status()
                                };

                                last_err = cmp::min(last_err, dep_err);
                            }

                            if last_err < 0 {
                                // If a dependency failed, fail this event as well.
                                e.set_user_status(last_err);
                                continue;
                            }

                            // If there is an execution error, don't bother signaling it as the
                            // context might be in a broken state. How queues behave after any event
                            // hits an error is entirely implementation-defined.
                            last_err = e.call(&ctx);
                            if last_err < 0 {
                                continue;
                            }

                            if e.is_user() {
                                // On each user event we flush our events as the application might
                                // wait on them before signaling user events.
                                last_err = flush_events(&mut flushed, &ctx);

                                if last_err >= 0 {
                                    // Wait on user events as they are synchronization points in the
                                    // application's control.
                                    e.wait();
                                }
                            } else if Platform::dbg().sync_every_event {
                                flushed.push(e);
                                last_err = flush_events(&mut flushed, &ctx);
                            } else {
                                flushed.push(e);
                            }
                        }

                        let flush_err = flush_events(&mut flushed, &ctx);
                        last_err = cmp::min(last_err, flush_err);
                    }
                })
                .unwrap(),
        }))
    }

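    /// Adds `e` to the list of pending events. The event is not handed over to the worker thread
    /// until [Self::flush] is called.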
    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }

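    /// Submits all pending events to the worker thread and implicitly flushes every other queue
    /// they depend on. If `wait` is true, also blocks until the last submitted event has completed.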
    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update last if and only if we get new events; this prevents breaking application code
        // doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never ever error, but if it does, return an error
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly and this _has_ to happen after taking the
        // pending events, otherwise we'll risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here as the queue will process it in order.
            // It's not a problem if the weak ref is invalid as that means the work is already done
            // and waiting isn't necessary anymore.
            let err = last.upgrade().map(|e| e.wait()).unwrap_or_default();
            if err < 0 {
                return Err(err);
            }
        }
        Ok(())
    }

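    /// Returns true once the worker thread has exited and the queue can no longer process events.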
    pub fn is_dead(&self) -> bool {
        self.thrd.is_finished()
    }

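    /// Returns true if the queue was created with `CL_QUEUE_PROFILING_ENABLE` set.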
    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}

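// Illustrative usage sketch (not part of the implementation): a caller in the API layer would
// roughly do the following, assuming `event` is an `Arc<Event>` created elsewhere:
//
//     let q = Queue::new(context, device, props, props_v2)?;
//     q.queue(event);   // record the event as pending
//     q.flush(true)?;   // hand pending events to the worker thread and wait for completion
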
impl Drop for Queue {
    fn drop(&mut self) {
        // When deleting the application-side object, we have to flush.
        // From the OpenCL spec:
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued OpenCL
        //   commands in command_queue.
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}