1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "minddata/dataset/engine/cache/cache_server.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

#include "minddata/dataset/engine/cache/cache_common.h"
#include "minddata/dataset/engine/cache/cache_ipc.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/util/log_adapter.h"
29
30 namespace ms = mindspore;
31 namespace ds = mindspore::dataset;
32
33 namespace {
// Number of command-line arguments (process name included) that StartServer() requires.
constexpr int32_t kTotalArgs = 8;

// Position of each expected argument inside argv.
enum ArgIndex : uint8_t {
  kProcessName = 0,       // argv[0]
  kRootDir = 1,           // passed to Builder::SetRootDirectory
  kNumWorkers = 2,        // parsed as an integer, passed to Builder::SetNumWorkers
  kPort = 3,              // parsed as an integer, passed to Builder::SetPort
  kSharedMemorySize = 4,  // parsed as an integer, passed to Builder::SetSharedMemorySizeInGB
  kLogLevel = 5,          // parsed as an integer, passed to Builder::SetLogLevel
  kDemonize = 6,          // daemonize flag; the identifier spelling is kept because other code refers to it
  kMemoryCapRatio = 7     // parsed as a float, passed to Builder::SetMemoryCapRatio
};

// Keep the expected argument count in step with the last index above, so adding an
// argument without bumping kTotalArgs (or the reverse) fails at compile time.
static_assert(kTotalArgs == static_cast<int32_t>(ArgIndex::kMemoryCapRatio) + 1,
              "kTotalArgs must cover every ArgIndex entry");
45
BuildServer(ds::CacheServer::Builder * builder,ds::SharedMessage * msg,int32_t port,bool daemonize)46 ms::Status BuildServer(ds::CacheServer::Builder *builder, ds::SharedMessage *msg, int32_t port, bool daemonize) {
47 if (msg == nullptr) {
48 RETURN_STATUS_UNEXPECTED("msg pointer is nullptr.");
49 }
50 if (builder == nullptr) {
51 RETURN_STATUS_UNEXPECTED("builder pointer is nullptr.");
52 }
53 // Create the instance with some sanity checks built in
54 ms::Status rc = builder->Build();
55 // Dump the summary
56 MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
57 MS_LOG(INFO) << builder << std::endl;
58 if (rc.IsOk()) {
59 if (daemonize && !rc.ToString().empty()) {
60 // If we have adjusted the number of workers provided by users, use the message queue to send the warning
61 // message if this is the child daemon.
62 (void)msg->SendStatus(rc);
63 }
64 // If all goes well, kick off the threads. Loop forever and never return unless error.
65 ds::CacheServer &cs = ds::CacheServer::GetInstance();
66 rc = cs.Run(msg->GetMsgQueueId());
67 } else if (daemonize) {
68 // If we didn't pass the sanity check to at least create the instance, use
69 // the message queue to return the error message if this is the child daemon.
70 return msg->SendStatus(rc);
71 }
72 return rc;
73 }
74
Fork(ds::CacheServer::Builder * builder,ds::SharedMessage * msg,int32_t port)75 ms::Status Fork(ds::CacheServer::Builder *builder, ds::SharedMessage *msg, int32_t port) {
76 if (msg == nullptr) {
77 RETURN_STATUS_UNEXPECTED("msg pointer is nullptr.");
78 }
79 pid_t pid = fork();
80 if (pid > 0) {
81 // Parent and will be responsible for remove the queue on exit.
82 msg->RemoveResourcesOnExit();
83 // Sleep one second and we attach to the msg que
84 std::this_thread::sleep_for(std::chrono::seconds(1));
85 ms::Status child_rc;
86 auto rc = msg->ReceiveStatus(&child_rc);
87 std::string warning_string;
88 if (rc.IsError()) {
89 return rc;
90 }
91 if (child_rc.IsError()) {
92 return child_rc;
93 }
94 warning_string = child_rc.ToString();
95 std::cout << "Cache server startup completed successfully!\n";
96 std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
97 << ".\n";
98 if (!warning_string.empty() && warning_string != ms::Status::OK().ToString()) {
99 std::cout << "WARNING: " << warning_string;
100 }
101 std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
102 "logs (under "
103 << ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
104 signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
105 return ms::Status::OK();
106 } else if (pid == 0) {
107 // Child process will continue from here if daemonize and parent has already exited.
108 // If we are running in the foreground, none of the code in block below will be run.
109 pid_t sid;
110 umask(0);
111 sid = setsid();
112 if (sid < 0) {
113 std::string errMsg = "Failed to setsid(). Errno = " + std::to_string(errno);
114 RETURN_STATUS_UNEXPECTED(errMsg);
115 }
116 (void)close(STDIN_FILENO);
117 (void)close(STDOUT_FILENO);
118 (void)close(STDERR_FILENO);
119 return BuildServer(builder, msg, port, true);
120 }
121 // failed to fork
122 std::string errMsg = "Failed to fork process for cache server. Errno = " + std::to_string(errno);
123 RETURN_STATUS_UNEXPECTED(errMsg);
124 }
125 } // namespace
126
127 /// Start the server
128 /// \param argv
129 /// \return Status object
StartServer(int argc,char ** argv)130 ms::Status StartServer(int argc, char **argv) {
131 ms::Status rc;
132 ds::CacheServer::Builder builder;
133 if (argc != kTotalArgs) {
134 return ms::Status(ms::StatusCode::kMDSyntaxError);
135 }
136
137 int32_t port = static_cast<int32_t>(strtol(argv[ArgIndex::kPort], nullptr, ds::kDecimal));
138 builder.SetRootDirectory(argv[ArgIndex::kRootDir])
139 .SetNumWorkers(static_cast<int32_t>(strtol(argv[ArgIndex::kNumWorkers], nullptr, ds::kDecimal)))
140 .SetPort(port)
141 .SetSharedMemorySizeInGB(static_cast<int32_t>(strtol(argv[ArgIndex::kSharedMemorySize], nullptr, ds::kDecimal)))
142 .SetLogLevel(static_cast<int8_t>((strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal))))
143 .SetMemoryCapRatio(strtof(argv[ArgIndex::kMemoryCapRatio], nullptr));
144
145 auto daemonize_string = argv[ArgIndex::kDemonize];
146 bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 ||
147 strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0;
148
149 // We always change directory to / on unix rather than using the directory where the cache_server
150 // is called. This is a standard procedure for daemonize a process on unix.
151 if (chdir("/") == -1) {
152 std::string errMsg = "Unable to change directory to /. Errno = " + std::to_string(errno);
153 RETURN_STATUS_UNEXPECTED(errMsg);
154 }
155
156 // A message queue for communication between parent and child (if we fork).
157 ds::SharedMessage msg;
158 if (daemonize) {
159 #ifdef USE_GLOG
160 #define google mindspore_private
161 FLAGS_logtostderr = false;
162 FLAGS_log_dir = ds::DefaultLogDir();
163 // Create cache server default log dir
164 ds::Path log_dir = ds::Path(FLAGS_log_dir);
165 rc = log_dir.CreateDirectories();
166 if (rc.IsError()) {
167 return rc;
168 }
169 ms::g_ms_submodule_log_levels[SUBMODULE_ID] =
170 static_cast<int>(strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal));
171 #undef google
172 #endif
173 rc = msg.Create();
174 if (rc.IsError()) {
175 return rc;
176 }
177 return Fork(&builder, &msg, port);
178 }
179
180 return BuildServer(&builder, &msg, port, daemonize);
181 }
182
main(int argc,char ** argv)183 int main(int argc, char **argv) {
184 // Create the common path for all users
185 ds::Path common_dir = ds::Path(ds::kDefaultCommonPath);
186 ms::Status rc = common_dir.CreateCommonDirectories();
187 if (rc.IsError()) {
188 std::cerr << rc.ToString() << std::endl;
189 return 1;
190 }
191 // This executable is not to be called directly, and should be invoked by cache_admin executable.
192 rc = StartServer(argc, argv);
193 // Check result
194 if (rc.IsError()) {
195 auto errCode = rc.StatusCode();
196 auto errMsg = rc.ToString();
197 std::cerr << errMsg << std::endl;
198 return static_cast<int>(errCode);
199 }
200 return 0;
201 }
202