• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // SPDX-License-Identifier: Apache-2.0
2 // ----------------------------------------------------------------------------
3 // Copyright 2011-2024 Arm Limited
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
6 // use this file except in compliance with the License. You may obtain a copy
7 // of the License at:
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 // License for the specific language governing permissions and limitations
15 // under the License.
16 // ----------------------------------------------------------------------------
17 
18 /**
19  * @brief Functions and data declarations for the outer context.
20  *
21  * The outer context includes thread-pool management, which is slower to
22  * compile due to increased use of C++ stdlib. The inner context used in the
23  * majority of the codec library does not include this.
24  */
25 
26 #ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
27 #define ASTCENC_INTERNAL_ENTRY_INCLUDED
28 
29 #include <atomic>
30 #include <condition_variable>
31 #include <functional>
32 #include <mutex>
33 
34 #include "astcenc_internal.h"
35 
36 /* ============================================================================
37   Parallel execution control
38 ============================================================================ */
39 
40 /**
41  * @brief A simple counter-based manager for parallel task execution.
42  *
43  * The task processing execution consists of:
44  *
45  *     * A single-threaded init stage.
46  *     * A multi-threaded processing stage.
47  *     * A condition variable so threads can wait for processing completion.
48  *
49  * The init stage will be executed by the first thread to arrive in the critical section, there is
50  * no main thread in the thread pool.
51  *
52  * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
53  * basis. Threads may therefore each execute different numbers of tasks, depending on their
54  * processing complexity. The task queue and the task tickets are just counters; the caller must map
55  * these integers to an actual processing partition in a specific problem domain.
56  *
57  * The exit wait condition is needed to ensure processing has finished before a worker thread can
58  * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
59  * because there are no new tasks to assign to it while other worker threads are still processing.
60  * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
61  *
62  * The basic usage model:
63  *
64  *     // --------- From single-threaded code ---------
65  *
66  *     // Reset the tracker state
67  *     manager->reset()
68  *
69  *     // --------- From multi-threaded code ---------
70  *
71  *     // Run the stage init; only first thread actually runs the lambda
72  *     manager->init(<lambda>)
73  *
74  *     do
75  *     {
76  *         // Request a task assignment
77  *         uint task_count;
78  *         uint base_index = manager->get_tasks(<granule>, task_count);
79  *
80  *         // Process any tasks we were given (task_count <= granule size)
81  *         if (task_count)
82  *         {
83  *             // Run the user task processing code for N tasks here
84  *             ...
85  *
86  *             // Flag these tasks as complete
87  *             manager->complete_tasks(task_count);
88  *         }
89  *     } while (task_count);
90  *
91  *     // Wait for all threads to complete tasks before progressing
92  *     manager->wait()
93  *
94  *     // Run the stage term; only first thread actually runs the lambda
95  *     manager->term(<lambda>)
96  */
97 class ParallelManager
98 {
99 private:
100 	/** @brief Lock used for critical section and condition synchronization. */
101 	std::mutex m_lock;
102 
103 	/** @brief True if the stage init() step has been executed. */
104 	bool m_init_done;
105 
106 	/** @brief True if the stage term() step has been executed. */
107 	bool m_term_done;
108 
109 	/** @brief Condition variable for tracking stage processing completion. */
110 	std::condition_variable m_complete;
111 
112 	/** @brief Number of tasks started, but not necessarily finished. */
113 	std::atomic<unsigned int> m_start_count;
114 
115 	/** @brief Number of tasks finished. */
116 	unsigned int m_done_count;
117 
118 	/** @brief Number of tasks that need to be processed. */
119 	unsigned int m_task_count;
120 
121 	/** @brief Progress callback (optional). */
122 	astcenc_progress_callback m_callback;
123 
124 	/** @brief Lock used for callback synchronization. */
125 	std::mutex m_callback_lock;
126 
127 	/** @brief Minimum progress before making a callback. */
128 	float m_callback_min_diff;
129 
130 	/** @brief Last progress callback value. */
131 	float m_callback_last_value;
132 
133 public:
134 	/** @brief Create a new ParallelManager. */
ParallelManager()135 	ParallelManager()
136 	{
137 		reset();
138 	}
139 
140 	/**
141 	 * @brief Reset the tracker for a new processing batch.
142 	 *
143 	 * This must be called from single-threaded code before starting the multi-threaded processing
144 	 * operations.
145 	 */
reset()146 	void reset()
147 	{
148 		m_init_done = false;
149 		m_term_done = false;
150 		m_start_count = 0;
151 		m_done_count = 0;
152 		m_task_count = 0;
153 		m_callback_last_value = 0.0f;
154 		m_callback_min_diff = 1.0f;
155 	}
156 
157 	/**
158 	 * @brief Trigger the pipeline stage init step.
159 	 *
160 	 * This can be called from multi-threaded code. The first thread to hit this will process the
161 	 * initialization. Other threads will block and wait for it to complete.
162 	 *
163 	 * @param init_func   Callable which executes the stage initialization. It must return the
164 	 *                    total number of tasks in the stage.
165 	 */
init(std::function<unsigned int (void)> init_func)166 	void init(std::function<unsigned int(void)> init_func)
167 	{
168 		std::lock_guard<std::mutex> lck(m_lock);
169 		if (!m_init_done)
170 		{
171 			m_task_count = init_func();
172 			m_init_done = true;
173 		}
174 	}
175 
176 	/**
177 	 * @brief Trigger the pipeline stage init step.
178 	 *
179 	 * This can be called from multi-threaded code. The first thread to hit this will process the
180 	 * initialization. Other threads will block and wait for it to complete.
181 	 *
182 	 * @param task_count   Total number of tasks needing processing.
183 	 * @param callback     Function pointer for progress status callbacks.
184 	 */
init(unsigned int task_count,astcenc_progress_callback callback)185 	void init(unsigned int task_count, astcenc_progress_callback callback)
186 	{
187 		std::lock_guard<std::mutex> lck(m_lock);
188 		if (!m_init_done)
189 		{
190 			m_callback = callback;
191 			m_task_count = task_count;
192 			m_init_done = true;
193 
194 			// Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
195 			float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
196 			m_callback_min_diff = astc::max(min_diff, 1.0f);
197 		}
198 	}
199 
200 	/**
201 	 * @brief Request a task assignment.
202 	 *
203 	 * Assign up to @c granule tasks to the caller for processing.
204 	 *
205 	 * @param      granule   Maximum number of tasks that can be assigned.
206 	 * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
207 	 *
208 	 * @return Task index of the first assigned task; assigned tasks increment from this.
209 	 */
get_task_assignment(unsigned int granule,unsigned int & count)210 	unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
211 	{
212 		unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
213 		if (base >= m_task_count)
214 		{
215 			count = 0;
216 			return 0;
217 		}
218 
219 		count = astc::min(m_task_count - base, granule);
220 		return base;
221 	}
222 
223 	/**
224 	 * @brief Complete a task assignment.
225 	 *
226 	 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
227 	 * completes the processing of the stage.
228 	 *
229 	 * @param count   The number of completed tasks.
230 	 */
complete_task_assignment(unsigned int count)231 	void complete_task_assignment(unsigned int count)
232 	{
233 		// Note: m_done_count cannot use an atomic without the mutex; this has a race between the
234 		// update here and the wait() for other threads
235 		unsigned int local_count;
236 		float local_last_value;
237 		{
238 			std::unique_lock<std::mutex> lck(m_lock);
239 			m_done_count += count;
240 			local_count = m_done_count;
241 			local_last_value = m_callback_last_value;
242 
243 			if (m_done_count == m_task_count)
244 			{
245 				// Ensure the progress bar hits 100%
246 				if (m_callback)
247 				{
248 					std::unique_lock<std::mutex> cblck(m_callback_lock);
249 					m_callback(100.0f);
250 					m_callback_last_value = 100.0f;
251 				}
252 
253 				lck.unlock();
254 				m_complete.notify_all();
255 			}
256 		}
257 
258 		// Process progress callback if we have one
259 		if (m_callback)
260 		{
261 			// Initial lockless test - have we progressed enough to emit?
262 			float num = static_cast<float>(local_count);
263 			float den = static_cast<float>(m_task_count);
264 			float this_value =  (num / den) * 100.0f;
265 			bool report_test = (this_value - local_last_value) > m_callback_min_diff;
266 
267 			// Recheck under lock, because another thread might report first
268 			if (report_test)
269 			{
270 				std::unique_lock<std::mutex> cblck(m_callback_lock);
271 				bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
272 				if (report_retest)
273 				{
274 					m_callback(this_value);
275 					m_callback_last_value = this_value;
276 				}
277 			}
278 		}
279 	}
280 
281 	/**
282 	 * @brief Wait for stage processing to complete.
283 	 */
wait()284 	void wait()
285 	{
286 		std::unique_lock<std::mutex> lck(m_lock);
287 		m_complete.wait(lck, [this]{ return m_done_count == m_task_count; });
288 	}
289 
290 	/**
291 	 * @brief Trigger the pipeline stage term step.
292 	 *
293 	 * This can be called from multi-threaded code. The first thread to hit this will process the
294 	 * work pool termination. Caller must have called @c wait() prior to calling this function to
295 	 * ensure that processing is complete.
296 	 *
297 	 * @param term_func   Callable which executes the stage termination.
298 	 */
term(std::function<void (void)> term_func)299 	void term(std::function<void(void)> term_func)
300 	{
301 		std::lock_guard<std::mutex> lck(m_lock);
302 		if (!m_term_done)
303 		{
304 			term_func();
305 			m_term_done = true;
306 		}
307 	}
308 };
309 
/**
 * @brief The astcenc compression context.
 *
 * This is the outer context exposed through the public API. It wraps the inner
 * @c astcenc_contexti state with the per-stage parallel task managers used to
 * coordinate worker threads.
 */
struct astcenc_context
{
	/** @brief The context internal state. */
	astcenc_contexti context;

#if !defined(ASTCENC_DECOMPRESS_ONLY)
	/** @brief The parallel manager for averages computation (compression-only builds). */
	ParallelManager manage_avg;

	/** @brief The parallel manager for compression (compression-only builds). */
	ParallelManager manage_compress;
#endif

	/** @brief The parallel manager for decompression. */
	ParallelManager manage_decompress;
};
329 
330 #endif
331