1 /* Copyright 2019 Google LLC. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 // The 'middle-end' in ruy. See TrMul function comment.
17
18 #include "ruy/trmul.h"
19
20 #include <algorithm>
21 #include <atomic>
22 #include <cstdint>
23 #include <cstring>
24 #include <memory>
25 #include <vector>
26
27 #include "ruy/allocator.h"
28 #include "ruy/block_map.h"
29 #include "ruy/check_macros.h"
30 #include "ruy/cpu_cache_params.h"
31 #include "ruy/cpuinfo.h"
32 #include "ruy/ctx.h"
33 #include "ruy/denormal.h"
34 #include "ruy/mat.h"
35 #include "ruy/matrix.h"
36 #include "ruy/mul_params.h"
37 #include "ruy/opt_set.h"
38 #include "ruy/profiler/instrumentation.h"
39 #include "ruy/side_pair.h"
40 #include "ruy/size_util.h"
41 #include "ruy/thread_pool.h"
42 #include "ruy/trace.h"
43 #include "ruy/tune.h"
44
45 namespace ruy {
46
47 namespace {
48
// Tracks the packing status of one block of the LHS or RHS matrix.
// The underlying type is a single byte (std::uint8_t).
enum class PackingStatus : std::uint8_t {
  kNotStarted = 0,  // No thread has started packing this block yet.
  kInProgress = 1,  // Some thread is currently packing this block.
  kFinished = 2     // This block has already been packed.
};
55
56 // TrMulTask is the task that a ruy thread runs to perform the TrMul operation.
57 class TrMulTask final : public Task {
58 public:
TrMulTask(TrMulParams * params,const BlockMap & block_map,std::atomic<int> * atomic_block_id,int thread_id,bool need_atomics,SidePair<std::atomic<PackingStatus> * > packing_status,TuningResolver * tuning_resolver,Allocator * local_allocator,CpuInfo * cpuinfo)59 TrMulTask(TrMulParams* params, const BlockMap& block_map,
60 std::atomic<int>* atomic_block_id, int thread_id, bool need_atomics,
61 SidePair<std::atomic<PackingStatus>*> packing_status,
62 TuningResolver* tuning_resolver, Allocator* local_allocator,
63 CpuInfo* cpuinfo)
64 : params_(params),
65 block_map_(block_map),
66 atomic_block_id_(atomic_block_id),
67 thread_id_(thread_id),
68 need_atomics_(need_atomics),
69 packing_status_(packing_status),
70 tuning_resolver_(tuning_resolver),
71 local_allocator_(local_allocator),
72 local_already_packed_{nullptr, nullptr},
73 cpuinfo_(cpuinfo) {}
74
75 // Thread main function. This is one thread's share of the TrMul work.
Run()76 void Run() override {
77 RUY_TRACE_SCOPE_NAME("TrMulTask::Run");
78 RUY_TRACE_SET_THEAD_ID(thread_id_);
79 // Allocate and initialize `local_packed`.
80 for (Side side : {Side::kLhs, Side::kRhs}) {
81 if (!params_->is_prepacked[side]) {
82 const int size = NumBlocksPerSide(side, block_map_);
83 local_allocator_->Allocate(size, &local_already_packed_[side]);
84 memset(local_already_packed_[side], 0, size * sizeof(bool));
85 }
86 }
87
88 const Tuning tuning = tuning_resolver_->Resolve(cpuinfo_);
89 const int num_blocks = NumBlocks(block_map_);
90
91 // Each thread starts by initially reserving the block whose id
92 // is the thread id.
93 int block_id = thread_id_;
94 // Loop until all blocks have been computed.
95 while (block_id < num_blocks) {
96 RUY_TRACE_SCOPE_NAME("Main loop iteration");
97 // Reserve the next block to handle, hiding the latency of this atomic op.
98 const int next_block_id =
99 atomic_block_id_->fetch_add(1, std::memory_order_relaxed);
100 // Get coordinates of the current block to handle, in "block space".
101 SidePair<int> block;
102 GetBlockByIndex(block_map_, block_id, &block);
103 // Get coordinates of the current block to handle, in matrix space.
104 SidePair<int> start, end;
105 GetBlockMatrixCoords(block_map_, block, &start, &end);
106 RUY_TRACE_INFO(TRMUL_TASK_MAIN_LOOP_GOT_BLOCK_COORDS);
107 // Maybe pack the current LHS/RHS block, if not already packed.
108 EnsurePacked(block, start, end, tuning);
109 // Actually do matrix multiplication work
110 params_->RunKernel(tuning, start, end);
111 // Move on to the next block as obtained by the atomic increment
112 // at the start of this while loop iteration.
113 block_id = next_block_id;
114 }
115
116 local_allocator_->FreeAll();
117 }
118
119 private:
120 // Tries to pack a block, without blocking.
121 // If the block was already packed, returns true.
122 // If the block was not started packing, packs it and returns true.
123 // If the block was being packed by another thread, returns false.
TryPack(Side side,int block,int start,int end,Tuning tuning)124 bool TryPack(Side side, int block, int start, int end, Tuning tuning) {
125 if (params_->is_prepacked[side]) {
126 return true;
127 }
128 if (!local_already_packed_[side][block]) {
129 if (need_atomics_) {
130 // Explanation of this compare_exchange_strong operation:
131 // This atomically performs all of the following:
132 // 1. Read `status` with "acquire" memory order.
133 // * That this read uses "acquire" is because both memory orders
134 // specified have "acquire" as their read-component.
135 // 2. Compare (bitwise) with `exchanged_status`.
136 // 3. If equal, stores the value kInProgress to `status` with "release"
137 // memory order, and returns true, so we take this 'if' branch.
138 // * That this store uses "release" is because of the _rel part in
139 // memory_order_acq_rel passed as the first memory order argument.
140 // 4. If not equal, stores the loaded value of `status` to
141 // `exchanged_status` with "relaxed" semantics, and returns false,
142 // so we take the 'else' branch.
143 // * That this store uses "relaxed" is because the second memory
144 // order argument, memory_order_acquire, implies no particular
145 // store semantics. "relaxed" is acceptable here because this
146 // stores to a local stack variable.
147 //
148 // Rationale for compare_exchange_strong as opposed to
149 // compare_exchange_weak:
150 // The spurious-failure case with compare_exchange_weak will actually
151 // happen a lot here, because the atomic 'status' bytes are stored
152 // contiguously in arrays and neighboring values will be accessed
153 // by multiple threads concurrently. On a typical ARM CPU, an exclusives
154 // reservation granule is 64 bytes, so a lot of false-sharing may
155 // happen. Using compare_exchange_weak would thus result in often having
156 // TryPack return 'false' when it could instead have done the packing
157 // work and returned 'true'. Heuristically, that is not a good thing.
158 // Moreover, this changes the TryPack contract, loosening it and making
159 // it harder for the caller to reason about. Finally, the overhead of
160 // atomic operations is mitigated by the enclosing check on
161 // local_already_packed, so maybe the overhead of
162 // compare_exchange_strong isn't such a problem. But we don't really
163 // know for sure, that would be interesting to experiment more with.
164 PackingStatus exchanged_status = PackingStatus::kNotStarted;
165 std::atomic<PackingStatus>& status = packing_status_[side][block];
166 if (status.compare_exchange_strong(
167 exchanged_status, PackingStatus::kInProgress,
168 std::memory_order_acq_rel, std::memory_order_acquire)) {
169 // In this branch, the status was kNotStarted and we just atomically
170 // changed it to kInProgress as we are about to handle the packing
171 // ourselves.
172 RUY_TRACE_INFO(TRYPACK_PACKING);
173 params_->RunPack(side, tuning, start, end);
174 status.store(PackingStatus::kFinished, std::memory_order_release);
175 } else if (exchanged_status == PackingStatus::kInProgress) {
176 // Another thread is currently packing this block.
177 RUY_TRACE_INFO(TRYPACK_ANOTHER_THREAD_PACKING);
178 return false;
179 } else {
180 RUY_TRACE_INFO(TRYPACK_PACKED_BY_ANOTHER_THREAD);
181 }
182 RUY_DCHECK(status.load(std::memory_order_acquire) ==
183 PackingStatus::kFinished);
184 } else {
185 // Single-threaded case: no need for expensive atomics,
186 // local_already_packed is the truth already.
187 params_->RunPack(side, tuning, start, end);
188 }
189 local_already_packed_[side][block] = true;
190 } else {
191 RUY_TRACE_INFO(TRYPACK_PREVIOUSLY_PACKED);
192 }
193 return true;
194 }
195
196 // Ensures that both the LHS and RHS blocks required by the specified block
197 // are packed. In the event that they are already being packed on another
198 // threads, this function may perform the packing of some other block while
199 // waiting for that other thread to finish packing the requested block.
EnsurePacked(const SidePair<int> & block,const SidePair<int> & start,const SidePair<int> & end,Tuning tuning)200 void EnsurePacked(const SidePair<int>& block, const SidePair<int>& start,
201 const SidePair<int>& end, Tuning tuning) {
202 #if RUY_OPT(PACK_AHEAD)
203 SidePair<int> next_runahead_block{block[Side::kLhs] + 1,
204 block[Side::kRhs] + 1};
205 Side next_runahead_side = Side::kLhs;
206 #endif
207 while (true) {
208 bool both_sides_packed = true;
209 for (Side side : {Side::kLhs, Side::kRhs}) {
210 both_sides_packed &=
211 TryPack(side, block[side], start[side], end[side], tuning);
212 }
213 if (both_sides_packed) {
214 break;
215 }
216 #if RUY_OPT(PACK_AHEAD)
217 RUY_TRACE_INFO(ENSURE_PACKED_ENTER_RUN_AHEAD);
218 const Side runahead_side = next_runahead_side;
219 const int runahead_block = next_runahead_block[runahead_side];
220 next_runahead_side = OtherSide(next_runahead_side);
221 if (runahead_block >= NumBlocksPerSide(runahead_side, block_map_)) {
222 continue;
223 }
224 int runahead_block_start, runahead_block_end;
225 GetBlockMatrixCoords(runahead_side, block_map_, runahead_block,
226 &runahead_block_start, &runahead_block_end);
227 TryPack(runahead_side, runahead_block, runahead_block_start,
228 runahead_block_end, tuning);
229 next_runahead_block[runahead_side] = runahead_block + 1;
230 #endif
231 }
232 RUY_TRACE_INFO(ENSURE_PACKED_END);
233 }
234
235 TrMulParams* params_;
236 const BlockMap& block_map_;
237 std::atomic<int>* atomic_block_id_;
238 int thread_id_;
239 bool need_atomics_;
240 SidePair<std::atomic<PackingStatus>*> packing_status_;
241 TuningResolver* tuning_resolver_;
242 Allocator* local_allocator_;
243
244 // Local indicators of packedness to avoid the overhead of atomic ops.
245 SidePair<bool*> local_already_packed_;
246
247 CpuInfo* cpuinfo_;
248 };
249
GetTentativeThreadCount(Ctx * ctx,int rows,int cols,int depth)250 int GetTentativeThreadCount(Ctx* ctx, int rows, int cols, int depth) {
251 #if RUY_PLATFORM_EMSCRIPTEN
252 // b/139927184, std::thread constructor raises exception
253 return 1;
254 #endif
255 RUY_TRACE_SCOPE;
256 // Empirically determined rule for reasonable number of
257 // threads to use. This is proportional to the number of arithmetic ops
258 // in this Mul (product of the 3 sizes).
259 static constexpr int kDivisorLog2 = 15;
260 const int guess_log2 = std::max(
261 0, ceil_log2(rows) + ceil_log2(cols) + ceil_log2(depth) - kDivisorLog2);
262 int tentative_thread_count =
263 std::min(1 << guess_log2, ctx->max_num_threads());
264 RUY_TRACE_INFO(GET_TENTATIVE_THREAD_COUNT);
265 return tentative_thread_count;
266 }
267
GetUseSimpleLoop(int tentative_thread_count,int rows,int cols,int depth,int lhs_scalar_size,int rhs_scalar_size,const CpuCacheParams & cpu_cache_params)268 bool GetUseSimpleLoop(int tentative_thread_count, int rows, int cols, int depth,
269 int lhs_scalar_size, int rhs_scalar_size,
270 const CpuCacheParams& cpu_cache_params) {
271 RUY_TRACE_SCOPE;
272 if (tentative_thread_count == 1) {
273 if (IsObviouslyLinearTraversal(rows, cols, depth, lhs_scalar_size,
274 rhs_scalar_size, cpu_cache_params)) {
275 RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_TRUE);
276 return true;
277 }
278 }
279 RUY_TRACE_INFO(GET_USE_SIMPLE_LOOP_RETURNS_FALSE);
280 return false;
281 }
282
283 } // namespace
284
285 // TrMul is the ruy middle-end. It contains the high-level logic to perform
286 // a ruy::Mul's work, down to calls to back-end Kernel and Pack functions.
287 // This includes determining how many threads to use, computing the BlockMap,
288 // executing tasks on a thread-pool. The TrMul function itself runs on the main
289 // thread, the code that is potentially running on worker threads is in
290 // TrMulTask::Run().
TrMul(Ctx * ctx,TrMulParams * params)291 void TrMul(Ctx* ctx, TrMulParams* params) {
292 RUY_TRACE_SCOPE;
293 profiler::ScopeLabel label(
294 "TrMul (Path=0x%x, max_num_threads=%d, is_prepacked=(%d,%d))",
295 static_cast<int>(params->path), ctx->max_num_threads(),
296 params->is_prepacked[Side::kLhs], params->is_prepacked[Side::kRhs]);
297
298 PEMat& packed_lhs = params->packed_matrix[Side::kLhs];
299 PEMat& packed_rhs = params->packed_matrix[Side::kRhs];
300 EMat& lhs = params->src[Side::kLhs];
301 EMat& rhs = params->src[Side::kRhs];
302
303 const int rows = lhs.layout.cols;
304 const int cols = rhs.layout.cols;
305 const int depth = lhs.layout.rows;
306
307 const int tentative_thread_count =
308 GetTentativeThreadCount(ctx, rows, cols, depth);
309 const auto& cpu_cache_params = ctx->mutable_cpuinfo()->CacheParams();
310
311 // Suppress denormals to avoid computation inefficiency.
312 // Note this only handles the denormal suppression on the main thread. As for
313 // worker threads, the suppression is handled in each thread's main loop. See
314 // the corresponding code in thread_pool.cc for details.
315 ScopedSuppressDenormals suppress_denormals;
316
317 // Case of running this TrMul as a simple loop.
318 // This is a good place to start reading this function: all the rest
319 // of this function is just an optimized, but functionally equivalent,
320 // version of that.
321 if (GetUseSimpleLoop(tentative_thread_count, rows, cols, depth,
322 lhs.data_type.size, rhs.data_type.size,
323 cpu_cache_params)) {
324 profiler::ScopeLabel label_simple("TrMulImpl, simple loop");
325 Tuning tuning = ctx->GetMainThreadTuning();
326 RUY_TRACE_INFO(TRMUL_SIMPLE_LOOP);
327
328 const SidePair<int> origin{0, 0};
329 const SidePair<int> rounded_dims{packed_lhs.layout.cols,
330 packed_rhs.layout.cols};
331 for (Side side : {Side::kLhs, Side::kRhs}) {
332 if (!params->is_prepacked[side]) {
333 params->RunPack(side, tuning, origin[side], rounded_dims[side]);
334 }
335 }
336 params->RunKernel(tuning, origin, rounded_dims);
337 return;
338 }
339
340 profiler::ScopeLabel label_general("TrMulImpl, general case");
341 RUY_TRACE_INFO(TRMUL_GENERAL_CASE);
342 Allocator* main_allocator = ctx->GetMainAllocator();
343
344 // Initialize block map.
345 BlockMap block_map;
346 MakeBlockMap(packed_lhs.layout.cols, packed_rhs.layout.cols, depth,
347 packed_lhs.layout.kernel.cols, packed_rhs.layout.kernel.cols,
348 packed_lhs.data_type.size, packed_rhs.data_type.size,
349 tentative_thread_count, cpu_cache_params, &block_map);
350
351 // Initialize per-thread state.
352 const int thread_count = block_map.thread_count;
353 const bool need_atomics = thread_count > 1;
354 ctx->EnsureThreadSpecificResources(thread_count);
355 for (int i = 0; i < thread_count; i++) {
356 ctx->GetThreadSpecificTuningResolver(i)->SetTuning(ctx->explicit_tuning());
357 }
358
359 // In the need_atomics case, allocate and initialize atomic values tracking
360 // the packing status of blocks.
361 SidePair<std::atomic<PackingStatus>*> packing_status{nullptr, nullptr};
362 if (need_atomics) {
363 for (Side side : {Side::kLhs, Side::kRhs}) {
364 if (!params->is_prepacked[side]) {
365 const int size = NumBlocksPerSide(side, block_map);
366 main_allocator->Allocate(size, &packing_status[side]);
367 for (int i = 0; i < size; i++) {
368 packing_status[side][i].store(PackingStatus::kNotStarted,
369 std::memory_order_relaxed);
370 }
371 }
372 }
373 }
374
375 // Create the atomic block id, allocate it using Allocator so that
376 // we get the alignment ensuring that it sits alone in its exclusives
377 // reservation granule.
378 std::atomic<int>* atomic_block_id;
379 main_allocator->Allocate(1, &atomic_block_id);
380
381 // Create task objects.
382 TrMulTask* tasks;
383 main_allocator->Allocate(thread_count, &tasks);
384
385 atomic_block_id->store(thread_count);
386
387 for (int i = 0; i < thread_count; i++) {
388 auto* allocator = ctx->GetThreadSpecificAllocator(i);
389 auto* tuning_resolver = ctx->GetThreadSpecificTuningResolver(i);
390 new (tasks + i) TrMulTask(params, block_map, atomic_block_id, i,
391 need_atomics, packing_status, tuning_resolver,
392 allocator, ctx->mutable_cpuinfo());
393 }
394
395 // Do the computation.
396 ctx->mutable_thread_pool()->Execute(thread_count, tasks);
397
398 // Finish up.
399 for (int i = 0; i < thread_count; i++) {
400 tasks[i].~TrMulTask();
401 }
402 }
403
404 } // namespace ruy
405