/**
 * Copyright 2021-2024 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/graph_scheduler/actor/memory_manager_actor.h"
#include "runtime/graph_scheduler/actor/data_source_actor.h"
#include "runtime/graph_scheduler/actor/kernel_actor.h"
#include "include/backend/mem_reuse/mem_tracker.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"

namespace mindspore {
namespace runtime {
namespace {
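// Notify the requesting actor that its memory request has been handled. In the synchronous
// allocation mode the requester proceeds inline after the call returns, so no message is
// posted; otherwise the OnMemoryAllocFinish message is sent back through the actor framework.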
void OnMemoryAllocFinish(const AID &from_aid, OpContext<DeviceTensor> *const op_context) {
  if (!ActorDispatcher::is_memory_allocation_sync()) {
    ActorDispatcher::Send(from_aid, &MemoryAwareActor::OnMemoryAllocFinish, op_context);
  }
}
}  // namespace

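// Allocate device memory for every device address in `alloc_list` through the given device
// context. On failure the op context is marked via SetOpContextMemoryAllocFail and the
// remaining addresses are left unallocated.
//
// A minimal usage sketch (illustrative only; the variable names are assumptions, but this
// mirrors how a kernel actor posts its allocation request):
//   ActorDispatcher::Send(memory_manager_aid, &MemoryManagerActor::AllocateMemory,
//                         &memory_alloc_list, device_context, op_context, GetAID());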
void MemoryManagerActor::AllocateMemory(const std::vector<DeviceTensor *> *alloc_list,
                                        const DeviceContext *device_context, OpContext<DeviceTensor> *const op_context,
                                        const AID &from_aid) {
  for (auto &device_tensor : *alloc_list) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    // Skip unused device addresses to reduce memory usage.
    if (device_tensor->IsNotNeedAlloc()) {
      continue;
    }

    if (device::tracker::MemTrackerManager::GetInstance().IsEnabled()) {
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddMemInfo, from_aid.Name(), device::tracker::MemType::kKernel,
                                                     device_tensor->GetSize(), device_tensor);
    }

    try {
      // Allocate memory through the device context.
      device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kKernelOutput);
      if (!device_context->device_res_manager_->AllocateMemory(device_tensor, kDefaultStreamIndex)) {
        SetOpContextMemoryAllocFail(from_aid.Name(), device_context, device_tensor->GetSize(), op_context);
        return;
      }
    } catch (const std::exception &e) {
      SetOpContextMemoryAllocFail(from_aid.Name(), device_context, device_tensor->GetSize(), op_context);
      return;
    }

    if (common::IsNeedProfileMemory()) {
      auto output_address = reinterpret_cast<std::uintptr_t>(device_tensor);
      MS_LOG(WARNING) << "Need Profile Memory, alloc type: MemoryManagerActor, device address class ptr: "
                      << output_address << ", device address size: " << device_tensor->GetSize()
                      << ", device address addr: " << device_tensor->GetPtr();
    }
  }
}

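// Allocate one contiguous device block per entry of `alloc_list_list`. `size_list_list`
// holds the per-member sizes inside each block and `total_size_list` the whole-block sizes.
// Members that already own separately allocated memory are migrated into the block: a new
// device address is created on it, the old data is copied device-to-device, and the old
// memory is freed.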
void MemoryManagerActor::AllocateContinuousMemory(const std::vector<std::vector<DeviceTensorPtr>> *alloc_list_list,
                                                  const std::vector<std::vector<size_t>> *size_list_list,
                                                  const std::vector<uint32_t> *stream_id_list,
                                                  const std::vector<size_t> *total_size_list,
                                                  const std::vector<const DeviceContext *> *device_contexts,
                                                  OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  uint64_t start_time = 0;
  PROFILER_START(start_time);

  MS_EXCEPTION_IF_NULL(alloc_list_list);
  MS_EXCEPTION_IF_NULL(size_list_list);
  MS_EXCEPTION_IF_NULL(stream_id_list);
  MS_EXCEPTION_IF_NULL(total_size_list);
  MS_EXCEPTION_IF_NULL(device_contexts);
  MS_EXCEPTION_IF_NULL(op_context);
  if (((*alloc_list_list).size() != (*size_list_list).size()) ||
      ((*size_list_list).size() != (*stream_id_list).size()) ||
      ((*stream_id_list).size() != (*total_size_list).size()) ||
      ((*total_size_list).size() != (*device_contexts).size())) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context),
                                      "The sizes of alloc_list_list, size_list_list, stream_id_list, total_size_list "
                                      "and device_contexts are not equal.");
  }

  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, from_aid.Name(), "ContinuousMemory", "");
  for (size_t i = 0; i < (*alloc_list_list).size(); ++i) {
    auto &alloc_list = (*alloc_list_list)[i];
    auto &size_list = (*size_list_list)[i];
    auto stream_id = (*stream_id_list)[i];
    auto &device_context = (*device_contexts)[i];
    MS_EXCEPTION_IF_NULL(device_context);
    // If the address of the continuous tensor has already been allocated, skip the tensor.
    if (alloc_list[0]->GetPtr() != nullptr) {
      MS_LOG(WARNING) << "The continuous memory of actor: " << from_aid.Name()
                      << " has already been allocated, index: " << i;
      continue;
    }
    // Allocate memory through the device context.
    device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kKernelOutput);
    auto dev_ptr_list = device_context->device_res_manager_->AllocateContinuousMemory(size_list, stream_id);
    if (dev_ptr_list.empty() || dev_ptr_list.size() != alloc_list.size()) {
      MS_LOG(ERROR) << "Allocate continuous memory failed, device ptr list size: " << dev_ptr_list.size()
                    << ", address list size: " << alloc_list.size();
      auto &total_size = (*total_size_list)[i];
      SetOpContextMemoryAllocFail(from_aid.Name(), device_context, total_size, op_context);
      return;
    }

    for (size_t index = 0; index < alloc_list.size(); index++) {
      MS_EXCEPTION_IF_NULL(alloc_list[index]);
      if (alloc_list[index]->GetPtr() != nullptr) {
        auto old_dev_addr = alloc_list[index];
        MS_EXCEPTION_IF_NULL(old_dev_addr);
        auto old_size = old_dev_addr->GetSize();
        if (old_size > size_list[index]) {
          MS_LOG(EXCEPTION) << "The size of the old device address is larger than that of the new device address, "
                            << old_size << " vs " << size_list[index];
        }

        // Create a new device address on the continuous block, copy the old data into it and free the old memory.
        auto kernel_tensor = std::make_shared<kernel::KernelTensor>(
          dev_ptr_list[index], old_size, kernel::GetFormatFromStrToEnum(old_dev_addr->format()),
          old_dev_addr->type_id(), old_dev_addr->host_shape(), device_context->device_context_key().device_name_,
          device_context->device_context_key().device_id_);
        kernel_tensor->set_stream_id(old_dev_addr->stream_id());
        auto new_dev_addr = device_context->device_res_manager_->CreateDeviceAddress(kernel_tensor);
        MS_LOG(DEBUG) << "Create device tensor:" << new_dev_addr << " type:" << new_dev_addr->type_id();
        (void)new_dev_addr->SyncDeviceToDevice(old_dev_addr.get());
        device_context->device_res_manager_->FreeMemory(old_dev_addr.get());
      }
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddMemInfo, from_aid.Name(),
                                                     device::tracker::MemType::kContinuousMemory,
                                                     alloc_list[index]->GetSize(), alloc_list[index].get());
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(BindDevicePtr, alloc_list[index].get(), dev_ptr_list[index]);
      alloc_list[index]->set_ptr(dev_ptr_list[index]);
      alloc_list[index]->SetSize(size_list[index]);
      alloc_list[index]->set_from_mem_pool(true);
    }
  }

  // Call back to the from actor to process after memory allocation finished.
  OnMemoryAllocFinish(from_aid, op_context);

  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryAlloc, from_aid.Name(), false);
}

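// Allocate memory for a batch of device addresses. `alloc_list` and `device_contexts` are
// matched element-wise, so each address is allocated through its own device context.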
void MemoryManagerActor::AllocateBatchMemory(const std::vector<DeviceTensor *> *alloc_list,
                                             const std::vector<const DeviceContext *> *device_contexts,
                                             OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  uint64_t start_time = 0;
  PROFILER_START(start_time);

  MS_EXCEPTION_IF_NULL(alloc_list);
  MS_EXCEPTION_IF_NULL(device_contexts);
  MS_EXCEPTION_IF_NULL(op_context);
  if ((*alloc_list).size() != (*device_contexts).size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context),
                                      "The size of alloc list is not equal to the size of device contexts.");
  }

  for (size_t i = 0; i < (*alloc_list).size(); ++i) {
    auto &device_tensor = (*alloc_list)[i];
    auto &device_context = (*device_contexts)[i];
    MS_EXCEPTION_IF_NULL(device_tensor);
    MS_EXCEPTION_IF_NULL(device_context);
    // Skip unused device addresses to reduce memory usage.
    if (device_tensor->IsNotNeedAlloc()) {
      continue;
    }

    try {
      // Allocate memory through the device context.
      device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kKernelOutput);
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, from_aid.Name(), "BatchMemory", "");
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(
        AddMemInfo, from_aid.Name(), device::tracker::MemType::kBatchMemory, device_tensor->GetSize(), device_tensor);
      if (!device_context->device_res_manager_->AllocateMemory(device_tensor, kDefaultStreamIndex)) {
        SetOpContextMemoryAllocFail(from_aid.Name(), device_context, device_tensor->GetSize(), op_context);
        return;
      }
    } catch (const std::exception &e) {
      SetOpContextMemoryAllocFail(from_aid.Name(), device_context, device_tensor->GetSize(), op_context);
      return;
    }
  }

  // Call back to the from actor to process after memory allocation finished.
  OnMemoryAllocFinish(from_aid, op_context);

  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryAlloc, from_aid.Name(), false);
}

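// Allocate the memory planned by SOMAS (Safe Optimized Memory Allocation Solver) for a
// kernel graph: first try to allocate the whole planned block in one shot; if that fails,
// fall back to allocating the merged blocks one by one at their planned offsets.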
void MemoryManagerActor::AllocateSomasMemory(SomasInfo *const somas_info, const DeviceContext *device_context,
                                             OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  uint64_t start_time = 0;
  PROFILER_START(start_time);

  MS_EXCEPTION_IF_NULL(somas_info);
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(device_context->device_res_manager_);
  MS_EXCEPTION_IF_NULL(op_context);

  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, from_aid.Name(), "SomasMemory",
                                                 "kernel_graph_" + std::to_string(somas_info->graph_id_));

  // Allocate the whole block memory.
  if (somas_info->base_address_ != nullptr) {
    std::string error_info = from_aid.Name() + " already has the base somas address.";
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
  }
  try {
    device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kKernelOutput);
    auto device_ptr = device_context->device_res_manager_->AllocateMemory(somas_info->whole_block_size_);
    if (device_ptr == nullptr) {
      MS_LOG(INFO) << from_aid.Name()
                   << " allocate somas whole block memory failed, alloc size: " << somas_info->whole_block_size_
                   << ". Try to allocate the merged blocks memory.";
    } else {
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, from_aid.Name(),
                                                     somas_info->whole_block_size_, device_ptr,
                                                     device::tracker::MemType::kSomas);
      somas_info->base_address_ = device_ptr;
      PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryAlloc, from_aid.Name(), false);
      return;
    }
  } catch (const std::exception &e) {
    SetOpContextMemoryAllocFail(from_aid.Name(), device_context, somas_info->whole_block_size_, op_context);
    return;
  }

  // Allocate the merged blocks memory.
  try {
    auto &merged_base_addresses = somas_info->merged_base_addresses_;
    for (auto &merged_block : somas_info->merged_blocks_map_) {
      size_t block_offset = merged_block.first;
      size_t block_size = merged_block.second;
      if ((merged_base_addresses.count(block_offset) > 0) && (merged_base_addresses[block_offset] != nullptr)) {
        std::string error_info = from_aid.Name() + " already has the base somas address.";
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
      }
      device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kKernelOutput);
      auto device_ptr = device_context->device_res_manager_->AllocateMemory(block_size);
      if (device_ptr == nullptr) {
        SetOpContextMemoryAllocFail(from_aid.Name(), device_context, block_size, op_context);
        return;
      }
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, from_aid.Name(), block_size, device_ptr,
                                                     device::tracker::MemType::kSomas);
      merged_base_addresses[block_offset] = device_ptr;
    }
  } catch (const std::exception &e) {
    SetOpContextMemoryAllocFail(from_aid.Name(), device_context, somas_info->whole_block_size_, op_context);
    return;
  }
  MS_LOG(INFO) << from_aid.Name() << " allocate somas merged blocks memory succeeded and continue running.";

  // Call back to the from actor to process after memory allocation finished.
  OnMemoryAllocFinish(from_aid, op_context);

  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryAlloc, from_aid.Name(), false);
}

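// Decrease the reference count of each device address in `free_list` and free those that
// reach zero; all entries share the same device context.
//
// A minimal usage sketch (illustrative only; names are assumptions, mirroring the
// allocation request above):
//   ActorDispatcher::Send(memory_manager_aid, &MemoryManagerActor::FreeMemory,
//                         &memory_free_list, device_context, op_context, GetAID());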
void MemoryManagerActor::FreeMemory(const std::vector<DeviceTensor *> *free_list, const DeviceContext *device_context,
                                    OpContext<DeviceTensor> *, const AID &from_aid) {
  for (auto &device_tensor : *free_list) {
    FreeMemoryByRefCount(device_tensor, device_context, from_aid.Name());
  }
}

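// Batch counterpart of FreeMemory: `free_list` and `device_contexts` are matched
// element-wise, so each device address is released through its own device context.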
void MemoryManagerActor::FreeBatchMemory(const std::vector<DeviceTensor *> *free_list,
                                         const std::vector<const DeviceContext *> *device_contexts,
                                         OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  uint64_t start_time = 0;
  PROFILER_START(start_time);

  MS_EXCEPTION_IF_NULL(free_list);
  MS_EXCEPTION_IF_NULL(device_contexts);
  MS_EXCEPTION_IF_NULL(op_context);
  if ((*free_list).size() != (*device_contexts).size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context),
                                      "The size of free list is not equal to the size of device contexts.");
  }

  for (size_t i = 0; i < (*free_list).size(); ++i) {
    auto &device_tensor = (*free_list)[i];
    auto &device_context = (*device_contexts)[i];
    FreeMemoryByRefCount(device_tensor, device_context, from_aid.Name());
  }

  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryFree, from_aid.Name(), false);
}

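// Free the SOMAS memory of a kernel graph while keeping the graph outputs alive: the output
// pointers are passed as keep addresses to FreePartMemorys, then handed over to regular
// ref-count management so that they are released once all consumers have finished.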
void MemoryManagerActor::FreeSomasMemory(SomasInfo *const somas_info, const DeviceContext *device_context,
                                         OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  uint64_t start_time = 0;
  PROFILER_START(start_time);

  MS_EXCEPTION_IF_NULL(somas_info);
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(device_context->device_res_manager_);
  MS_EXCEPTION_IF_NULL(op_context);

  // Collect the graph output addresses whose memory must survive the free.
  std::vector<void *> keep_addrs;
  for (auto &output_address : somas_info->graph_output_device_addresses_) {
    MS_EXCEPTION_IF_NULL(output_address);
    MS_LOG(DEBUG) << "Keep address:" << output_address << " ptr:" << output_address->GetPtr()
                  << " size:" << output_address->GetSize() << " for actor:" << from_aid;
    (void)keep_addrs.emplace_back(output_address->GetMutablePtr());
  }

  device::DynamicMemAllocatorDebugInfo::SetDebugInfo(from_aid.Name(), device::AllocatorType::kGraphOutput);
  // Free the whole block memory.
  if (somas_info->base_address_ != nullptr) {
    device_context->device_res_manager_->FreePartMemorys({somas_info->base_address_}, keep_addrs,
                                                         somas_info->graph_output_address_sizes_);
    somas_info->base_address_ = nullptr;

    for (auto &merged_base_address : somas_info->merged_base_addresses_) {
      if (merged_base_address.second != nullptr) {
        std::string error_info = " There should be no merged block base address for " + from_aid.Name();
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
      }
    }
  } else {
    // Free the merged blocks memory.
    std::vector<void *> free_addrs;
    for (auto &merged_base_address : somas_info->merged_base_addresses_) {
      if (merged_base_address.second == nullptr) {
        std::string error_info = " There should be a merged block base address for " + from_aid.Name();
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*op_context), error_info);
      }
      (void)free_addrs.emplace_back(merged_base_address.second);
      merged_base_address.second = nullptr;
    }
    device_context->device_res_manager_->FreePartMemorys(free_addrs, keep_addrs,
                                                         somas_info->graph_output_address_sizes_);
  }

  // Hand the kept graph outputs over to normal reference counting.
  for (auto &output_address : somas_info->graph_output_device_addresses_) {
    output_address->set_from_mem_pool(true);
    device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(UpdateMemInfo, output_address,
                                                   device::tracker::MemType::kSomasOutput);
    device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(BindDevicePtr, output_address, output_address->GetPtr());
    FreeMemoryByRefCount(output_address, device_context, from_aid.Name());
  }

  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kMemoryFree, from_aid.Name(), false);
}

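// Note: since actor mailboxes are processed in order, this reply reaches the requester only
// after all previously queued allocate/free messages have been handled, which makes Wait
// usable as a synchronization point (an inference from the messaging model, not a
// documented contract).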
void MemoryManagerActor::Wait(OpContext<DeviceTensor> *const op_context, const AID &from_aid) {
  // Call back to the from actor to process.
  ActorDispatcher::Send(from_aid, &MemoryAwareActor::OnMemoryAllocFinish, op_context);
}

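// A device address carries either a static reference count (original_ref_count != SIZE_MAX),
// which is reset after the memory is freed so the address can be reused in the next step, or
// a dynamic reference count (dynamic_ref_count != INT32_MAX), which is consumed once. A
// custom deleter, when present, takes precedence over freeing through the device context.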
// Only one of the static and dynamic reference counts will take effect.
void MemoryManagerActor::FreeMemoryByRefCount(DeviceTensor *const device_tensor, const DeviceContext *device_context,
                                              const std::string &op_name) {
  MS_EXCEPTION_IF_NULL(device_tensor);
  if (device_tensor->original_ref_count() != SIZE_MAX) {
    // The static reference count is decremented to zero to free memory, then reset to the original count.
    size_t ref_count = device_tensor->DecreaseRefCount();
    if (ref_count == 0) {
      device_tensor->ResetRefCount();
      device_tensor->ClearUserData();
      if (device_tensor->GetPtr() != nullptr) {
        auto held_by_nodes = device_tensor->held_by_nodes();
        if (held_by_nodes.empty()) {
          FreeMemoryByDeviceContext(device_tensor, device_context);
        } else {
          FreeMemoryByValueNode(held_by_nodes, device_tensor);
        }
      }
    }
  } else if (device_tensor->dynamic_ref_count() != INT32_MAX) {
    // The dynamic reference count is decremented to zero to free memory.
    if ((device_tensor->DecreaseDynamicRefCount(op_name) == 0) && (device_tensor->GetPtr() != nullptr)) {
      device_tensor->ClearUserData();
      MS_LOG(DEBUG) << "Free memory by the dynamic reference count, device address: " << device_tensor->GetPtr()
                    << ".";
      if (device_tensor->deleter() != nullptr) {
        MS_LOG(DEBUG) << "Free ptr:" << device_tensor->GetPtr() << " for device address:" << device_tensor;
        device_tensor->deleter()(static_cast<uint8_t *>(device_tensor->GetMutablePtr()));
        device_tensor->set_deleter(nullptr);
        device_tensor->set_ptr(nullptr);
        return;
      }
      FreeMemoryByDeviceContext(device_tensor, device_context);
    }
  }
}

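// Record an allocation failure into the op context. A mutex protects the failed-step set
// because multiple actors may report failures concurrently for the same step.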
void MemoryManagerActor::SetOpContextMemoryAllocFail(const std::string &kernel_name,
                                                     const DeviceContext *device_context, size_t alloc_size,
                                                     OpContext<DeviceTensor> *const op_context) {
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(op_context);

  std::lock_guard<std::mutex> locker(mem_alloc_failed_mutex_);
  int step_id = op_context->sequential_num_;
  // Handle only the first allocation failure of the step.
  if (mem_alloc_failed_step_ids_.find(step_id) == mem_alloc_failed_step_ids_.end()) {
    mem_alloc_failed_step_ids_.clear();
    (void)mem_alloc_failed_step_ids_.insert(step_id);
    SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, *op_context, *device_context,
                                                kernel_name, alloc_size);
  }
}
}  // namespace runtime
}  // namespace mindspore