/**
 * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_

#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/engine/cache/cache_arena.h"
#include "minddata/dataset/engine/cache/cache_grpc_server.h"
#include "minddata/dataset/engine/cache/cache_hw.h"
#include "minddata/dataset/engine/cache/cache_numa.h"
#include "minddata/dataset/engine/cache/cache_pool.h"
#include "minddata/dataset/engine/cache/cache_service.h"
#include "minddata/dataset/util/allocator.h"
#include "minddata/dataset/util/arena.h"
#include "minddata/dataset/util/lock.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/semaphore.h"
#include "minddata/dataset/util/service.h"
#include "minddata/dataset/util/services.h"
#include "minddata/dataset/util/system_pool.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
/// \brief A server which provides CacheService services.
class CacheServer : public Service {
 public:
  friend class Services;
  using cache_index = std::map<connection_id_type, std::unique_ptr<CacheService>>;
  // Only allow a new service to be created if the remaining memory is more than 15% of our hard memory cap
  constexpr static float kMemoryBottomLineForNewService = 0.15;

  class Builder {
   public:
    Builder();

    ~Builder() = default;

    /// \brief Getter functions
    const std::string &GetTop() const { return top_; }
    int32_t GetNumWorkers() const { return num_workers_; }
    int32_t GetPort() const { return port_; }
    int32_t GetSharedMemorySzInGb() const { return shared_memory_sz_in_gb_; }
    float GetMemoryCapRatio() const { return memory_cap_ratio_; }
    int8_t GetLogLevel() const { return log_level_; }

    Builder &SetRootDirectory(std::string root) {
      top_ = std::move(root);
      return *this;
    }
    Builder &SetNumWorkers(int32_t n) {
      num_workers_ = n;
      return *this;
    }
    Builder &SetPort(int32_t p) {
      port_ = p;
      return *this;
    }
    Builder &SetSharedMemorySizeInGB(int32_t sz) {
      shared_memory_sz_in_gb_ = sz;
      return *this;
    }
    Builder &SetMemoryCapRatio(float ratio) {
      memory_cap_ratio_ = ratio;
      return *this;
    }
    Builder &SetLogLevel(int8_t log_level) {
      log_level_ = log_level;
      return *this;
    }

    Status SanityCheck();

    void Print(std::ostream &out) const {
      out << "Summary of the cache server configuration\n"
          << "Spill directory: " << (GetTop().empty() ? "None" : GetTop()) << "\n"
          << "Number of parallel workers: " << GetNumWorkers() << "\n"
          << "TCP/IP port: " << GetPort() << "\n"
          << "Shared memory size (in GB): " << GetSharedMemorySzInGb() << "\n"
          << "Memory cap ratio: " << GetMemoryCapRatio() << "\n"
          << "Log level: " << std::to_string(GetLogLevel());
    }

    friend std::ostream &operator<<(std::ostream &out, const Builder &bld) {
      bld.Print(out);
      return out;
    }

    Status Build() {
      // Get information about the numa architecture and adjust num_workers_ based on the numa node count
      hw_info_ = std::make_shared<CacheServerHW>();
      RETURN_IF_NOT_OK(hw_info_->GetNumaNodeInfo());
      std::string warning_string;
      if (num_workers_ == -1) {
        // If the user did not provide a value for num_workers, default to half the number of cpus and
        // adjust it if the default is not optimal.
        int32_t dft_num_workers = std::thread::hardware_concurrency() > 2 ? std::thread::hardware_concurrency() / 2 : 1;
        num_workers_ = AdjustNumWorkers(dft_num_workers);
      } else {
        // If the user has given their own value, adjust it and warn if it got changed.
        int32_t num_workers_new = AdjustNumWorkers(num_workers_);
        if (num_workers_ != num_workers_new) {
          warning_string =
            "The configuration of workers on the cache server is dependent on the NUMA architecture of the server. "
            "The current setting is not optimal for the NUMA architecture. Re-adjusting the number of workers "
            "to the optimal setting of " +
            std::to_string(num_workers_new) + ".\n";
          MS_LOG(INFO) << warning_string;
        }
        num_workers_ = num_workers_new;
      }
      RETURN_IF_NOT_OK(SanityCheck());
      // We need to bring up the Task Manager by bringing up the Services singleton.
      RETURN_IF_NOT_OK(Services::CreateInstance());
      RETURN_IF_NOT_OK(CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_,
                                                   memory_cap_ratio_, log_level_, std::move(hw_info_)));
      return Status(StatusCode::kSuccess, warning_string);
    }

   private:
    std::string top_;
    int32_t num_workers_;
    int32_t port_;
    int32_t shared_memory_sz_in_gb_;
    float memory_cap_ratio_;
    int8_t log_level_;
    std::shared_ptr<CacheServerHW> hw_info_;

    /// \brief Sanity checks on the shared memory.
    /// \return Status object
    Status IpcResourceCleanup();

    /// \brief Adjust the value of num_workers if it is not optimal for the NUMA architecture.
    int32_t AdjustNumWorkers(int32_t num_workers);
  };
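
  // A minimal usage sketch of the Builder (illustrative only; the directory,
  // port, and sizes below are assumed example values, not defaults from this
  // header). Setters chain fluently and Build() brings up the server singleton:
  //
  //   CacheServer::Builder builder;
  //   Status rc = builder.SetRootDirectory("/tmp/cache_spill")  // hypothetical spill path
  //                 .SetNumWorkers(8)
  //                 .SetPort(50052)
  //                 .SetSharedMemorySizeInGB(4)
  //                 .SetMemoryCapRatio(0.8f)
  //                 .Build();
  //   if (rc.IsOk()) {
  //     CacheServer &server = CacheServer::GetInstance();
  //   }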

  CacheServer(const CacheServer &) = delete;
  CacheServer &operator=(const CacheServer &) = delete;
  CacheServer(CacheServer &&) = delete;
  CacheServer &operator=(CacheServer &&) = delete;
  Status DoServiceStart() override;
  Status DoServiceStop() override;
  ~CacheServer() override { (void)ServiceStop(); }

  static Status CreateInstance(const std::string &spill_path, int32_t num_workers, int32_t port,
                               int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level,
                               std::shared_ptr<CacheServerHW> hw_info) {
    // Capture the hook status in a local; std::call_once discards the callable's return
    // value, so returning a Status from inside the lambda would silently drop errors.
    Status rc;
    std::call_once(init_instance_flag_, [&]() {
      auto &SvcManager = Services::GetInstance();
      rc = SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio,
                              log_level, hw_info);
    });
    return rc;
  }

  static CacheServer &GetInstance() { return *instance_; }

  /// \brief For the current demonstration, a cache client contacts the cache server using a Queue.
  /// \param rq
  /// \return Status object
  Status PushRequest(int32_t queue_id, CacheServerRequest *rq) {
    RETURN_UNEXPECTED_IF_NULL(rq);
    RETURN_IF_NOT_OK(cache_q_->operator[](queue_id)->Add(rq));
    return Status::OK();
  }

  /// \brief Kick off the server threads. Never returns unless an error occurs.
  Status Run(SharedMessage::queue_id_t msg_qid);

  /// \brief Get a free tag
  /// \param[out] q pointer to a pointer to a CacheServerRequest
  /// \return Status object
  static Status GetFreeRequestTag(CacheServerRequest **q);

  /// \brief Return a tag to the free list
  /// \param[in] p pointer to an already finished CacheServerRequest tag
  /// \return Status object
  static Status ReturnRequestTag(CacheServerRequest *p);

  /// \brief Return an instance of the numa control
  std::shared_ptr<CacheServerHW> GetHWControl() { return hw_info_; }

  /// \brief Set CPU affinity
  Status SetAffinity(const Task &tk, numa_id_t numa_node) { return hw_info_->SetAffinity(tk, numa_node); }

  /// \brief Return the number of workers
  auto GetNumWorkers() const { return num_workers_; }

  /// \brief Return the number of grpc workers
  auto GetNumGrpcWorkers() const { return num_grpc_workers_; }

  /// \brief Return the number of numa nodes
  auto GetNumaNodeCount() const { return hw_info_->GetNumaNodeCount(); }

  /// \brief Assign a worker by a numa id
  /// \return worker id
  worker_id_t GetWorkerByNumaId(numa_id_t node_id) const;

  /// \brief Randomly pick a worker
  /// \return worker id
  worker_id_t GetRandomWorker() const;

  /// \brief Check if we bind threads to numa cores
  bool IsNumaAffinityOn() const { return numa_affinity_; }

  /// \brief Return the memory cap ratio
  float GetMemoryCapRatio() const { return memory_cap_ratio_; }

  /// \brief Function to handle a row request
  /// \param[in] cache_req A row request to handle
  /// \param[out] internal_request Indicator of whether the request is an internal request
  /// \return Status object
  Status ProcessRowRequest(CacheServerRequest *cache_req, bool *internal_request);

  /// \brief Function to handle an admin request
  /// \param[in] cache_req An admin request to handle
  /// \return Status object
  Status ProcessAdminRequest(CacheServerRequest *cache_req);

  /// \brief Function to handle a session request
  /// \param[in] cache_req A session request to handle
  /// \return Status object
  Status ProcessSessionRequest(CacheServerRequest *cache_req);

  /// \brief How a request is handled.
  /// \note A request can be processed immediately by a grpc thread or routed to a server thread
  /// which is pinned to some numa node core.
  Status ProcessRequest(CacheServerRequest *cache_req);

  void GlobalShutdown();

  /// \brief This returns where we attach to the shared memory.
  /// Some gRPC requests will ask for a shared memory block, and
  /// we can't return the absolute address as this makes no sense
  /// in the client. So instead we will return an address relative
  /// to the base address of the shared memory where we attach to.
  /// \return Base address of the shared memory.
  const void *SharedMemoryBaseAddr() const { return shm_->SharedMemoryBaseAddr(); }
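
  // Illustrative sketch (not part of the original header) of the relative
  // addressing described above: the server converts an absolute pointer into
  // an offset from the base address, and the client adds that offset to its
  // own mapping of the same segment.
  //
  //   const char *base = reinterpret_cast<const char *>(server.SharedMemoryBaseAddr());
  //   void *blk = nullptr;
  //   RETURN_IF_NOT_OK(server.AllocateSharedMemory(client_id, sz, &blk));
  //   int64_t offset = reinterpret_cast<const char *>(blk) - base;  // send the offset, not blk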

  /// \brief Return the public key of the shared memory.
  int32_t GetKey() const { return shm_->GetKey(); }

  Status AllocateSharedMemory(int32_t client_id, size_t sz, void **p);

  void DeallocateSharedMemory(int32_t client_id, void *p);

 private:
  static std::once_flag init_instance_flag_;
  static CacheServer *instance_;
  mutable RWLock rwLock_;
  mutable RWLock sessions_lock_;
  std::string top_;
  cache_index all_caches_;
  std::set<session_id_type> active_sessions_;
  std::shared_ptr<QueueList<CacheServerRequest *>> cache_q_;
  std::shared_ptr<CacheServerGreeterImpl> comm_layer_;
  TaskGroup vg_;
  int32_t num_workers_;
  int32_t num_grpc_workers_;
  int32_t port_;
  int32_t shared_memory_sz_in_gb_;
  int8_t log_level_;  // log_level is saved here for informational purposes only. It is not a functional field.
  std::atomic<bool> global_shutdown_;
  float memory_cap_ratio_;
  std::shared_ptr<CacheServerHW> hw_info_;
  std::map<worker_id_t, Task *> numa_tasks_;
  bool numa_affinity_;
  std::vector<int32_t> shutdown_qIDs_;
  std::unique_ptr<CachedSharedMemory> shm_;

  /// \brief Constructor
  /// \param spill_path Top directory for spilling buffers to.
  /// \param num_workers Number of threads for handling requests.
  explicit CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port,
                       int32_t shared_memory_sz_in_gb, float memory_cap_ratio, int8_t log_level,
                       std::shared_ptr<CacheServerHW> hw_info);

  /// \brief Locate a cache service from a connection id.
  /// \return Pointer to cache service. Null if not found
  CacheService *GetService(connection_id_type id) const;

  /// \brief Go over the existing cache services and calculate how much memory we have consumed so far. A new cache
  /// service can only be created if there is still enough available memory left.
  /// \param cache_mem_sz Requested memory for a new cache service
  /// \return Status object
  Status GlobalMemoryCheck(uint64_t cache_mem_sz);

  /// \brief Create a cache service. We allow multiple clients to create the same cache service.
  /// Subsequent duplicate requests are ignored. The first cache client to create the service will be given
  /// a special unique cookie.
  /// \return Status object
  Status CreateService(CacheRequest *rq, CacheReply *reply);

  /// \brief Destroy a cache service
  /// \param rq
  /// \return Status object
  Status DestroyCache(CacheRequest *rq);

  /// \brief Entry point for all internal server threads.
  Status ServerRequest(worker_id_t worker_id);

  /// \brief Entry point for all grpc threads.
  /// \return Status object
  Status RpcRequest(worker_id_t worker_id);

  Status DestroySession(CacheRequest *rq);

  /// \brief Create a connection id from a session id and a crc
  /// \param session_id
  /// \param crc
  /// \return connection id
  connection_id_type GetConnectionID(session_id_type session_id, uint32_t crc) const;

  /// \brief Extract the session id from a connection id
  /// \param connection_id
  /// \return session id
  session_id_type GetSessionID(connection_id_type connection_id) const;

  /// \brief Generate a session ID for the client
  /// \return Session ID
  session_id_type GenerateSessionID();

  /// \brief Handle kAllocateSharedBlock request
  /// \param rq CacheRequest
  /// \param reply CacheReply
  /// \return Status object
  Status AllocateSharedMemory(CacheRequest *rq, CacheReply *reply);

  /// \brief Handle kFreeSharedBlock request
  /// \param rq
  /// \return Status object
  Status FreeSharedMemory(CacheRequest *rq);

  /// \brief Handle a CacheRow request
  /// \note There are two different implementations depending on whether shared memory is used for transport.
  /// \return Status object
  Status FastCacheRow(CacheRequest *rq, CacheReply *reply);
  Status CacheRow(CacheRequest *rq, CacheReply *reply);

  /// \brief Internal function to get statistics
  /// \param rq
  /// \param reply
  /// \return Status object
  Status GetStat(CacheRequest *rq, CacheReply *reply);

  /// \brief Internal function to get the cache state
  /// \param rq
  /// \param reply
  /// \return Status object
  Status GetCacheState(CacheRequest *rq, CacheReply *reply);

  /// \brief Cache a schema request
  /// \param rq
  /// \return Status object
  Status CacheSchema(CacheRequest *rq);

  /// \brief Fetch a schema request
  /// \param rq
  /// \param reply
  /// \return Status object
  Status FetchSchema(CacheRequest *rq, CacheReply *reply);

  /// \brief Mark the build phase done (for the non-mappable case)
  /// \param rq
  /// \return Status object
  Status BuildPhaseDone(CacheRequest *rq);

  /// \brief A proper shutdown of the server
  /// \return Status object
  Status AcknowledgeShutdown(CacheServerRequest *cache_req);

  /// \brief Find keys that will be cache misses
  /// \return Status object
  Status GetCacheMissKeys(CacheRequest *rq, CacheReply *reply);

  /// \brief Toggle write mode for a service
  Status ToggleWriteMode(CacheRequest *rq);

  /// \brief List the sessions and their caches
  /// \param reply
  /// \return Status object
  Status ListSessions(CacheReply *reply);

  /// \brief Connect request by a pipeline
  Status ConnectReset(CacheRequest *rq);

  /// \brief This is an internal structure used by batch processing.
  /// Here is how it works internally. For a batch fetch/cache, the grpc thread
  /// intercepts the request and breaks it down into multiple internal requests
  /// spread over all the server workers. But each internal request consumes
  /// one free tag, and we may run out of free tags if they don't return promptly.
  /// So we let the server thread return the free tag immediately and instead put
  /// the return code into the following structure. The grpc thread must wait until
  /// all the return codes come back (see the illustrative sketch after this class).
  class BatchWait : public std::enable_shared_from_this<BatchWait> {
   public:
    explicit BatchWait(int n) : expected_(n), num_back_(0) { rc_lists_.reserve(expected_); }
    ~BatchWait() = default;

    std::weak_ptr<BatchWait> GetBatchWait() { return weak_from_this(); }

    Status Set(Status rc) {
      // Take the lock before inspecting num_back_ so the check and the update are atomic.
      std::unique_lock<std::mutex> lck(mux_);
      CHECK_FAIL_RETURN_UNEXPECTED(expected_ > num_back_, "Programming error");
      rc_lists_.push_back(std::move(rc));
      ++num_back_;
      if (num_back_ == expected_) {
        wp_.Set();
      }
      return Status::OK();
    }

    Status Wait() { return wp_.Wait(); }

    Status GetRc() {
      Status rc;
      for (auto &cache_rc : rc_lists_) {
        if (cache_rc.IsError() && cache_rc != StatusCode::kMDInterrupted && rc.IsOk()) {
          rc = cache_rc;
        }
      }
      return rc;
    }

   private:
    std::mutex mux_;
    WaitPost wp_;
    int64_t expected_;
    int64_t num_back_;
    std::vector<Status> rc_lists_;
  };
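
  // Illustrative flow (a sketch under assumptions, not code from this header):
  // the grpc thread splits a batch of n rows across the workers, hands each
  // internal request a weak_ptr to the BatchWait, and blocks until every
  // worker has reported its return code.
  //
  //   auto bw = std::make_shared<BatchWait>(n);
  //   // ... dispatch n internal requests, each carrying bw->GetBatchWait() ...
  //   // each worker, when done:  if (auto p = weak_bw.lock()) { (void)p->Set(rc); }
  //   RETURN_IF_NOT_OK(bw->Wait());   // wakes once all n Set() calls have arrived
  //   RETURN_IF_NOT_OK(bw->GetRc());  // first non-interrupt error, if any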

  /// \brief Internal function to do a row batch fetch
  /// \param rq Request
  /// \param reply Reply
  /// \return Status object
  Status BatchFetchRows(CacheRequest *rq, CacheReply *reply);

  /// \brief Main function to fetch rows in batch. The output is a contiguous memory block which will be decoded
  /// by the CacheClient. A cache miss is not an error, and will be coded in the output to mark an empty row.
  /// \param[in] fbb A flatbuffer builder holding the row ids to fetch.
  /// \param[out] out A contiguous memory buffer that holds the requested rows.
  /// \return Status object
  Status BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuilder> &fbb, WritableSlice *out);
  Status BatchCacheRows(CacheRequest *rq);

  Status InternalFetchRow(CacheRequest *rq);
  Status InternalCacheRow(CacheRequest *rq, CacheReply *reply);
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_SERVER_H_