use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust_gen::*;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::cmp;
use std::mem;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;
/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
pub struct QueueContext {
    // need to use ManuallyDrop so we can recycle the context without cloning
    ctx: ManuallyDrop<PipeContext>,
    pub dev: &'static Device,
    use_stream: bool,
}

impl QueueContext {
    fn new_for(device: &'static Device) -> CLResult<Self> {
        let ctx = device.create_context().ok_or(CL_OUT_OF_HOST_MEMORY)?;

        Ok(Self {
            ctx: ManuallyDrop::new(ctx),
            dev: device,
            use_stream: device.prefers_real_buffer_in_cb0(),
        })
    }

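    /// Binds `data` as constant buffer 0, which carries the kernel input data. Depending on
    /// what the driver prefers, the data is either streamed into a real buffer or bound
    /// directly as a user constant buffer.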
    pub fn update_cb0(&self, data: &[u8]) -> CLResult<()> {
        // only update if we actually bind data
        if !data.is_empty() {
            if self.use_stream {
                if !self.ctx.set_constant_buffer_stream(0, data) {
                    return Err(CL_OUT_OF_RESOURCES);
                }
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
        Ok(())
    }
}

// This should go away once we have moved all state tracking into QueueContext
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

impl Drop for QueueContext {
    fn drop(&mut self) {
        let ctx = unsafe { ManuallyDrop::take(&mut self.ctx) };
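        // Unbind cb0 before handing the context back to the pool so the recycled context
        // doesn't keep a reference to stale kernel input data.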
        ctx.set_constant_buffer(0, &[]);
        self.dev.recycle_context(ctx);
    }
}

struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was only stabilized in 1.72; until then, keep it inside our Mutex.
    // see https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

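/// An OpenCL command queue. Enqueued events are collected in a pending list and only handed to
/// the worker thread on [`Queue::flush`], which then executes them in order.
///
/// A sketch of the typical clEnqueue*/clFlush/clFinish flow driving this type (names and error
/// handling simplified):
///
/// ```ignore
/// let queue = Queue::new(context, device, props, props_v2)?;
/// queue.queue(event);      // clEnqueue* adds a pending event
/// queue.flush(false)?;     // clFlush submits pending events to the worker thread
/// queue.flush(true)?;      // clFinish additionally waits for the last event to complete
/// ```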
pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Properties<cl_queue_properties>,
    state: Mutex<QueueState>,
    thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

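// Flushes the pipe context and signals all events in `evs`. If the GPU context was reset while
// executing, all events are put into an error state instead. Returns CL_SUCCESS or the error
// that was propagated to the events.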
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) -> cl_int {
    if !evs.is_empty() {
        pipe.flush().wait();
        if pipe.device_reset_status() != pipe_reset_status::PIPE_NO_RESET {
            // if the context was reset while executing, simply put all events into an error
            // state.
            evs.drain(..)
                .for_each(|e| e.set_user_status(CL_OUT_OF_RESOURCES));
            return CL_OUT_OF_RESOURCES;
        } else {
            evs.drain(..).for_each(|e| e.signal());
        }
    }

    CL_SUCCESS as cl_int
}

impl Queue {
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Properties<cl_queue_properties>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure. Any other failure
        // reason should be detected earlier (e.g. when checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context: context,
            device: device,
            props: props,
            props_v2: props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || {
                    // Track the error across all executed events. This is only needed for
                    // in-order queues, so for out-of-order queues we'll need to update this.
                    // Also, the OpenCL specification gives us enough freedom to do whatever we
                    // want in case any event runs into an error while executing:
                    //
                    //   Unsuccessful completion results in abnormal termination of the command
                    //   which is indicated by setting the event status to a negative value. In
                    //   this case, the command-queue associated with the abnormally terminated
                    //   command and all other command-queues in the same context may no longer
                    //   be available and their behavior is implementation-defined.
                    //
                    // TODO: use pipe_context::set_device_reset_callback to get notified about
                    // lost GPU contexts
                    let mut last_err = CL_SUCCESS as cl_int;
                    loop {
                        let r = rx_t.recv();
                        if r.is_err() {
                            break;
                        }

                        let new_events = r.unwrap();
                        let mut flushed = Vec::new();

                        for e in new_events {
                            // If we hit any deps from another queue, flush so we don't risk a
                            // deadlock.
                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
                                let dep_err = flush_events(&mut flushed, &ctx);
                                last_err = cmp::min(last_err, dep_err);
                            }

                            // check if any dependency has an error
                            for dep in &e.deps {
                                // We have to wait on user events or events from other queues.
                                let dep_err = if dep.is_user() || dep.queue != e.queue {
                                    dep.wait()
                                } else {
                                    dep.status()
                                };

                                last_err = cmp::min(last_err, dep_err);
                            }

                            if last_err < 0 {
                                // If a dependency failed, fail this event as well.
                                e.set_user_status(last_err);
                                continue;
                            }

                            // If there is an execution error, don't bother signaling it as the
                            // context might be in a broken state. How queues behave after any
                            // event hit an error is entirely implementation-defined.
                            last_err = e.call(&ctx);
                            if last_err < 0 {
                                continue;
                            }

                            if e.is_user() {
                                // On each user event we flush our events as the application
                                // might wait on them before signaling user events.
                                last_err = flush_events(&mut flushed, &ctx);

                                if last_err >= 0 {
                                    // Wait on user events as they are synchronization points
                                    // under the application's control.
                                    e.wait();
                                }
                            } else if Platform::dbg().sync_every_event {
                                flushed.push(e);
                                last_err = flush_events(&mut flushed, &ctx);
                            } else {
                                flushed.push(e);
                            }
                        }

                        let flush_err = flush_events(&mut flushed, &ctx);
                        last_err = cmp::min(last_err, flush_err);
                    }
                })
                .unwrap(),
        }))
    }

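    /// Adds an event to the queue's list of pending events. Events are not submitted to the
    /// worker thread until the next [`Queue::flush`].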
    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }

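    /// Submits all pending events to the worker thread and, if `wait` is true, blocks until the
    /// last submitted event has completed. Queues owning unflushed dependencies are flushed
    /// implicitly to avoid deadlocks.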
    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update `last` if and only if we got new events; this prevents breaking application
        // code doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never ever error, but if it does, return an error.
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly, and this _has_ to happen after taking
        // the pending events, otherwise we risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here as the queue will process it in
            // order. It's not a problem if the weak ref is invalid, as that means the work is
            // already done and waiting isn't necessary anymore.
            let err = last.upgrade().map(|e| e.wait()).unwrap_or_default();
            if err < 0 {
                return Err(err);
            }
        }
        Ok(())
    }

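    // The worker thread only terminates on a broken channel or a panic, so a finished thread
    // means the queue can no longer process events.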
    pub fn is_dead(&self) -> bool {
        self.thrd.is_finished()
    }

    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        // When deleting the application side object, we have to flush. From the OpenCL spec:
        //
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued
        //   OpenCL commands in command_queue.
        //
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}