use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust::pipe::resource::PipeResource;
use mesa_rust::pipe::screen::ResourceType;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::mem;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;

/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
pub struct QueueContext {
    ctx: PipeContext,
    cb0: Option<PipeResource>,
}

impl QueueContext {
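    /// Creates a new [QueueContext] for `device`.
    ///
    /// Allocates a fresh [PipeContext] and, if the device prefers a real buffer backing constant
    /// buffer 0, also a cb0 resource sized to the device's maximum parameter size, which gets
    /// bound to the context right away.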
    fn new_for(device: &Device) -> CLResult<Self> {
        let ctx = device
            .screen()
            .create_context()
            .ok_or(CL_OUT_OF_HOST_MEMORY)?;
        let size = device.param_max_size() as u32;
        let cb0 = if device.prefers_real_buffer_in_cb0() {
            device
                .screen()
                .resource_create_buffer(size, ResourceType::Cb0, 0)
        } else {
            None
        };

        if let Some(cb0) = &cb0 {
            ctx.bind_constant_buffer(0, cb0);
        }

        Ok(Self { ctx, cb0 })
    }

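    /// Uploads `data` into constant buffer 0.
    ///
    /// If a real cb0 resource exists, the data is written into it; otherwise the data is set on
    /// the context directly. Empty `data` is ignored.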
    pub fn update_cb0(&self, data: &[u8]) {
        // Only update if we actually have data to bind.
        if !data.is_empty() {
            // If we have a real buffer, update that; otherwise just set the data directly.
            if let Some(cb) = &self.cb0 {
                debug_assert!(data.len() <= cb.width() as usize);
                self.ctx
                    .buffer_subdata(cb, 0, data.as_ptr().cast(), data.len() as u32);
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
    }
}

// This should go away once we've moved all state tracking into QueueContext.
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

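// On teardown, clear constant buffer 0 again so the context no longer references cb0.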
impl Drop for QueueContext {
    fn drop(&mut self) {
        self.ctx.set_constant_buffer(0, &[])
    }
}

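/// Mutable state of a [Queue], protected by its [Mutex].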
struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was only stabilized in 1.72; until then, keep it inside our Mutex.
    // See https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

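/// An OpenCL command queue.
///
/// Work is executed on a dedicated worker thread: [`Queue::queue`] only records events, and
/// [`Queue::flush`] hands the pending batch over to the thread, optionally blocking until the
/// last event in the batch has been processed.
///
/// Illustrative sketch of the intended flow (not a doctest; event creation is elided):
///
/// ```ignore
/// let queue = Queue::new(context, device, props, None)?;
/// queue.queue(event);   // record work
/// queue.flush(false)?;  // submit without waiting (clFlush)
/// queue.flush(true)?;   // submit and wait (clFinish)
/// ```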
pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    state: Mutex<QueueState>,
    _thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

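/// Flushes `pipe`, waits for the submitted work to finish and signals all events in `evs`.
///
/// Does nothing if `evs` is empty.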
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

impl Queue {
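    /// Creates a new queue for `device` and spawns its worker thread.
    ///
    /// The worker thread receives batches of events over a channel, waits on user events and on
    /// dependencies from other queues, executes each event on the queue's [QueueContext] and
    /// signals events once their work has been flushed to the hardware.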
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Option<Properties<cl_queue_properties>>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure; any other failure
        // reason should be detected earlier (e.g. when checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context,
            device,
            props,
            props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            _thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || loop {
                    let r = rx_t.recv();
                    if r.is_err() {
                        break;
                    }

                    let new_events = r.unwrap();
                    let mut flushed = Vec::new();

                    for e in new_events {
                        // If we hit any deps from another queue, flush so we don't risk a
                        // deadlock.
                        if e.deps.iter().any(|ev| ev.queue != e.queue) {
                            flush_events(&mut flushed, &ctx);
                        }

                        // We have to wait on user events or events from other queues.
                        let err = e
                            .deps
                            .iter()
                            .filter(|ev| ev.is_user() || ev.queue != e.queue)
                            .map(|e| e.wait())
                            .find(|s| *s < 0);

                        if let Some(err) = err {
                            // If a dependency failed, fail this event as well.
                            e.set_user_status(err);
                            continue;
                        }

                        e.call(&ctx);

                        if e.is_user() {
                            // On each user event we flush our events, as the application might
                            // wait on them before signaling the user event.
                            flush_events(&mut flushed, &ctx);

                            // Wait on user events as they are synchronization points under the
                            // application's control.
                            e.wait();
                        } else if Platform::dbg().sync_every_event {
                            flushed.push(e);
                            flush_events(&mut flushed, &ctx);
                        } else {
                            flushed.push(e);
                        }
                    }

                    flush_events(&mut flushed, &ctx);
                })
                .unwrap(),
        }))
    }

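    /// Records `e` on this queue without submitting it.
    ///
    /// If profiling is enabled, the `Queued` timestamp is taken here. The event is only handed
    /// to the worker thread on the next [`Queue::flush`].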
    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }

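    /// Submits all pending events to the worker thread.
    ///
    /// Queues that this batch has unflushed dependencies on are flushed implicitly first. If
    /// `wait` is true, this also blocks until the last submitted event has been processed
    /// (clFinish semantics); otherwise it returns right after submission (clFlush semantics).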
    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update `last` if and only if we get new events; this prevents breaking application
        // code doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never ever error, but if it does, return an error.
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly, and this _has_ to happen after taking
        // the pending events, otherwise we'll risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here, as the queue will process it in
            // order. It's not a problem if the weak ref is invalid, as that means the work is
            // already done and waiting isn't necessary anymore.
            last.upgrade().map(|e| e.wait());
        }
        Ok(())
    }

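    /// Returns true if this queue was created with CL_QUEUE_PROFILING_ENABLE.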
    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        // When deleting the application-side object, we have to flush.
        // From the OpenCL spec:
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued
        //   OpenCL commands in command_queue.
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}