• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 #include <algorithm>
17 #include <iterator>
18 #include <limits>
19 #include "minddata/dataset/engine/cache/cache_hw.h"
20 #include "minddata/dataset/engine/cache/cache_numa.h"
21 #include "minddata/dataset/util/random.h"
22 namespace mindspore {
23 namespace dataset {
NumaMemoryPool(std::shared_ptr<CacheServerHW> hw,float memory_cap_ratio)24 NumaMemoryPool::NumaMemoryPool(std::shared_ptr<CacheServerHW> hw, float memory_cap_ratio)
25     : hw_(std::move(hw)), memory_cap_ratio_(memory_cap_ratio) {
26   int64_t total_avail = 0;
27   // We will create a number of small Arenas to spread out the server threads so it
28   // will be less contention. If we link with the numa library, i.e. if
29   // NUMA_ENABLED is defined, we will make use of the low level numa library such that
30   // each Arena solely comes from one particular socket.
31   // The total number of Arenas will be controlled under the number of cpus.
32   auto num_cpus = hw_->GetCpuCount();
33   memory_segments_.reserve(num_cpus);
34   arena_list_.reserve(num_cpus);
35   mux_ = std::make_unique<std::mutex[]>(num_cpus);
36   auto num_memory_nodes = num_cpus;
37   int64_t max_avail = CacheServerHW::GetTotalSystemMemory() * memory_cap_ratio_;
38   int64_t arena_sz = max_avail / num_memory_nodes;
39   // If arena_sz is too small, lower the number of Arenas.
40   if (arena_sz < std::numeric_limits<int32_t>::max()) {
41     arena_sz = round_up_4K(std::numeric_limits<int32_t>::max());
42     num_memory_nodes = max_avail / arena_sz;
43     if (num_memory_nodes == 0) {
44       num_memory_nodes = 1;
45       arena_sz = max_avail;
46     }
47   }
48   MS_LOG(INFO) << "Creating " << num_memory_nodes << " number of arena. Each one of size " << arena_sz;
49 
50 #ifdef NUMA_ENABLED
51   if (numa_available() != -1) {
52     auto num_numa_nodes = hw_->GetNumaNodeCount();
53     numa_id_t node_id = 0;
54     for (auto i = 0; i < num_memory_nodes; ++i) {
55       auto success = CreateMultipleArenas(arena_sz, node_id++ % num_numa_nodes, 1);
56       total_avail += success * arena_sz;
57     }
58   } else {
59     auto success = CreateMultipleArenas(arena_sz, 0, num_memory_nodes);
60     total_avail += success * arena_sz;
61   }
62 #else
63   auto success = CreateMultipleArenas(arena_sz, 0, num_memory_nodes);
64   total_avail += success * arena_sz;
65 #endif
66   memory_cap_ = total_avail;
67   MS_LOG(WARNING) << "Memory pool created. Total available memory " << memory_cap_ << " spread in " << nodes_.size()
68                   << " arenas";
69   int32_t slot = 0;
70   // Set up a map for future easy access.
71   for (auto node_id : nodes_) {
72     numa_map_[node_id].push_back(slot);
73     ++slot;
74   }
75 }
76 
CreateMultipleArenas(int64_t segment_sz,numa_id_t node_id,int32_t repeat_count)77 int32_t NumaMemoryPool::CreateMultipleArenas(int64_t segment_sz, numa_id_t node_id, int32_t repeat_count) {
78   int32_t success = 0;
79   for (auto i = 0; i < repeat_count; ++i) {
80 #ifdef NUMA_ENABLED
81     void *ptr = numa_alloc_onnode(segment_sz, node_id);
82 #else
83     void *ptr = malloc(segment_sz);
84 #endif
85     if (ptr != nullptr) {
86       memory_segments_.emplace_back(ptr, segment_sz);
87       arena_list_.push_back(std::make_unique<ArenaImpl>(ptr, segment_sz));
88       nodes_.push_back(node_id);
89       ++success;
90     } else {
91       // Skip the rest.
92       break;
93     }
94   }
95   MS_LOG(DEBUG) << "Allocate " << success << " arenas from node " << node_id;
96   return success;
97 }
98 
~NumaMemoryPool()99 NumaMemoryPool::~NumaMemoryPool() {
100   if (!memory_segments_.empty()) {
101     for (auto &s : memory_segments_) {
102 #ifdef NUMA_ENABLED
103       numa_free(s.first, s.second);
104 #else
105       free(s.first);
106 #endif
107     }
108   }
109 }
110 
Allocate(size_t n,void ** p)111 Status NumaMemoryPool::Allocate(size_t n, void **p) {
112   RETURN_UNEXPECTED_IF_NULL(p);
113   auto mt = GetRandomDevice();
114   Status rc;
115   void *ptr = nullptr;
116   size_t num_segments = memory_segments_.size();
117   CHECK_FAIL_RETURN_UNEXPECTED(num_segments > 0, "No numa nodes available");
118   if (NumaAware()) {
119     auto num_numa_nodes = hw_->GetNumaNodeCount();
120     // We will start from the numa node this worker id is running on and do a round robin search.
121     numa_id_t start = hw_->GetMyNode();
122     numa_id_t node_id = start;
123     do {
124       auto it = numa_map_.find(node_id);
125       if (it != numa_map_.end()) {
126         auto &slots = it->second;
127         size_t num_slots = slots.size();
128         std::uniform_int_distribution<size_t> distribution(0, num_slots - 1);
129         size_t start_slot = distribution(mt);
130         size_t inx = start_slot;
131         do {
132           size_t k = slots.at(inx);
133           std::unique_lock lock_x(mux_[k]);
134           auto &impl = arena_list_.at(k);
135           rc = impl->Allocate(n, &ptr);
136           if (rc.IsOk()) {
137             *p = ptr;
138             break;
139           } else if (rc == StatusCode::kMDOutOfMemory) {
140             inx = (inx + 1) % num_slots;
141           } else {
142             return rc;
143           }
144         } while (inx != start_slot);
145       }
146       // We have done searching for this numa node. If not found, move to the next node.
147       if (ptr == nullptr) {
148         node_id = (node_id + 1) % num_numa_nodes;
149       } else {
150         break;
151       }
152     } while (node_id != start);
153   } else {
154     // If not numa aware, just randomly pick a slot.
155     std::uniform_int_distribution<size_t> distribution(0, num_segments - 1);
156     size_t start_slot = distribution(mt);
157     size_t slot = start_slot;
158     do {
159       std::unique_lock lock_x(mux_[slot]);
160       auto &impl = arena_list_.at(slot);
161       rc = impl->Allocate(n, &ptr);
162       if (rc.IsOk()) {
163         *p = ptr;
164         break;
165       } else if (rc == StatusCode::kMDOutOfMemory) {
166         // Make the next arena and continue.
167         slot = (slot + 1) % num_segments;
168       } else {
169         return rc;
170       }
171     } while (slot != start_slot);
172   }
173   // Handle the case we have done one round robin search.
174   if (ptr == nullptr) {
175     return Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__);
176   }
177   return rc;
178 }
179 
Deallocate(void * p)180 void NumaMemoryPool::Deallocate(void *p) {
181   // Find out which numa slot it comes from.
182   auto slot = Locate(p);
183   MS_ASSERT(slot != -1);
184   std::unique_lock lock_x(mux_[slot]);
185   auto &impl = arena_list_.at(slot);
186   impl->Deallocate(p);
187 }
188 
PercentFree() const189 int NumaMemoryPool::PercentFree() const {
190   int percent_free = 0;
191   int num_arena = 0;
192   for (auto const &p : arena_list_) {
193     percent_free += p->PercentFree();
194     num_arena++;
195   }
196   if (num_arena) {
197     return percent_free / num_arena;
198   } else {
199     return 100;
200   }
201 }
202 
Locate(void * p) const203 int32_t NumaMemoryPool::Locate(void *p) const {
204   int32_t slot = 0;
205   char *mem = reinterpret_cast<char *>(p);
206   for (slot = 0; slot < memory_segments_.size(); ++slot) {
207     auto elem = memory_segments_.at(slot);
208     char *q = reinterpret_cast<char *>(elem.first);
209     if (mem >= q && mem < q + elem.second) {
210       return slot;
211     }
212   }
213   return -1;
214 }
215 
GetAvailableNodes() const216 std::vector<numa_id_t> NumaMemoryPool::GetAvailableNodes() const {
217   std::vector<numa_id_t> v;
218   std::transform(numa_map_.begin(), numa_map_.end(), std::back_inserter(v),
219                  [](const std::pair<numa_id_t, std::vector<int32_t>> &v) { return v.first; });
220   return v;
221 }
222 
223 }  // namespace dataset
224 }  // namespace mindspore
225