1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_
18
19 /// \note This header file contains common header files and some inlines used by
20 /// both client and server side codes. Do not put code that is not common here.
21 /// There are client and server specific header files.
22
23 #ifdef ENABLE_CACHE
24 #include <grpcpp/grpcpp.h>
25 #endif
26 #include <string>
27 #include <thread>
28 #ifdef ENABLE_CACHE
29 #include "proto/cache_grpc.grpc.pb.h"
30 #endif
31 #include "proto/cache_grpc.pb.h"
32 #include "minddata/dataset/engine/cache/cache_request.h"
33 #include "minddata/dataset/engine/cache/de_tensor_generated.h"
34 namespace mindspore {
35 namespace dataset {
36 /// \brief CacheRow and BatchFetch requests will switch to use shared memory method (if supported
37 /// on the platform) when the amount of bytes sent is greater than the following number.
38 /// For too small amount, we won't get any benefit using shared memory method because we need
39 /// two rpc requests to use shared memory method.
40 constexpr static int32_t kLocalByPassThreshold = 64 * 1024;
41 /// \brief Default size (in GB) of shared memory we are going to create
42 constexpr static int32_t kDefaultSharedMemorySize = 4;
43 /// \brief Memory Cap ratio used by the server
44 constexpr static float kDefaultMemoryCapRatio = 0.8;
45 /// \brief Default log level of the server
46 constexpr static int32_t kDefaultLogLevel = 1;
47 /// \brief Set num workers to half of num_cpus as the default
48 static const int32_t kDefaultNumWorkers = std::thread::hardware_concurrency() > 2
49 ? std::thread::hardware_concurrency() / 2
50 : 1;
51 /// \brief A flag used by the BatchFetch request (client side) if it can support local bypass
52 constexpr static uint32_t kLocalClientSupport = 1;
53 /// \brief A flag used by CacheRow request (client side) and BatchFetch (server side) reply to indicate if the data is
54 /// inline in the protobuf. This also implies kLocalClientSupport is also true.
55 constexpr static uint32_t kDataIsInSharedMemory = 2;
56 /// \brief Size of each message used in message queue.
57 constexpr static int32_t kSharedMessageSize = 2048;
58 /// \brief The default common path for all users
59 const char kDefaultCommonPath[] = "/tmp/mindspore";
60
61 /// \brief State of CacheService at the server.
62 enum class CacheServiceState : int8_t {
63 kNone = 0,
64 kBuildPhase = 1,
65 kFetchPhase = 2,
66 kNoLocking = 3,
67 kOutOfMemory = 4,
68 kNoSpace = 5,
69 kError = 127
70 };
71
72 /// \brief Convert a Status object into a protobuf
73 /// \param rc[in] Status object
74 /// \param reply[in/out] pointer to pre-allocated protobuf object
Status2CacheReply(const Status & rc,CacheReply * reply)75 inline void Status2CacheReply(const Status &rc, CacheReply *reply) {
76 reply->set_rc(static_cast<int32_t>(rc.StatusCode()));
77 reply->set_msg(rc.ToString());
78 }
79
80 /// Return the default cache path for a user
DefaultUserDir()81 inline std::string DefaultUserDir() {
82 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
83 char user[LOGIN_NAME_MAX];
84 int rc = getlogin_r(user, sizeof(user));
85 if (rc == 0) {
86 return kDefaultCommonPath + std::string("/") + std::string(user);
87 } else {
88 return kDefaultCommonPath;
89 }
90 #else
91 return kDefaultCommonPath;
92 #endif
93 }
94
95 /// \brief Generate the unix socket file we use on both client/server side given a tcp/ip port number
96 /// \param port
97 /// \return unix socket url
PortToUnixSocketPath(int port)98 inline std::string PortToUnixSocketPath(int port) {
99 return DefaultUserDir() + std::string("/cache_server_p") + std::to_string(port);
100 }
101
102 /// \brief Round up to the next 4k
round_up_4K(int64_t sz)103 inline int64_t round_up_4K(int64_t sz) {
104 // Since 4096 is a power of 2, a simple way to round up is add 4095 and mask off all the
105 // bits of 4095
106 return static_cast<uint64_t>(sz + 4095) & ~static_cast<uint64_t>(4095);
107 }
108
109 /// Memory policy
110 enum CachePoolPolicy : int8_t { kOnNode, kPreferred, kLocal, kInterleave, kNone };
111
112 /// Misc typedef
113 using worker_id_t = int32_t;
114 using numa_id_t = int32_t;
115 using cpu_id_t = int32_t;
116
117 /// Return the default log dir for cache
DefaultLogDir()118 inline std::string DefaultLogDir() { return DefaultUserDir() + std::string("/cache/log"); }
119 } // namespace dataset
120 } // namespace mindspore
121 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_COMMON_H_
122