use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust::pipe::resource::PipeResource;
use mesa_rust::pipe::screen::ResourceType;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::mem;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;

/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
pub struct QueueContext {
    ctx: PipeContext,
    cb0: Option<PipeResource>,
}

impl QueueContext {
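    /// Creates a [QueueContext] for `device`. If the driver prefers a real buffer backing
    /// constant buffer slot 0 (cb0), a buffer of `param_max_size` bytes is allocated and bound
    /// up front, so later calls only need to upload new data into it.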
    fn new_for(device: &Device) -> CLResult<Self> {
        let ctx = device
            .screen()
            .create_context()
            .ok_or(CL_OUT_OF_HOST_MEMORY)?;
        let size = device.param_max_size() as u32;
        let cb0 = if device.prefers_real_buffer_in_cb0() {
            device
                .screen()
                .resource_create_buffer(size, ResourceType::Cb0, 0)
        } else {
            None
        };

        if let Some(cb0) = &cb0 {
            ctx.bind_constant_buffer(0, cb0);
        }

        Ok(Self { ctx, cb0 })
    }
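
    /// Uploads `data` into constant buffer slot 0, either by writing into the pre-allocated cb0
    /// buffer or by setting the data on the context directly.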
    pub fn update_cb0(&self, data: &[u8]) {
        // Only update if there is actually data to bind.
        if !data.is_empty() {
            // If we have a real buffer, update it; otherwise set the data directly.
            if let Some(cb) = &self.cb0 {
                debug_assert!(data.len() <= cb.width() as usize);
                self.ctx
                    .buffer_subdata(cb, 0, data.as_ptr().cast(), data.len() as u32);
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
    }
}

// This should go away once all state tracking has moved into QueueContext.
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}
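
// On drop, unbind cb0 again; presumably so the context doesn't keep a dangling reference to a
// buffer owned by this wrapper.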
impl Drop for QueueContext {
    fn drop(&mut self) {
        self.ctx.set_constant_buffer(0, &[])
    }
}
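
/// Inner, mutex-protected state of a [Queue].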
struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was only stabilized in 1.72; until then, keep it inside our Mutex.
    // See https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}
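
/// An OpenCL command queue. Enqueued events are collected in `state` and executed asynchronously
/// on the queue's worker thread.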
pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    state: Mutex<QueueState>,
    _thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);
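
/// Flushes `pipe`, waits for the GPU work to finish and then signals (and drains) all events in
/// `evs`.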
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

impl Queue {
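    /// Creates a new command queue together with its worker thread.
    ///
    /// The worker thread receives batches of events over an mpsc channel, waits on user event
    /// and cross-queue dependencies, executes each event on the queue's [QueueContext] and
    /// signals completion after flushing.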
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Option<Properties<cl_queue_properties>>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure. Any other failure reason
        // should be detected earlier (e.g. when checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context,
            device,
            props,
            props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            _thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || loop {
                    let r = rx_t.recv();
                    if r.is_err() {
                        break;
                    }

                    let new_events = r.unwrap();
                    let mut flushed = Vec::new();

                    for e in new_events {
                        // If we hit any deps from another queue, flush so we don't risk a
                        // deadlock.
                        if e.deps.iter().any(|ev| ev.queue != e.queue) {
                            flush_events(&mut flushed, &ctx);
                        }

                        // We have to wait on user events or events from other queues.
                        let err = e
                            .deps
                            .iter()
                            .filter(|ev| ev.is_user() || ev.queue != e.queue)
                            .map(|e| e.wait())
                            .find(|s| *s < 0);

                        if let Some(err) = err {
                            // If a dependency failed, fail this event as well.
                            e.set_user_status(err);
                            continue;
                        }

                        e.call(&ctx);

                        if e.is_user() {
                            // On each user event we flush our events as the application might
                            // wait on them before signaling user events.
                            flush_events(&mut flushed, &ctx);

                            // Wait on user events as they are synchronization points in the
                            // application's control.
                            e.wait();
                        } else if Platform::dbg().sync_every_event {
                            flushed.push(e);
                            flush_events(&mut flushed, &ctx);
                        } else {
                            flushed.push(e);
                        }
                    }

                    flush_events(&mut flushed, &ctx);
                })
                .unwrap(),
        }))
    }
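
    /// Queues `e` as pending work. Events stay pending until [Queue::flush] hands them over to
    /// the worker thread. If profiling is enabled, the `Queued` timestamp is recorded here.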
    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }
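
    /// Submits all pending events to the worker thread, implicitly flushing other queues that
    /// events in this queue depend on. If `wait` is true, also blocks until the last submitted
    /// event has completed.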
    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update `last` if and only if we get new events; this prevents breaking application code
        // doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never fail, but if it does, return an error.
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly, and this _has_ to happen after taking the
        // pending events, otherwise we risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here as the queue will process it in
            // order. It's not a problem if the weak ref is invalid, as that means the work is
            // already done and waiting isn't necessary anymore.
            last.upgrade().map(|e| e.wait());
        }
        Ok(())
    }
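
    /// Returns true if `CL_QUEUE_PROFILING_ENABLE` was set on this queue.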
    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        // When the application-side object is deleted, we have to flush. From the OpenCL spec:
        //
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued
        //   OpenCL commands in command_queue.
        //
        // TODO: maybe we have to do this on every release?
        let _ = self.flush(true);
    }
}