1 // Copyright 2015 The Gemmlowp Authors. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
// multi_thread_gemm.h: Multi-threaded GEMM entry point.
// Reader's note: To understand this file, it is useful to first
// read and understand the much simpler single_thread_gemm.h.
18
19 #ifndef GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
20 #define GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
21
#include <atomic>  // NOLINT
#include <chrono>  // NOLINT
#include <cstdint>
#include <thread>  // NOLINT
#include <vector>

#include "single_thread_gemm.h"
28
29 namespace gemmlowp {
30
// Upper bound, in NOPs as counted by DoSomeNOPs(), on how long a waiting
// thread busy-waits before falling back to passive waiting (or sleeping).
//
// This value was empirically derived on an end-to-end application benchmark.
// That this number of cycles means we may be spinning for substantially longer
// than a scheduler timeslice is not necessarily surprising. The idea is to
// quickly pick up new work after having finished the previous workload. When
// the new work belongs to the same GEMM as the previous work, the time interval
// that we might be busy-waiting is very small, so for that purpose alone it
// would be more than enough to spin for 1 million cycles. That is all we would
// observe on a GEMM benchmark. However, in a real application, after having
// finished a GEMM, we might do unrelated work for a little while, then start on
// a new GEMM. Think of a neural network application performing inference,
// where many but not all layers are implemented by a GEMM. In such cases, our
// worker threads might be idle for longer periods of time before having work
// again. If we let them passively wait, on a mobile device, the CPU scheduler
// might aggressively clock down or even turn off the CPU cores that they were
// running on. That would result in a long delay the next time these need to be
// turned back on for the next GEMM. So we need to strike a balance that
// reflects typical time intervals between consecutive GEMM invocations, not
// just intra-GEMM considerations. Of course, we need to balance keeping CPUs
// spinning longer to resume work faster, versus passively waiting to conserve
// power.
constexpr int kMaxBusyWaitNOPs = 4 * 1000 * 1000;
52
53 // On X86 and ARM platforms we may use NOP instructions to know how long we
54 // are busy-waiting.
55
56 #if defined(GEMMLOWP_ALLOW_INLINE_ASM) && !defined(GEMMLOWP_NO_BUSYWAIT) && \
57 (defined(GEMMLOWP_ARM) || defined(GEMMLOWP_X86))
58
59 #define GEMMLOWP_NOP "nop\n"
60
61 #define GEMMLOWP_STRING_CONCAT_4(X) X X X X
62 #define GEMMLOWP_NOP4 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP)
63 #define GEMMLOWP_NOP16 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP4)
64 #define GEMMLOWP_NOP64 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP16)
65
DoSomeNOPs()66 inline int DoSomeNOPs() {
67 asm volatile(GEMMLOWP_NOP64);
68 return 64;
69 }
70
71 #undef GEMMLOWP_STRING_CONCAT_4
72 #undef GEMMLOWP_NOP64
73 #undef GEMMLOWP_NOP16
74 #undef GEMMLOWP_NOP4
75 #undef GEMMLOWP_NOP
76
77 #else // May not use asm NOP.
78
79 // If we can't use NOPs, let's use a non-inline function call as a basic
80 // thing that has some vaguely known, nonzero cost.
81 GEMMLOWP_NOINLINE
DoSomeNOPs()82 inline int DoSomeNOPs() {
83 // Pretend that calling an empty function takes as long as 16 NOPs...
84 return 16;
85 }
86 #endif
87
88 // Waits until *var != initial_value.
89 //
90 // Returns the new value of *var. The guarantee here is that
91 // the return value is different from initial_value, and that that
92 // new value has been taken by *var at some point during the
93 // execution of this function. There is no guarantee that this is
94 // still the value of *var when this function returns, since *var is
95 // not assumed to be guarded by any lock.
96 //
97 // First does some busy-waiting for a fixed number of no-op cycles,
98 // then falls back to passive waiting for the given condvar, guarded
99 // by the given mutex.
100 //
101 // The idea of doing some initial busy-waiting is to help get
102 // better and more consistent multithreading benefits for small GEMM sizes.
103 // Busy-waiting help ensuring that if we need to wake up soon after having
104 // started waiting, then we can wake up quickly (as opposed to, say,
105 // having to wait to be scheduled again by the OS). On the other hand,
106 // we must still eventually revert to passive waiting for longer waits
107 // (e.g. worker threads having finished a GEMM and waiting until the next GEMM)
108 // so as to avoid permanently spinning.
109 //
110 template <typename T>
WaitForVariableChange(std::atomic<T> * var,T initial_value,pthread_cond_t * cond,pthread_mutex_t * mutex)111 T WaitForVariableChange(std::atomic<T>* var, T initial_value,
112 pthread_cond_t* cond, pthread_mutex_t* mutex) {
113 // First, trivial case where the variable already changed value.
114 T new_value = var->load(std::memory_order_acquire);
115 if (new_value != initial_value) {
116 return new_value;
117 }
118 // Then try busy-waiting.
119 int nops = 0;
120 while (nops < kMaxBusyWaitNOPs) {
121 nops += DoSomeNOPs();
122 new_value = var->load(std::memory_order_acquire);
123 if (new_value != initial_value) {
124 return new_value;
125 }
126 }
127
128 // Finally, do real passive waiting.
129 pthread_mutex_lock(mutex);
130 new_value = var->load(std::memory_order_acquire);
131 while (new_value == initial_value) {
132 pthread_cond_wait(cond, mutex);
133 new_value = var->load(std::memory_order_acquire);
134 }
135 pthread_mutex_unlock(mutex);
136 return new_value;
137 }
138
139 // A BlockingCounter lets one thread to wait for N events to occur.
140 // This is how the master thread waits for all the worker threads
141 // to have finished working.
142 // The waiting is done using a naive spinlock waiting for the atomic
143 // count_ to hit the value 0. This is acceptable because in our usage
144 // pattern, BlockingCounter is used only to synchronize threads after
145 // short-lived tasks (performing parts of the same GEMM). It is not used
146 // for synchronizing longer waits (resuming work on the next GEMM).
147 class BlockingCounter {
148 public:
BlockingCounter()149 BlockingCounter() : count_(0) {}
150
151 // Sets/resets the counter; initial_count is the number of
152 // decrementing events that the Wait() call will be waiting for.
Reset(std::size_t initial_count)153 void Reset(std::size_t initial_count) {
154 std::size_t old_count_value = count_.load(std::memory_order_relaxed);
155 assert(old_count_value == 0);
156 (void)old_count_value;
157 count_.store(initial_count, std::memory_order_release);
158 }
159
160 // Decrements the counter; if the counter hits zero, signals
161 // the threads that were waiting for that, and returns true.
162 // Otherwise (if the decremented count is still nonzero),
163 // returns false.
DecrementCount()164 bool DecrementCount() {
165 std::size_t old_count_value =
166 count_.fetch_sub(1, std::memory_order_acq_rel);
167 assert(old_count_value > 0);
168 std::size_t count_value = old_count_value - 1;
169 return count_value == 0;
170 }
171
172 // Waits for the N other threads (N having been set by Reset())
173 // to hit the BlockingCounter.
Wait()174 void Wait() {
175 ScopedProfilingLabel label("BlockingCounter::Wait");
176 // Busy-wait until the count value is 0.
177 int nops = 0;
178 while (count_.load(std::memory_order_acquire)) {
179 nops += DoSomeNOPs();
180 if (nops > kMaxBusyWaitNOPs) {
181 nops = 0;
182 // If we are unlucky, the blocking thread (that calls DecrementCount)
183 // and the blocked thread (here, calling Wait) may be scheduled on
184 // the same CPU, so the busy-waiting of the present thread may prevent
185 // the blocking thread from resuming and unblocking.
186 // If we are even unluckier, the priorities of the present thread
187 // might be higher than that of the blocking thread, so just yielding
188 // wouldn't allow the blocking thread to resume. So we sleep for
189 // a substantial amount of time in that case. Notice that we only
190 // do so after having busy-waited for kMaxBusyWaitNOPs, which is
191 // typically several milliseconds, so sleeping 1 more millisecond
192 // isn't terrible at that point.
193 //
194 // How this is mitigated in practice:
195 // In practice, it is well known that the application should be
196 // conservative in choosing how many threads to tell gemmlowp to use,
197 // as it's hard to know how many CPU cores it will get to run on,
198 // on typical mobile devices.
199 // It seems impossible for gemmlowp to make this choice automatically,
200 // which is why gemmlowp's default is to use only 1 thread, and
201 // applications may override that if they know that they can count on
202 // using more than that.
203 std::this_thread::sleep_for(std::chrono::milliseconds(1));
204 }
205 }
206 }
207
208 private:
209 std::atomic<std::size_t> count_;
210 };
211
212 // A workload for a worker.
213 struct Task {
TaskTask214 Task() : local_allocator(nullptr) {}
~TaskTask215 virtual ~Task() {}
216 virtual void Run() = 0;
217 Allocator* local_allocator;
218 };
219
220 // A worker thread.
221 class Worker {
222 public:
223 enum class State {
224 ThreadStartup, // The initial state before the thread main loop runs.
225 Ready, // Is not working, has not yet received new work to do.
226 HasWork, // Has work to do.
227 ExitAsSoonAsPossible // Should exit at earliest convenience.
228 };
229
Worker(BlockingCounter * counter_to_decrement_when_ready)230 explicit Worker(BlockingCounter* counter_to_decrement_when_ready)
231 : task_(nullptr),
232 state_(State::ThreadStartup),
233 counter_to_decrement_when_ready_(counter_to_decrement_when_ready) {
234 pthread_cond_init(&state_cond_, nullptr);
235 pthread_mutex_init(&state_mutex_, nullptr);
236 pthread_create(&thread_, nullptr, ThreadFunc, this);
237 }
238
~Worker()239 ~Worker() {
240 ChangeState(State::ExitAsSoonAsPossible);
241 pthread_join(thread_, nullptr);
242 pthread_cond_destroy(&state_cond_);
243 pthread_mutex_destroy(&state_mutex_);
244 }
245
246 // Changes State; may be called from either the worker thread
247 // or the master thread; however, not all state transitions are legal,
248 // which is guarded by assertions.
249 //
250 // The Task argument is to be used only with new_state==HasWork.
251 // It specifies the Task being handed to this Worker.
252 void ChangeState(State new_state, Task* task = nullptr) {
253 ScopedProfilingLabel label("Worker::ChangeState");
254 pthread_mutex_lock(&state_mutex_);
255 State old_state = state_.load(std::memory_order_relaxed);
256 assert(old_state != new_state);
257 switch (old_state) {
258 case State::ThreadStartup:
259 assert(new_state == State::Ready);
260 break;
261 case State::Ready:
262 assert(new_state == State::HasWork ||
263 new_state == State::ExitAsSoonAsPossible);
264 break;
265 case State::HasWork:
266 assert(new_state == State::Ready ||
267 new_state == State::ExitAsSoonAsPossible);
268 break;
269 default:
270 abort();
271 }
272 switch (new_state) {
273 case State::Ready:
274 if (task_) {
275 // Doing work is part of reverting to 'ready' state.
276 task_->Run();
277 task_ = nullptr;
278 }
279 break;
280 case State::HasWork:
281 assert(!task_);
282 task->local_allocator = &local_allocator_;
283 task_ = task;
284 break;
285 default:
286 break;
287 }
288 state_.store(new_state, std::memory_order_relaxed);
289 pthread_cond_broadcast(&state_cond_);
290 pthread_mutex_unlock(&state_mutex_);
291 if (new_state == State::Ready) {
292 counter_to_decrement_when_ready_->DecrementCount();
293 }
294 }
295
296 // Thread entry point.
ThreadFunc()297 void ThreadFunc() {
298 ScopedProfilingLabel label("Worker::ThreadFunc");
299
300 ChangeState(State::Ready);
301
302 // Thread main loop
303 while (true) {
304 // Get a state to act on
305 // In the 'Ready' state, we have nothing to do but to wait until
306 // we switch to another state.
307 State state_to_act_upon = WaitForVariableChange(
308 &state_, State::Ready, &state_cond_, &state_mutex_);
309
310 // We now have a state to act on, so act.
311 switch (state_to_act_upon) {
312 case State::HasWork:
313 // Got work to do! So do it, and then revert to 'Ready' state.
314 ChangeState(State::Ready);
315 break;
316 case State::ExitAsSoonAsPossible:
317 return;
318 default:
319 abort();
320 }
321 }
322 }
323
ThreadFunc(void * arg)324 static void* ThreadFunc(void* arg) {
325 static_cast<Worker*>(arg)->ThreadFunc();
326 return nullptr;
327 }
328
329 // Called by the master thead to give this worker work to do.
StartWork(Task * task)330 void StartWork(Task* task) { ChangeState(State::HasWork, task); }
331
332 private:
333 // The underlying thread.
334 pthread_t thread_;
335
336 // The task to be worked on.
337 Task* task_;
338
339 // The condition variable and mutex guarding state changes.
340 pthread_cond_t state_cond_;
341 pthread_mutex_t state_mutex_;
342
343 // The state enum tells if we're currently working, waiting for work, etc.
344 // Its concurrent accesses by the worker and main threads are guarded by
345 // state_mutex_, and can thus use memory_order_relaxed. This still needs
346 // to be a std::atomic because we use WaitForVariableChange.
347 std::atomic<State> state_;
348
349 // Each thread had a local allocator so they can allocate temporary
350 // buffers without blocking each other.
351 Allocator local_allocator_;
352
353 // pointer to the master's thread BlockingCounter object, to notify the
354 // master thread of when this worker switches to the 'Ready' state.
355 BlockingCounter* const counter_to_decrement_when_ready_;
356 };
357
358 // A very simple pool of workers, that only allows the very
359 // specific parallelization pattern that we use here:
360 // a fixed number of workers can be given work, and one then
361 // waits for all of them to finish.
362 //
363 // See MultiThreadGemmContextBase for how other WorkersPool implementations can
364 // be used.
365 class WorkersPool {
366 public:
WorkersPool()367 WorkersPool() {}
368
~WorkersPool()369 ~WorkersPool() {
370 for (auto w : workers_) {
371 delete w;
372 }
373 }
374
375 // Just executes the tasks. Does not destroy them. Similar to
376 // ruy::ThreadPool::Execute.
377 template <typename TaskType>
Execute(int tasks_count,TaskType * tasks)378 void Execute(int tasks_count, TaskType* tasks) {
379 assert(tasks_count >= 1);
380 // One of the tasks will be run on the current thread.
381 std::size_t workers_count = tasks_count - 1;
382 CreateWorkers(workers_count);
383 assert(workers_count <= workers_.size());
384 counter_to_decrement_when_ready_.Reset(workers_count);
385 for (std::size_t i = 0; i < tasks_count - 1; i++) {
386 workers_[i]->StartWork(&tasks[i]);
387 }
388 // Execute the remaining workload immediately on the current thread.
389 Task* task = &tasks[tasks_count - 1];
390 task->local_allocator = &main_thread_task_allocator_;
391 task->Run();
392 // Wait for the workers submitted above to finish.
393 counter_to_decrement_when_ready_.Wait();
394 }
395
396 // Legacy: executes the tasks and destroys them
LegacyExecuteAndDestroyTasks(const std::vector<Task * > & tasks)397 void LegacyExecuteAndDestroyTasks(const std::vector<Task*>& tasks) {
398 std::size_t tasks_count = tasks.size();
399 assert(tasks_count >= 1);
400 // One of the tasks will be run on the current thread.
401 std::size_t workers_count = tasks_count - 1;
402 CreateWorkers(workers_count);
403 assert(workers_count <= workers_.size());
404 counter_to_decrement_when_ready_.Reset(workers_count);
405 for (int i = 0; i < tasks_count - 1; i++) {
406 workers_[i]->StartWork(tasks[i]);
407 }
408 // Execute the remaining workload immediately on the current thread.
409 Task* task = tasks[tasks_count - 1];
410 task->local_allocator = &main_thread_task_allocator_;
411 task->Run();
412 // Wait for the workers submitted above to finish.
413 counter_to_decrement_when_ready_.Wait();
414 // Cleanup tasks (best to do this from the same thread that allocated
415 // the memory).
416 std::for_each(tasks.begin(), tasks.end(), [](Task* task) { delete task; });
417 }
418
419 // Legacy old name of LegacyExecuteAndDestroyTasks
Execute(const std::vector<Task * > & tasks)420 void Execute(const std::vector<Task*>& tasks) {
421 LegacyExecuteAndDestroyTasks(tasks);
422 }
423
424 private:
425 // Ensures that the pool has at least the given count of workers.
426 // If any new worker has to be created, this function waits for it to
427 // be ready.
CreateWorkers(std::size_t workers_count)428 void CreateWorkers(std::size_t workers_count) {
429 if (workers_.size() >= workers_count) {
430 return;
431 }
432 counter_to_decrement_when_ready_.Reset(workers_count - workers_.size());
433 while (workers_.size() < workers_count) {
434 workers_.push_back(new Worker(&counter_to_decrement_when_ready_));
435 }
436 counter_to_decrement_when_ready_.Wait();
437 }
438
439 // copy construction disallowed
440 WorkersPool(const WorkersPool&) = delete;
441
442 // The workers in this pool. They are owned by the pool:
443 // the pool creates workers and destroys them in its destructor.
444 std::vector<Worker*> workers_;
445
446 // The BlockingCounter used to wait for the workers.
447 BlockingCounter counter_to_decrement_when_ready_;
448
449 // For N-threaded operations, we will use only N-1 worker threads
450 // while the last task will be run directly on the main thread.
451 // It will then use this main_thread_task_allocator_; having a
452 // dedicated allocator for that (separate from the base allocator_)
453 // allows to use the same code for all tasks regardless of which
454 // thread they run on.
455 Allocator main_thread_task_allocator_;
456 };
457
458 // The task we use to implement a multi-threaded Gemm: a block of the
459 // RHS has been packed by the master thread; each worker thread
460 // then has to pack a block of the LHS and accumulate the Gemm of these
461 // packed LHS and RHS blocks.
462 template <typename KernelFormat, typename InputScalar, typename OutputScalar,
463 typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
464 MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
465 typename OutputPipelineType, typename GemmContextType>
466 struct GemmWithPackedRhsTask : Task {
467 typedef PackedSideBlock<typename KernelFormat::Lhs> PackedLhs;
468 typedef PackedSideBlock<typename KernelFormat::Rhs> PackedRhs;
GemmWithPackedRhsTaskGemmWithPackedRhsTask469 GemmWithPackedRhsTask(GemmContextType* _context, const KernelBase& _kernel,
470 const MatrixMap<const InputScalar, LhsOrder>& _lhs,
471 const PackedRhs& _packed_rhs,
472 MatrixMap<OutputScalar, ResultOrder>* _result,
473 const MatrixBlockBounds& _result_block,
474 const LhsOffset& _lhs_offset,
475 const RhsOffset& _rhs_offset,
476 const BlockParams& _block_params,
477 const OutputPipelineType& _output_pipeline)
478 : context(_context),
479 kernel(_kernel),
480 lhs(_lhs),
481 packed_rhs(_packed_rhs),
482 result(*_result),
483 result_block(_result_block),
484 lhs_offset(_lhs_offset),
485 rhs_offset(_rhs_offset),
486 block_params(_block_params),
487 output_pipeline(_output_pipeline) {}
488
RunGemmWithPackedRhsTask489 void Run() override {
490 ScopedProfilingLabel label("GemmWithPackedRhsTask");
491
492 const int rows = result_block.rows;
493 const int cols = result_block.cols;
494 const int depth = lhs.cols();
495
496 PackedLhs packed_lhs(Side::Lhs, local_allocator, block_params);
497
498 PackedResult packed_result(local_allocator, block_params);
499
500 local_allocator->Commit();
501
502 for (int c = 0; c < cols; c += block_params.l2_cols) {
503 int cs = std::min(block_params.l2_cols, cols - c);
504
505 for (int r = 0; r < rows; r += block_params.l2_rows) {
506 int rs = std::min(block_params.l2_rows, rows - r);
507
508 PackLhs(&packed_lhs, lhs.block(r, 0, rs, depth));
509
510 Compute(kernel, block_params, &packed_result, packed_lhs, packed_rhs,
511 depth);
512
513 auto curr_result_block = MatrixBlockBounds(
514 result_block.start_row + r, result_block.start_col + c, rs, cs);
515 UnpackResult<KernelFormat>(
516 &result, curr_result_block, packed_result, depth,
517 packed_lhs.sums_of_each_slice(), packed_rhs.sums_of_each_slice(),
518 lhs_offset.block(curr_result_block.start_row, rs),
519 rhs_offset.block(curr_result_block.start_col, cs), output_pipeline);
520 }
521 }
522
523 local_allocator->Decommit();
524 }
525
526 const GemmContextType* context;
527 const KernelBase& kernel;
528 const MatrixMap<const InputScalar, LhsOrder> lhs;
529 const PackedRhs packed_rhs;
530 MatrixMap<OutputScalar, ResultOrder> result;
531 const MatrixBlockBounds result_block;
532 const LhsOffset& lhs_offset;
533 const RhsOffset& rhs_offset;
534 const BlockParams& block_params;
535 const OutputPipelineType& output_pipeline;
536 };
537
538 // This base class for multi-threading allows subclasses to implement their own
539 // workers_pool() method. See MultiThreadGemmContext below for an example;
540 // any other implementation of workers_pool() must return an object with the
541 // same public methods as WorkersPool.
542 class MultiThreadGemmContextBase : public SingleThreadGemmContext {
543 public:
set_max_num_threads(int n)544 void set_max_num_threads(int n) { max_num_threads_ = n; }
545
max_num_threads()546 int max_num_threads() const { return max_num_threads_; }
547
548 protected:
549 // The maximum number of worker threads to use (including
550 // the master thread).
551 // The default value 1 means single-threading. That is the default
552 // because gemmlowp's primary target is mobile hardware, where thermal
553 // constraints usually mean that it may not be realistic to use more
554 // than 1 CPU core even if multiple cores are present.
555 // The special value 0 means try to detect the number of hardware threads.
556 // Note: this assumes that all CPU cores are equivalent. That assumption
557 // is defeated on big.LITTLE ARM devices, where we have no API to query
558 // the number of big cores (which is typically what we would want to use,
559 // leaving aside above-mentioned thermal issues). That is the other reason
560 // why the best compromise here is to let max_num_threads_ default to 1,
561 // so users who want multi-threading have to make the decision of how many
562 // threads to use by themselves.
563 int max_num_threads_ = 1;
564 };
565
566 class MultiThreadGemmContext : public MultiThreadGemmContextBase {
567 public:
workers_pool()568 WorkersPool* workers_pool() { return &workers_pool_; }
569
570 private:
571 // The workers pool used by MultiThreadGemm. Making
572 // this part of the context allows it to be persistent,
573 // avoiding recreating threads on every Gemm.
574 WorkersPool workers_pool_;
575 };
576
577 // Determines how many threads should be used for a given Gemm
578 // operation.
579 template <int KernelRows>
HowManyThreads(int max_num_threads,int rows,int cols,int depth)580 inline int HowManyThreads(int max_num_threads, int rows, int cols, int depth) {
581 // Early-exit in the default case where multi-threading is disabled.
582 if (max_num_threads == 1) {
583 return 1;
584 }
585
586 // Determine the maximum number of threads.
587 int max_count = GetHardwareConcurrency(max_num_threads);
588
589 // Basic calculation: take into account max pool size, and
590 // how many rows we have to feed our kernel.
591 // The motivation for an absolute minimum number of rows per thread,
592 // potentially higher than KernelRows, is that very thin thread workload
593 // currently defeat assumptions of the AddMod generator, resulting
594 // in substantial bias in TestWithRealData on 24 threads.
595 // Ideally, the AddMod generator should be aware of global (r,c) coordinates
596 // so as to be independent of the number of threads.
597 static const int AbsoluteMinRowsPerThread = 16;
598 static const int MinRowsPerThread = KernelRows > AbsoluteMinRowsPerThread
599 ? KernelRows
600 : AbsoluteMinRowsPerThread;
601 int thread_count = std::min(max_count, CeilQuotient(rows, MinRowsPerThread));
602
603 // At this point for small products we already have thread_count==1 so
604 // we can avoid doing more work; otherwise, we still want to check
605 // that the cubic size (rows*cols*depth) is big enough to keep
606 // workers_ busy.
607 if (thread_count > 1) {
608 // Empirically determined value.
609 static const std::uint64_t min_cubic_size_per_thread = 64 * 1024;
610
611 // We can only multiply two out of three sizes without risking overflow
612 const std::uint64_t cubic_size =
613 std::uint64_t(rows) * std::uint64_t(cols) * std::uint64_t(depth);
614
615 thread_count =
616 std::min(thread_count, int(cubic_size / min_cubic_size_per_thread));
617
618 if (thread_count < 1) {
619 thread_count = 1;
620 }
621 }
622
623 assert(thread_count > 0 && thread_count <= max_count);
624 return thread_count;
625 }
626
627 // The main multi-threaded Gemm function.
628 // To understand it, first read the code of SingleThreadGemm().
629 // The parallelization scheme used here is to have this master function
630 // pack a block of RHS and then start worker threads to pack a block of LHS
631 // each, and accumulate the corresponding products.
632 template <typename KernelFormat, typename InputScalar, typename OutputScalar,
633 typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
634 MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
635 typename OutputPipelineType, typename GemmContextType>
MultiThreadGemm(GemmContextType * context,const KernelBase & kernel,const MatrixMap<const InputScalar,LhsOrder> & lhs,const MatrixMap<const InputScalar,RhsOrder> & rhs,MatrixMap<OutputScalar,ResultOrder> * result,const LhsOffset & lhs_offset,const RhsOffset & rhs_offset,const OutputPipelineType & output_pipeline)636 void MultiThreadGemm(GemmContextType* context, const KernelBase& kernel,
637 const MatrixMap<const InputScalar, LhsOrder>& lhs,
638 const MatrixMap<const InputScalar, RhsOrder>& rhs,
639 MatrixMap<OutputScalar, ResultOrder>* result,
640 const LhsOffset& lhs_offset, const RhsOffset& rhs_offset,
641 const OutputPipelineType& output_pipeline) {
642 ScopedProfilingLabel label("gemmlowp::MultiThreadGemm");
643
644 assert(lhs.cols() == rhs.rows());
645
646 int rows = result->rows();
647 int cols = result->cols();
648 int depth = lhs.cols();
649
650 // zero sizes should have been caught earlier and early-returned.
651 assert(rows > 0);
652 assert(cols > 0);
653 assert(depth > 0);
654
655 // The case of rows<cols should have been caught earlier and transposed.
656 assert(rows >= cols);
657
658 const int thread_count = HowManyThreads<KernelFormat::kRows>(
659 context->max_num_threads(), rows, cols, depth);
660 if (thread_count == 1) {
661 return SingleThreadGemm<KernelFormat, InputScalar, OutputScalar,
662 BitDepthParams>(context, kernel, lhs, rhs, result,
663 lhs_offset, rhs_offset,
664 output_pipeline);
665 }
666 assert(thread_count > 1);
667
668 // Simple 1:1 mapping of tasks to physical cores, which is very important
669 // to getting good multithreaded performance, specially for not-very-large
670 // GEMMs, and especially on Android.
671 const int task_count = thread_count;
672
673 Allocator* allocator = context->allocator();
674 auto* workers_pool = context->workers_pool();
675
676 BlockParams block_params;
677 block_params.Init<KernelFormat>(
678 rows, cols, depth, task_count, context->l1_bytes_to_use(),
679 context->l2_bytes_to_use(), context->l2_rhs_factor());
680
681 PackedSideBlock<typename KernelFormat::Rhs> packed_rhs(Side::Rhs, allocator,
682 block_params);
683 allocator->Commit();
684
685 // We loop over large blocks of the RHS.
686 for (int c = 0; c < cols; c += block_params.l2_cols) {
687 int cs = std::min(block_params.l2_cols, cols - c);
688
689 // Pack a large block of the RHS.
690 PackRhs(&packed_rhs, rhs.block(0, c, depth, cs));
691
692 // Give work to each worker.
693 std::vector<Task*> tasks;
694 int next_start_row = 0;
695 for (int n = 0; n < task_count; ++n) {
696 int start_row = next_start_row;
697 next_start_row = std::min(
698 rows, RoundUp<KernelFormat::kRows>(rows * (n + 1) / task_count));
699
700 int block_rows = next_start_row - start_row;
701 auto lhs_block = lhs.block(start_row, 0, block_rows, depth);
702 typedef GemmWithPackedRhsTask<KernelFormat, InputScalar, OutputScalar,
703 BitDepthParams, LhsOrder, RhsOrder,
704 ResultOrder, LhsOffset, RhsOffset,
705 OutputPipelineType, GemmContextType>
706 TaskType;
707 tasks.push_back(
708 new TaskType(context, kernel, lhs_block, packed_rhs, result,
709 MatrixBlockBounds(start_row, c, block_rows, cs),
710 lhs_offset, rhs_offset, block_params, output_pipeline));
711 }
712 // Execute the work on the workers (and partially on this thread).
713 workers_pool->Execute(tasks);
714 }
715
716 allocator->Decommit();
717 }
718
719 } // namespace gemmlowp
720
721 #endif // GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
722