/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_

#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include <map>
#include <set>
#include <thread>
#include "minddata/dataset/engine/cache/cache_arena.h"
#include "minddata/dataset/engine/cache/cache_hw.h"
#include "minddata/dataset/engine/cache/cache_numa.h"
#include "minddata/dataset/engine/cache/cache_service.h"
#include "minddata/dataset/engine/cache/cache_grpc_server.h"
#include "minddata/dataset/engine/cache/cache_pool.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/util/allocator.h"
#include "minddata/dataset/util/arena.h"
#include "minddata/dataset/util/lock.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/semaphore.h"
#include "minddata/dataset/util/service.h"
#include "minddata/dataset/util/services.h"
#include "minddata/dataset/util/system_pool.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
/// \brief A server which provides CacheService services.
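///
/// A minimal usage sketch (illustrative only; the port, worker count and sizes below are made-up
/// values, not recommendations):
/// \code
///   CacheServer::Builder builder;
///   builder.SetPort(50052).SetNumWorkers(16).SetSharedMemorySizeInGB(4).SetMemoryCapRatio(0.8);
///   Status rc = builder.Build();  // sanity-checks the settings and creates the singleton instance
///   if (rc.IsOk()) {
///     CacheServer &cs = CacheServer::GetInstance();
///     // cs.Run(...) then drives the server threads until shutdown.
///   }
/// \endcode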
class CacheServer : public Service {
 public:
  friend class Services;
  using cache_index = std::map<connection_id_type, std::unique_ptr<CacheService>>;
  // Only allow a new service to be created if the remaining memory is more than 15% of our hard memory cap
  constexpr static float kMemoryBottomLineForNewService = 0.15;

  class Builder {
   public:
    Builder();

    ~Builder() = default;

    /// \brief Getter functions
    const std::string &GetTop() const { return top_; }
    int32_t GetNumWorkers() const { return num_workers_; }
    int32_t GetPort() const { return port_; }
    int32_t GetSharedMemorySzInGb() const { return shared_memory_sz_in_gb_; }
    float GetMemoryCapRatio() const { return memory_cap_ratio_; }
    int8_t GetLogLevel() const { return log_level_; }

    Builder &SetRootDirectory(std::string root) {
      top_ = std::move(root);
      return *this;
    }
    Builder &SetNumWorkers(int32_t n) {
      num_workers_ = n;
      return *this;
    }
    Builder &SetPort(int32_t p) {
      port_ = p;
      return *this;
    }
    Builder &SetSharedMemorySizeInGB(int32_t sz) {
      shared_memory_sz_in_gb_ = sz;
      return *this;
    }
    Builder &SetMemoryCapRatio(float ratio) {
      memory_cap_ratio_ = ratio;
      return *this;
    }
    Builder &SetLogLevel(int8_t log_level) {
      log_level_ = log_level;
      return *this;
    }

    Status SanityCheck();

    void Print(std::ostream &out) const {
      out << "Summary of the cache server configuration\n"
          << "Spill directory: " << (GetTop().empty() ? "None" : GetTop()) << "\n"
          << "Number of parallel workers: " << GetNumWorkers() << "\n"
          << "Tcp/ip port: " << GetPort() << "\n"
          << "Shared memory size (in GB): " << GetSharedMemorySzInGb() << "\n"
          << "Memory cap ratio: " << GetMemoryCapRatio() << "\n"
          << "Log level: " << std::to_string(GetLogLevel());
    }

    friend std::ostream &operator<<(std::ostream &out, const Builder &bld) {
      bld.Print(out);
      return out;
    }

    Status Build() {
      // Get information on the numa architecture and adjust num_workers_ based on the numa node count
      hw_info_ = std::make_shared<CacheServerHW>();
      RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo());
      std::string warning_string;
      if (num_workers_ == -1) {
        // If the user did not provide a value for num_workers, default it to half of num_cpu and adjust it if
        // the default is not optimal.
        int32_t dft_num_workers =
          std::thread::hardware_concurrency() > 2 ? std::thread::hardware_concurrency() / 2 : 1;
        num_workers_ = AdjustNumWorkers(dft_num_workers);
      } else {
        // If the user has given their own value, adjust it and provide a warning if it got changed.
        int32_t num_workers_new = AdjustNumWorkers(num_workers_);
        if (num_workers_ != num_workers_new) {
          warning_string =
            "The configuration of workers on the cache server is dependent on the NUMA architecture of the server. "
            "The current setting is not optimal for the NUMA architecture. "
            "Re-adjusting the number of workers to the optimal setting of " +
            std::to_string(num_workers_new) + ".\n";
          MS_LOG(INFO) << warning_string;
        }
        num_workers_ = num_workers_new;
      }
      RETURN_IF_NOT_OK(SanityCheck());
      // We need to bring up the Task Manager by bringing up the Services singleton.
      RETURN_IF_NOT_OK(Services::CreateInstance());
      RETURN_IF_NOT_OK(CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_,
                                                   memory_cap_ratio_, log_level_, std::move(hw_info_)));
      return Status(StatusCode::kSuccess, warning_string);
    }

   private:
    std::string top_;
    int32_t num_workers_;
    int32_t port_;
    int32_t shared_memory_sz_in_gb_;
    float memory_cap_ratio_;
    int8_t log_level_;
    std::shared_ptr<CacheServerHW> hw_info_;

    /// \brief Sanity checks on the shared memory.
    /// \return Status object
    Status IpcResourceCleanup();

    /// \brief Adjust the value of num_workers if it is not optimal for the NUMA architecture.
    int32_t AdjustNumWorkers(int32_t num_workers);
  };

  CacheServer(const CacheServer &) = delete;
  CacheServer &operator=(const CacheServer &) = delete;
  CacheServer(CacheServer &&) = delete;
  CacheServer &operator=(CacheServer &&) = delete;
  Status DoServiceStart() override;
  Status DoServiceStop() override;
  ~CacheServer() override { (void)ServiceStop(); }

  static Status CreateInstance(const std::string &spill_path, int32_t num_workers, int32_t port,
                               int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level,
                               std::shared_ptr<CacheServerHW> hw_info) {
    std::call_once(init_instance_flag_, [&]() -> Status {
      auto &SvcManager = Services::GetInstance();
      RETURN_IF_NOT_OK(SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz,
                                          memory_cap_ratio, log_level, hw_info));
      return Status::OK();
    });
    return Status::OK();
  }

  static CacheServer &GetInstance() { return *instance_; }

  /// \brief For the current demonstration, a cache client contacts the cache server using a Queue.
  /// \param rq
  /// \return Status object
  Status PushRequest(int32_t queue_id, CacheServerRequest *rq) {
    RETURN_UNEXPECTED_IF_NULL(rq);
    RETURN_IF_NOT_OK(cache_q_->operator[](queue_id)->Add(rq));
    return Status::OK();
  }

  /// \brief Kick off server threads. Never returns unless an error occurs.
  Status Run(SharedMessage::queue_id_t msg_qid);

  /// \brief Get a free tag
  /// \param[in] q Pointer to a pointer to a CacheServerRequest
  /// \return Status object
  static Status GetFreeRequestTag(CacheServerRequest **q);

  /// \brief Return a tag to the free list
  /// \param[in] p Pointer to an already finished CacheServerRequest tag
  /// \return Status object
  static Status ReturnRequestTag(CacheServerRequest *p);

  /// Return an instance of the numa control
  std::shared_ptr<CacheServerHW> GetHWControl() { return hw_info_; }

  /// \brief Set CPU affinity
  Status SetAffinity(const Task &tk, numa_id_t numa_node) { return hw_info_->SetAffinity(tk, numa_node); }

  /// \brief Return the number of workers
  auto GetNumWorkers() const { return num_workers_; }

  /// \brief Return the number of grpc workers
  auto GetNumGrpcWorkers() const { return num_grpc_workers_; }

  /// \brief Return the number of numa nodes
  auto GetNumaNodeCount() const { return hw_info_->GetNumaNodeCount(); }

  /// \brief Assign a worker by a numa id
  /// \return worker id
  worker_id_t GetWorkerByNumaId(numa_id_t node_id) const;

  /// \brief Randomly pick a worker
  /// \return worker id
  worker_id_t GetRandomWorker() const;

  /// \brief Check if we bind threads to numa cores
  bool IsNumaAffinityOn() const { return numa_affinity_; }

  /// \brief Return the memory cap ratio
  float GetMemoryCapRatio() const { return memory_cap_ratio_; }

  /// \brief Function to handle a row request
  /// \param[in] cache_req A row request to handle
  /// \param[out] internal_request Indicator if the request is an internal request
  /// \return Status object
  Status ProcessRowRequest(CacheServerRequest *cache_req, bool *internal_request);

  /// \brief Function to handle an admin request
  /// \param[in] cache_req An admin request to handle
  /// \return Status object
  Status ProcessAdminRequest(CacheServerRequest *cache_req);

  /// \brief Function to handle a session request
  /// \param[in] cache_req A session request to handle
  /// \return Status object
  Status ProcessSessionRequest(CacheServerRequest *cache_req);

  /// \brief How a request is handled.
  /// \note It can be processed immediately by a grpc thread or routed to a server thread
  /// which is pinned to some numa node core.
  Status ProcessRequest(CacheServerRequest *cache_req);

  void GlobalShutdown();

  /// \brief This returns where we attach to the shared memory.
  /// Some gRPC requests will ask for a shared memory block, and
  /// we can't return the absolute address as this makes no sense
  /// in the client. So instead we will return an address relative
  /// to the base address of the shared memory where we attach to.
  /// \return Base address of the shared memory.
  const void *SharedMemoryBaseAddr() const { return shm_->SharedMemoryBaseAddr(); }
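  // Illustrative sketch only: the value handed back to a client for a shared memory block is the
  // distance from the base address above. ToOffset() is a hypothetical helper, not part of this class.
  //
  //   ptrdiff_t ToOffset(const void *base, const void *p) {
  //     return reinterpret_cast<const char *>(p) - reinterpret_cast<const char *>(base);
  //   }
  //   // e.g. a reply would carry ToOffset(SharedMemoryBaseAddr(), block) rather than the raw pointer.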
  /// \brief Return the public key of the shared memory.
  int32_t GetKey() const { return shm_->GetKey(); }

  Status AllocateSharedMemory(int32_t client_id, size_t sz, void **p);

  void DeallocateSharedMemory(int32_t client_id, void *p);

 private:
  static std::once_flag init_instance_flag_;
  static CacheServer *instance_;
  mutable RWLock rwLock_;
  mutable RWLock sessions_lock_;
  std::string top_;
  cache_index all_caches_;
  std::set<session_id_type> active_sessions_;
  std::shared_ptr<QueueList<CacheServerRequest *>> cache_q_;
  std::shared_ptr<CacheServerGreeterImpl> comm_layer_;
  TaskGroup vg_;
  int32_t num_workers_;
  int32_t num_grpc_workers_;
  int32_t port_;
  int32_t shared_memory_sz_in_gb_;
  int8_t log_level_;  // log_level is saved here for informational purposes only. It's not a functional field.
  std::atomic<bool> global_shutdown_;
  float memory_cap_ratio_;
  std::shared_ptr<CacheServerHW> hw_info_;
  std::map<worker_id_t, Task *> numa_tasks_;
  bool numa_affinity_;
  std::vector<int32_t> shutdown_qIDs_;
  std::unique_ptr<CachedSharedMemory> shm_;

  /// \brief Constructor
  /// \param spill_path Top directory for spilling buffers to.
  /// \param num_workers Number of threads for handling requests.
  explicit CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port, int32_t share_memory_sz_in_gb,
                       float memory_cap_ratio, int8_t log_level, std::shared_ptr<CacheServerHW> hw_info);

  /// \brief Locate a cache service from a connection id.
  /// \return Pointer to the cache service. Null if not found.
  CacheService *GetService(connection_id_type id) const;

  /// \brief Go over the existing cache services and calculate how much memory has been consumed so far. A new
  /// cache service can only be created if there is still enough available memory left.
  /// \param cache_mem_sz Requested memory for a new cache service
  /// \return Status object
  Status GlobalMemoryCheck(uint64_t cache_mem_sz);

  /// \brief Create a cache service. We allow multiple clients to create the same cache service.
  /// Subsequent duplicate requests are ignored. The first cache client to create the service will be given
  /// a special unique cookie.
  /// \return Status object
  Status CreateService(CacheRequest *rq, CacheReply *reply);

  /// \brief Destroy a cache service
  /// \param rq
  /// \return Status object
  Status DestroyCache(CacheRequest *rq);

  /// \brief Entry point for all internal server threads.
  Status ServerRequest(worker_id_t worker_id);

  /// \brief Entry point for all grpc threads.
  /// \return Status object
  Status RpcRequest(worker_id_t worker_id);

  Status DestroySession(CacheRequest *rq);

  /// \brief Create a connection id from a session id and a crc
  /// \param session_id
  /// \param crc
  /// \return connection id
  connection_id_type GetConnectionID(session_id_type session_id, uint32_t crc) const;

  /// \brief Extract the session id from a connection id
  /// \param connection_id
  /// \return session id
  session_id_type GetSessionID(connection_id_type connection_id) const;

  /// \brief Generate a session ID for the client
  /// \return Session ID
  session_id_type GenerateSessionID();

  /// \brief Handle kAllocateSharedBlock request
  /// \param rq CacheRequest
  /// \param reply CacheReply
  /// \return Status object
  Status AllocateSharedMemory(CacheRequest *rq, CacheReply *reply);

  /// \brief Handle kFreeSharedBlock request
  /// \param rq
  /// \return Status object
  Status FreeSharedMemory(CacheRequest *rq);

  /// \brief Handle CacheRow request
  /// \note There are two different implementations depending on whether shared memory is used for transport.
  /// \return Status object
  Status FastCacheRow(CacheRequest *rq, CacheReply *reply);
  Status CacheRow(CacheRequest *rq, CacheReply *reply);

  /// \brief Internal function to get statistics
  /// \param rq
  /// \param reply
  /// \return Status object
  Status GetStat(CacheRequest *rq, CacheReply *reply);

  /// \brief Internal function to get the cache state
  /// \param rq
  /// \param reply
  /// \return Status object
  Status GetCacheState(CacheRequest *rq, CacheReply *reply);

  /// \brief Cache a schema request
  /// \param rq
  /// \return Status object
  Status CacheSchema(CacheRequest *rq);

  /// \brief Fetch a schema request
  /// \param rq
  /// \param reply
  /// \return Status object
  Status FetchSchema(CacheRequest *rq, CacheReply *reply);

  /// \brief Mark the build phase done (for the non-mappable case)
  /// \param rq
  /// \return Status object
  Status BuildPhaseDone(CacheRequest *rq);

  /// \brief A proper shutdown of the server
  /// \return Status object
  Status AcknowledgeShutdown(CacheServerRequest *cache_req);

  /// \brief Find keys that will be cache misses
  /// \return Status object
  Status GetCacheMissKeys(CacheRequest *rq, CacheReply *reply);

  /// \brief Toggle write mode for a service
  Status ToggleWriteMode(CacheRequest *rq);

  /// \brief List the sessions and their caches
  /// \param reply
  /// \return Status object
  Status ListSessions(CacheReply *reply);

  /// \brief Connect request by a pipeline
  Status ConnectReset(CacheRequest *rq);

  /// \brief This is an internal structure used by batch processing.
  /// This is how it works internally: for a batch fetch/cache, the grpc thread
  /// intercepts the request, breaks it down into multiple internal requests
  /// and spreads them over all the server workers. Each internal request consumes
  /// one free tag and we may run out of free tags if they don't return promptly.
  /// So we let the server thread return the free tag immediately and put
  /// the return code in the following structure. The grpc thread must wait until all
  /// the return codes come back.
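  ///
  /// A rough sketch of the intended flow (illustrative only; the count and call sites are made up):
  /// \code
  ///   auto bw = std::make_shared<BatchWait>(num_internal_requests);
  ///   // Each server worker posts its result as soon as its slice of the batch is done:
  ///   //   (void)bw->Set(worker_rc);
  ///   RETURN_IF_NOT_OK(bw->Wait());   // grpc thread blocks until all results are back
  ///   RETURN_IF_NOT_OK(bw->GetRc());  // first non-interrupt error, if any
  /// \endcode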
  class BatchWait : public std::enable_shared_from_this<BatchWait> {
   public:
    explicit BatchWait(int n) : expected_(n), num_back_(0) { rc_lists_.reserve(expected_); }
    ~BatchWait() = default;

    std::weak_ptr<BatchWait> GetBatchWait() { return weak_from_this(); }

    Status Set(Status rc) {
      std::unique_lock<std::mutex> lck(mux_);
      // Check the counter under the lock since other threads update it concurrently.
      CHECK_FAIL_RETURN_UNEXPECTED(expected_ > num_back_, "Programming error");
      rc_lists_.push_back(std::move(rc));
      ++num_back_;
      if (num_back_ == expected_) {
        wp_.Set();
      }
      return Status::OK();
    }

    Status Wait() { return wp_.Wait(); }

    Status GetRc() {
      Status rc;
      for (auto &cache_rc : rc_lists_) {
        if (cache_rc.IsError() && cache_rc != StatusCode::kMDInterrupted && rc.IsOk()) {
          rc = cache_rc;
        }
      }
      return rc;
    }

   private:
    std::mutex mux_;
    WaitPost wp_;
    int64_t expected_;
    int64_t num_back_;
    std::vector<Status> rc_lists_;
  };

  /// \brief Internal function to do row batch fetch
  /// \param rq Request
  /// \param reply Reply
  /// \return Status object
  Status BatchFetchRows(CacheRequest *rq, CacheReply *reply);

  /// \brief Main function to fetch rows in batch. The output is a contiguous memory buffer which will be decoded
  /// by the CacheClient. A cache miss is not an error, and will be coded in the output to mark an empty row.
  /// \param[in] fbb Flatbuffer builder holding the vector of row ids to fetch.
  /// \param[out] out A contiguous memory buffer that holds the requested rows.
  /// \return Status object
  Status BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuilder> &fbb, WritableSlice *out);
  Status BatchCacheRows(CacheRequest *rq);

  Status InternalFetchRow(CacheRequest *rq);
  Status InternalCacheRow(CacheRequest *rq, CacheReply *reply);
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_