• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/cache/cache_arena.h"
17 #include "minddata/dataset/engine/cache/cache_server.h"
18 #include "minddata/dataset/util/path.h"
19 namespace mindspore {
20 namespace dataset {
CachedSharedMemory(int32_t port,size_t val_in_GB)21 CachedSharedMemory::CachedSharedMemory(int32_t port, size_t val_in_GB)
22     : shared_memory_sz_in_gb_(val_in_GB), port_(port), num_numa_nodes_(-1), sub_pool_sz_(-1) {
23   // We create the shared memory and we will destroy it. All other client just detach only.
24   shm_.RemoveResourcesOnExit();
25 }
26 CachedSharedMemory::~CachedSharedMemory() = default;
27 
Init()28 Status CachedSharedMemory::Init() {
29   CacheServer &cs = CacheServer::GetInstance();
30   num_numa_nodes_ = cs.GetNumaNodeCount();
31   // Generate the ftok using a combination of port.
32   SharedMemory::shm_key_t shm_key;
33   RETURN_IF_NOT_OK(PortToFtok(port_, &shm_key));
34   shm_.SetPublicKey(shm_key);
35   // Value is in GB. Convert into bytes.
36   int64_t shm_mem_sz = shared_memory_sz_in_gb_ * 1073741824L;
37   RETURN_IF_NOT_OK(shm_.Create(shm_mem_sz));
38   MS_LOG(INFO) << "Creation of shared memory successful. Shared memory key " << shm_.GetKey();
39   // Interleave the memory.
40   cs.GetHWControl()->InterleaveMemory(shm_.SharedMemoryBaseAddr(), shm_mem_sz);
41   // We will create a number of sub pool out of shared memory to reduce latch contention
42   int32_t num_of_pools = num_numa_nodes_;
43   if (num_numa_nodes_ == 1) {
44     constexpr int32_t kNumPoolMultiplier = 2;
45     num_of_pools = shared_memory_sz_in_gb_ * kNumPoolMultiplier;
46   }
47   sub_pool_sz_ = shm_mem_sz / num_of_pools;
48   // If each subpool is too small, readjust the number of pools
49   constexpr int64 min_subpool_sz = 512 * 1048576L;
50   if (sub_pool_sz_ < min_subpool_sz) {
51     sub_pool_sz_ = min_subpool_sz;
52     num_of_pools = shm_mem_sz / min_subpool_sz;
53   }
54   shm_pool_.reserve(num_of_pools);
55   for (auto i = 0; i < num_of_pools; ++i) {
56     void *ptr = static_cast<char *>(shm_.SharedMemoryBaseAddr()) + i * sub_pool_sz_;
57     shm_pool_.push_back(std::make_unique<ArenaImpl>(ptr, sub_pool_sz_));
58   }
59   mux_ = std::make_unique<std::mutex[]>(num_of_pools);
60   return Status::OK();
61 }
62 
CreateArena(std::unique_ptr<CachedSharedMemory> * out,int32_t port,size_t val_in_GB)63 Status CachedSharedMemory::CreateArena(std::unique_ptr<CachedSharedMemory> *out, int32_t port, size_t val_in_GB) {
64   RETURN_UNEXPECTED_IF_NULL(out);
65   auto mem_pool = std::unique_ptr<CachedSharedMemory>(new CachedSharedMemory(port, val_in_GB));
66   RETURN_IF_NOT_OK(mem_pool->Init());
67   *out = std::move(mem_pool);
68   return Status::OK();
69 }
70 
AllocateSharedMemory(int32_t client_id,size_t sz,void ** p)71 Status CachedSharedMemory::AllocateSharedMemory(int32_t client_id, size_t sz, void **p) {
72   Status rc;
73   RETURN_UNEXPECTED_IF_NULL(p);
74   auto begin_slot = client_id % shm_pool_.size();
75   auto slot = begin_slot;
76   do {
77     std::unique_lock<std::mutex> lock(mux_[slot]);
78     rc = shm_pool_[slot]->Allocate(sz, p);
79     if (rc == StatusCode::kMDOutOfMemory) {
80       slot = (slot + 1) % shm_pool_.size();
81     }
82   } while (rc.IsError() && slot != begin_slot);
83   if (rc.IsError()) {
84     return rc;
85   }
86   return Status::OK();
87 }
88 
DeallocateSharedMemory(int32_t client_id,void * p)89 void CachedSharedMemory::DeallocateSharedMemory(int32_t client_id, void *p) {
90   auto begin_slot = client_id % shm_pool_.size();
91   auto slot = begin_slot;
92   auto start_addr = static_cast<char *>(SharedMemoryBaseAddr());
93   bool found = false;
94   do {
95     auto ptr = start_addr + slot * sub_pool_sz_;
96     if (ptr <= p && p < (ptr + sub_pool_sz_)) {
97       std::unique_lock<std::mutex> lock(mux_[slot]);
98       shm_pool_[slot]->Deallocate(p);
99       found = true;
100       break;
101     } else {
102       slot = (slot + 1) % shm_pool_.size();
103     }
104   } while (slot != begin_slot);
105   if (!found) {
106     MS_LOG(ERROR) << "Programming error. Can't find the arena the pointer " << p << " comes from";
107   }
108 }
109 }  // namespace dataset
110 }  // namespace mindspore
111