/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_

/// \note This header file contains common header files and some inlines used by
/// both client and server side codes. Do not put code that is not common here.
/// There are client and server specific header files.

#ifdef ENABLE_CACHE
#include <grpcpp/grpcpp.h>
#endif
#include <string>
#include <thread>
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
#include <limits.h>  // LOGIN_NAME_MAX, used by DefaultUserDir() below
#include <unistd.h>  // getlogin_r, used by DefaultUserDir() below
#endif
#ifdef ENABLE_CACHE
#include "proto/cache_grpc.grpc.pb.h"
#endif
#include "proto/cache_grpc.pb.h"
#include "minddata/dataset/engine/cache/cache_request.h"
#include "minddata/dataset/engine/cache/de_tensor_generated.h"
namespace mindspore {
namespace dataset {
/// \brief CacheRow and BatchFetch requests will switch to the shared memory method (if supported
/// on the platform) when the number of bytes sent is greater than the following threshold.
/// For small amounts we won't get any benefit from the shared memory method, because it needs
/// two rpc requests.
constexpr static int32_t kLocalByPassThreshold = 64 * 1024;
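// Rough illustration (assuming shared memory is supported and client and server run on the same
// host): a 1 MB row exceeds the 64 KB threshold and goes through the shared memory path, while a
// 4 KB row is simply inlined in the protobuf message.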
/// \brief Default size (in GB) of shared memory we are going to create
constexpr static int32_t kDefaultSharedMemorySize = 4;
/// \brief Memory Cap ratio used by the server
constexpr static float kDefaultMemoryCapRatio = 0.8;
/// \brief Default log level of the server
constexpr static int32_t kDefaultLogLevel = 1;
/// \brief Set num workers to half of num_cpus as the default
static const int32_t kDefaultNumWorkers = std::thread::hardware_concurrency() > 2
                                            ? std::thread::hardware_concurrency() / 2
                                            : 1;
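// e.g. a machine reporting 8 hardware threads gets 4 workers by default; if
// std::thread::hardware_concurrency() reports 0 (unknown), 1, or 2, the default falls back to 1.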
/// \brief A flag used by the BatchFetch request (client side) if it can support local bypass
constexpr static uint32_t kLocalClientSupport = 1;
/// \brief A flag used by CacheRow request (client side) and BatchFetch (server side) reply to indicate if the data is
/// inline in the protobuf. This also implies that kLocalClientSupport is true.
constexpr static uint32_t kDataIsInSharedMemory = 2;
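// Note (editorial assumption, not guaranteed by this header alone): the two constants above look
// like bit flags meant to be OR'ed into a single uint32_t flag word, e.g.
// (kLocalClientSupport | kDataIsInSharedMemory) == 3 would carry both indications.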
/// \brief Size of each message used in message queue.
constexpr static int32_t kSharedMessageSize = 2048;
/// \brief The default common path for all users
const char kDefaultCommonPath[] = "/tmp/mindspore";

/// \brief State of CacheService at the server.
enum class CacheServiceState : int8_t {
  kNone = 0,
  kBuildPhase = 1,
  kFetchPhase = 2,
  kNoLocking = 3,
  kOutOfMemory = 4,
  kNoSpace = 5,
  kError = 127
};

/// \brief Convert a Status object into a protobuf
/// \param rc[in] Status object
/// \param reply[in/out] pointer to pre-allocated protobuf object
inline void Status2CacheReply(const Status &rc, CacheReply *reply) {
  reply->set_rc(static_cast<int32_t>(rc.StatusCode()));
  reply->set_msg(rc.ToString());
}
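// A minimal server-side usage sketch (the request handler name below is hypothetical,
// for illustration only):
//   CacheReply reply;
//   Status rc = HandleSomeCacheRequest();  // hypothetical: produce a Status to report back
//   Status2CacheReply(rc, &reply);         // copies the status code and message into the reply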

/// Return the default cache path for a user
inline std::string DefaultUserDir() {
#if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
  char user[LOGIN_NAME_MAX];
  int rc = getlogin_r(user, sizeof(user));
  if (rc == 0) {
    return kDefaultCommonPath + std::string("/") + std::string(user);
  } else {
    return kDefaultCommonPath;
  }
#else
  return kDefaultCommonPath;
#endif
}
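// e.g. on Linux, a (hypothetical) login name "alice" yields "/tmp/mindspore/alice"; on Windows,
// Android, macOS, or when the login name cannot be resolved, this falls back to "/tmp/mindspore".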

/// \brief Generate the unix socket file we use on both client/server side given a tcp/ip port number
/// \param port
/// \return unix socket url
inline std::string PortToUnixSocketPath(int port) {
  return DefaultUserDir() + std::string("/cache_server_p") + std::to_string(port);
}
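// e.g. PortToUnixSocketPath(50052) returns DefaultUserDir() + "/cache_server_p50052".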

/// \brief Round up to the next 4k
inline int64_t round_up_4K(int64_t sz) {
  // Since 4096 is a power of 2, a simple way to round up is to add 4095 and then mask off
  // the low-order bits (i.e. the bits of 4095).
  return static_cast<uint64_t>(sz + 4095) & ~static_cast<uint64_t>(4095);
}
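// e.g. round_up_4K(0) == 0, round_up_4K(1) == 4096, round_up_4K(4096) == 4096, round_up_4K(4097) == 8192.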

/// Memory policy
enum CachePoolPolicy : int8_t { kOnNode, kPreferred, kLocal, kInterleave, kNone };

/// Misc typedef
using worker_id_t = int32_t;
using numa_id_t = int32_t;
using cpu_id_t = int32_t;

/// Return the default log dir for cache
inline std::string DefaultLogDir() { return DefaultUserDir() + std::string("/cache/log"); }
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_