1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/cache_arena.h"
17 #include "minddata/dataset/engine/cache/cache_server.h"
18 #include "minddata/dataset/util/path.h"
19 namespace mindspore {
20 namespace dataset {
CachedSharedMemory(int32_t port,size_t val_in_GB)21 CachedSharedMemory::CachedSharedMemory(int32_t port, size_t val_in_GB)
22 : shared_memory_sz_in_gb_(val_in_GB), port_(port), num_numa_nodes_(-1), sub_pool_sz_(-1) {
23 // We create the shared memory and we will destroy it. All other client just detach only.
24 shm_.RemoveResourcesOnExit();
25 }
26 CachedSharedMemory::~CachedSharedMemory() = default;
27
Init()28 Status CachedSharedMemory::Init() {
29 CacheServer &cs = CacheServer::GetInstance();
30 num_numa_nodes_ = cs.GetNumaNodeCount();
31 // Generate the ftok using a combination of port.
32 SharedMemory::shm_key_t shm_key;
33 RETURN_IF_NOT_OK(PortToFtok(port_, &shm_key));
34 shm_.SetPublicKey(shm_key);
35 // Value is in GB. Convert into bytes.
36 int64_t shm_mem_sz = shared_memory_sz_in_gb_ * 1073741824L;
37 RETURN_IF_NOT_OK(shm_.Create(shm_mem_sz));
38 MS_LOG(INFO) << "Creation of shared memory successful. Shared memory key " << shm_.GetKey();
39 // Interleave the memory.
40 cs.GetHWControl()->InterleaveMemory(shm_.SharedMemoryBaseAddr(), shm_mem_sz);
41 // We will create a number of sub pool out of shared memory to reduce latch contention
42 int32_t num_of_pools = num_numa_nodes_;
43 if (num_numa_nodes_ == 1) {
44 constexpr int32_t kNumPoolMultiplier = 2;
45 num_of_pools = shared_memory_sz_in_gb_ * kNumPoolMultiplier;
46 }
47 sub_pool_sz_ = shm_mem_sz / num_of_pools;
48 // If each subpool is too small, readjust the number of pools
49 constexpr int64 min_subpool_sz = 512 * 1048576L;
50 if (sub_pool_sz_ < min_subpool_sz) {
51 sub_pool_sz_ = min_subpool_sz;
52 num_of_pools = shm_mem_sz / min_subpool_sz;
53 }
54 shm_pool_.reserve(num_of_pools);
55 for (auto i = 0; i < num_of_pools; ++i) {
56 void *ptr = static_cast<char *>(shm_.SharedMemoryBaseAddr()) + i * sub_pool_sz_;
57 shm_pool_.push_back(std::make_unique<ArenaImpl>(ptr, sub_pool_sz_));
58 }
59 mux_ = std::make_unique<std::mutex[]>(num_of_pools);
60 return Status::OK();
61 }
62
CreateArena(std::unique_ptr<CachedSharedMemory> * out,int32_t port,size_t val_in_GB)63 Status CachedSharedMemory::CreateArena(std::unique_ptr<CachedSharedMemory> *out, int32_t port, size_t val_in_GB) {
64 RETURN_UNEXPECTED_IF_NULL(out);
65 auto mem_pool = std::unique_ptr<CachedSharedMemory>(new CachedSharedMemory(port, val_in_GB));
66 RETURN_IF_NOT_OK(mem_pool->Init());
67 *out = std::move(mem_pool);
68 return Status::OK();
69 }
70
AllocateSharedMemory(int32_t client_id,size_t sz,void ** p)71 Status CachedSharedMemory::AllocateSharedMemory(int32_t client_id, size_t sz, void **p) {
72 Status rc;
73 RETURN_UNEXPECTED_IF_NULL(p);
74 auto begin_slot = client_id % shm_pool_.size();
75 auto slot = begin_slot;
76 do {
77 std::unique_lock<std::mutex> lock(mux_[slot]);
78 rc = shm_pool_[slot]->Allocate(sz, p);
79 if (rc == StatusCode::kMDOutOfMemory) {
80 slot = (slot + 1) % shm_pool_.size();
81 }
82 } while (rc.IsError() && slot != begin_slot);
83 if (rc.IsError()) {
84 return rc;
85 }
86 return Status::OK();
87 }
88
DeallocateSharedMemory(int32_t client_id,void * p)89 void CachedSharedMemory::DeallocateSharedMemory(int32_t client_id, void *p) {
90 auto begin_slot = client_id % shm_pool_.size();
91 auto slot = begin_slot;
92 auto start_addr = static_cast<char *>(SharedMemoryBaseAddr());
93 bool found = false;
94 do {
95 auto ptr = start_addr + slot * sub_pool_sz_;
96 if (ptr <= p && p < (ptr + sub_pool_sz_)) {
97 std::unique_lock<std::mutex> lock(mux_[slot]);
98 shm_pool_[slot]->Deallocate(p);
99 found = true;
100 break;
101 } else {
102 slot = (slot + 1) % shm_pool_.size();
103 }
104 } while (slot != begin_slot);
105 if (!found) {
106 MS_LOG(ERROR) << "Programming error. Can't find the arena the pointer " << p << " comes from";
107 }
108 }
109 } // namespace dataset
110 } // namespace mindspore
111