/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <algorithm>
#include <iterator>
#include <limits>
#include "minddata/dataset/engine/cache/cache_hw.h"
#include "minddata/dataset/engine/cache/cache_numa.h"
#include "minddata/dataset/util/random.h"
namespace mindspore {
namespace dataset {
NumaMemoryPool::NumaMemoryPool(std::shared_ptr<CacheServerHW> hw, float memory_cap_ratio)
    : hw_(std::move(hw)), memory_cap_ratio_(memory_cap_ratio) {
  int64_t total_avail = 0;
  // We will create a number of small Arenas to spread out the server threads so
  // there is less contention. If we link with the numa library, i.e. if
  // NUMA_ENABLED is defined, we make use of the low level numa library such that
  // each Arena comes solely from one particular socket.
  // The total number of Arenas is capped by the number of cpus.
  auto num_cpus = hw_->GetCpuCount();
  memory_segments_.reserve(num_cpus);
  arena_list_.reserve(num_cpus);
  mux_ = std::make_unique<std::mutex[]>(num_cpus);
  auto num_memory_nodes = num_cpus;
  int64_t max_avail = CacheServerHW::GetTotalSystemMemory() * memory_cap_ratio_;
  int64_t arena_sz = max_avail / num_memory_nodes;
  // If arena_sz is too small, raise it to a floor of (4K-rounded) INT32_MAX bytes
  // and lower the number of Arenas accordingly.
  if (arena_sz < std::numeric_limits<int32_t>::max()) {
    arena_sz = round_up_4K(std::numeric_limits<int32_t>::max());
    num_memory_nodes = max_avail / arena_sz;
    if (num_memory_nodes == 0) {
      num_memory_nodes = 1;
      arena_sz = max_avail;
    }
  }
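  // Worked example (hypothetical numbers): on a 16-cpu host with 64GiB of
  // memory and a cap ratio of 0.8, max_avail is ~51.2GiB and arena_sz ~3.2GiB,
  // which clears the ~2GiB floor, so all 16 arenas are kept. With only 8GiB of
  // memory, arena_sz falls below the floor, is raised to ~2GiB, and the arena
  // count drops to 3.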
  MS_LOG(INFO) << "Creating " << num_memory_nodes << " arenas, each of size " << arena_sz;

#ifdef NUMA_ENABLED
  if (numa_available() != -1) {
    auto num_numa_nodes = hw_->GetNumaNodeCount();
    numa_id_t node_id = 0;
    for (auto i = 0; i < num_memory_nodes; ++i) {
      auto success = CreateMultipleArenas(arena_sz, node_id++ % num_numa_nodes, 1);
      total_avail += success * arena_sz;
    }
  } else {
    auto success = CreateMultipleArenas(arena_sz, 0, num_memory_nodes);
    total_avail += success * arena_sz;
  }
#else
  auto success = CreateMultipleArenas(arena_sz, 0, num_memory_nodes);
  total_avail += success * arena_sz;
#endif
  memory_cap_ = total_avail;
  MS_LOG(WARNING) << "Memory pool created. Total available memory " << memory_cap_ << " spread across " << nodes_.size()
                  << " arenas";
  int32_t slot = 0;
  // Set up a map from numa node id to arena slots for future easy access.
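  // Example (hypothetical): with 2 numa nodes and 4 arenas created in
  // alternating node order, numa_map_ becomes {0: [0, 2], 1: [1, 3]}.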
  for (auto node_id : nodes_) {
    numa_map_[node_id].push_back(slot);
    ++slot;
  }
}

int32_t NumaMemoryPool::CreateMultipleArenas(int64_t segment_sz, numa_id_t node_id, int32_t repeat_count) {
  int32_t success = 0;
  for (auto i = 0; i < repeat_count; ++i) {
#ifdef NUMA_ENABLED
    void *ptr = numa_alloc_onnode(segment_sz, node_id);
#else
    void *ptr = malloc(segment_sz);
#endif
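    // Note: numa_alloc_onnode() rounds the request up to whole pages and binds
    // the pages to the given node; it must later be released with numa_free().
    // The malloc() fallback carries no node affinity.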
    if (ptr != nullptr) {
      memory_segments_.emplace_back(ptr, segment_sz);
      arena_list_.push_back(std::make_unique<ArenaImpl>(ptr, segment_sz));
      nodes_.push_back(node_id);
      ++success;
    } else {
      // Allocation failed. Skip the rest.
      break;
    }
  }
  MS_LOG(DEBUG) << "Allocated " << success << " arenas from node " << node_id;
  return success;
}

NumaMemoryPool::~NumaMemoryPool() {
  if (!memory_segments_.empty()) {
    for (auto &s : memory_segments_) {
#ifdef NUMA_ENABLED
      numa_free(s.first, s.second);
#else
      free(s.first);
#endif
    }
  }
}

Status NumaMemoryPool::Allocate(size_t n, void **p) {
  RETURN_UNEXPECTED_IF_NULL(p);
  auto mt = GetRandomDevice();
  Status rc;
  void *ptr = nullptr;
  size_t num_segments = memory_segments_.size();
  CHECK_FAIL_RETURN_UNEXPECTED(num_segments > 0, "No memory segments available");
  if (NumaAware()) {
    auto num_numa_nodes = hw_->GetNumaNodeCount();
    // We start from the numa node this worker is running on and do a round robin search.
    numa_id_t start = hw_->GetMyNode();
    numa_id_t node_id = start;
    do {
      auto it = numa_map_.find(node_id);
      if (it != numa_map_.end()) {
        auto &slots = it->second;
        size_t num_slots = slots.size();
        // Pick a random starting slot on this node to spread the load.
        std::uniform_int_distribution<size_t> distribution(0, num_slots - 1);
        size_t start_slot = distribution(mt);
        size_t inx = start_slot;
        do {
          size_t k = slots.at(inx);
          std::unique_lock lock_x(mux_[k]);
          auto &impl = arena_list_.at(k);
          rc = impl->Allocate(n, &ptr);
          if (rc.IsOk()) {
            *p = ptr;
            break;
          } else if (rc == StatusCode::kMDOutOfMemory) {
            inx = (inx + 1) % num_slots;
          } else {
            return rc;
          }
        } while (inx != start_slot);
      }
      // We are done searching this numa node. If nothing was found, move on to the next node.
      if (ptr == nullptr) {
        node_id = (node_id + 1) % num_numa_nodes;
      } else {
        break;
      }
    } while (node_id != start);
  } else {
    // If not numa aware, just randomly pick a slot and search from there.
    std::uniform_int_distribution<size_t> distribution(0, num_segments - 1);
    size_t start_slot = distribution(mt);
    size_t slot = start_slot;
    do {
      std::unique_lock lock_x(mux_[slot]);
      auto &impl = arena_list_.at(slot);
      rc = impl->Allocate(n, &ptr);
      if (rc.IsOk()) {
        *p = ptr;
        break;
      } else if (rc == StatusCode::kMDOutOfMemory) {
        // Move on to the next arena and continue.
        slot = (slot + 1) % num_segments;
      } else {
        return rc;
      }
    } while (slot != start_slot);
  }
  // Handle the case where a full round robin search found no free memory.
  if (ptr == nullptr) {
    return Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__);
  }
  return rc;
}
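
// Usage sketch (illustrative only; the `pool` variable is hypothetical):
//   void *buf = nullptr;
//   Status rc = pool->Allocate(4096, &buf);
//   if (rc.IsOk()) {
//     // ... use buf ...
//     pool->Deallocate(buf);
//   }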
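// Deallocate assumes p was allocated from this pool: if Locate() returns -1
// for a foreign pointer, the assert below fires in debug builds (and the
// mux_ index would otherwise be out of range).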
void NumaMemoryPool::Deallocate(void *p) {
  // Find out which numa slot it comes from.
  auto slot = Locate(p);
  MS_ASSERT(slot != -1);
  std::unique_lock lock_x(mux_[slot]);
  auto &impl = arena_list_.at(slot);
  impl->Deallocate(p);
}
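// Returns the unweighted average of the per-arena free percentages; this is a
// fair summary here because every arena is created with the same size.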
int NumaMemoryPool::PercentFree() const {
  int percent_free = 0;
  int num_arena = 0;
  for (auto const &p : arena_list_) {
    percent_free += p->PercentFree();
    num_arena++;
  }
  if (num_arena) {
    return percent_free / num_arena;
  } else {
    return 100;
  }
}
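// Example (hypothetical addresses): with memory_segments_ = {(0x1000, 0x4000),
// (0x9000, 0x4000)}, Locate() maps 0xa000 to slot 1 and returns -1 for any
// address outside both segments.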
int32_t NumaMemoryPool::Locate(void *p) const {
  int32_t slot = 0;
  char *mem = reinterpret_cast<char *>(p);
  for (slot = 0; slot < static_cast<int32_t>(memory_segments_.size()); ++slot) {
    auto elem = memory_segments_.at(slot);
    char *q = reinterpret_cast<char *>(elem.first);
    if (mem >= q && mem < q + elem.second) {
      return slot;
    }
  }
  return -1;
}
std::vector<numa_id_t> NumaMemoryPool::GetAvailableNodes() const {
  std::vector<numa_id_t> v;
  std::transform(numa_map_.begin(), numa_map_.end(), std::back_inserter(v),
                 [](const std::pair<numa_id_t, std::vector<int32_t>> &e) { return e.first; });
  return v;
}

}  // namespace dataset
}  // namespace mindspore