1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "minddata/dataset/engine/cache/cache_server.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include "minddata/dataset/engine/cache/cache_common.h"
#include "minddata/dataset/engine/cache/cache_ipc.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "mindspore/core/utils/log_adapter.h"
namespace ms = mindspore;
namespace ds = mindspore::dataset;
29
namespace {
/// Positions of the command line arguments handed to this executable (argv[i]).
enum ArgIndex : uint8_t {
  kProcessName = 0,       // name of this executable
  kRootDir = 1,           // root directory given to the server builder
  kNumWorkers = 2,        // number of worker threads, parsed as a decimal integer
  kPort = 3,              // listening port, parsed as a decimal integer
  kSharedMemorySize = 4,  // shared memory size in GB, parsed as a decimal integer
  kLogLevel = 5,          // log level, parsed as a decimal integer
  kDemonize = 6,          // "true"/"TRUE"/"t"/"T" turns on daemon mode; anything else stays in the foreground
  kMemoryCapRatio = 7     // memory cap ratio, parsed as a float
};

/// Exact argc the server accepts: one slot per ArgIndex entry, i.e. the last index plus one.
constexpr int32_t kTotalArgs = static_cast<int32_t>(kMemoryCapRatio) + 1;
}  // namespace
43
44 /// Start the server
45 /// \param argv
46 /// \return Status object
StartServer(int argc,char ** argv)47 ms::Status StartServer(int argc, char **argv) {
48 ms::Status rc;
49 ds::CacheServer::Builder builder;
50 if (argc != kTotalArgs) {
51 return ms::Status(ms::StatusCode::kMDSyntaxError);
52 }
53
54 int32_t port = static_cast<int32_t>(strtol(argv[ArgIndex::kPort], nullptr, ds::kDecimal));
55 builder.SetRootDirectory(argv[ArgIndex::kRootDir])
56 .SetNumWorkers(static_cast<int32_t>(strtol(argv[ArgIndex::kNumWorkers], nullptr, ds::kDecimal)))
57 .SetPort(port)
58 .SetSharedMemorySizeInGB(static_cast<int32_t>(strtol(argv[ArgIndex::kSharedMemorySize], nullptr, ds::kDecimal)))
59 .SetLogLevel(static_cast<int8_t>((strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal))))
60 .SetMemoryCapRatio(strtof(argv[ArgIndex::kMemoryCapRatio], nullptr));
61
62 auto daemonize_string = argv[ArgIndex::kDemonize];
63 bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 ||
64 strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0;
65
66 // We always change directory to / on unix rather than using the directory where the cache_server
67 // is called. This is a standard procedure for daemonize a process on unix.
68 if (chdir("/") == -1) {
69 std::string errMsg = "Unable to change directory to /. Errno = " + std::to_string(errno);
70 return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
71 }
72
73 // A message queue for communication between parent and child (if we fork).
74 ds::SharedMessage msg;
75 if (daemonize) {
76 #ifdef USE_GLOG
77 #define google mindspore_private
78 FLAGS_logtostderr = false;
79 FLAGS_log_dir = ds::DefaultLogDir();
80 // Create cache server default log dir
81 ds::Path log_dir = ds::Path(FLAGS_log_dir);
82 rc = log_dir.CreateDirectories();
83 if (rc.IsError()) {
84 return rc;
85 }
86 ms::g_ms_submodule_log_levels[SUBMODULE_ID] =
87 static_cast<int>(strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal));
88 google::InitGoogleLogging(argv[ArgIndex::kProcessName]);
89 #undef google
90 #endif
91 rc = msg.Create();
92 if (rc.IsError()) {
93 return rc;
94 }
95 pid_t pid = fork();
96 // failed to fork
97 if (pid < 0) {
98 std::string errMsg = "Failed to fork process for cache server. Errno = " + std::to_string(errno);
99 return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
100 } else if (pid > 0) {
101 // Parent and will be responsible for remove the queue on exit.
102 msg.RemoveResourcesOnExit();
103 // Sleep one second and we attach to the msg que
104 std::this_thread::sleep_for(std::chrono::seconds(1));
105 ms::Status child_rc;
106 rc = msg.ReceiveStatus(&child_rc);
107 std::string warning_string;
108 if (rc.IsError()) {
109 return rc;
110 }
111 if (child_rc.IsError()) {
112 return child_rc;
113 }
114 warning_string = child_rc.ToString();
115 std::cout << "Cache server startup completed successfully!\n";
116 std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
117 << ".\n";
118 if (!warning_string.empty()) std::cout << "WARNING: " << warning_string;
119 std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
120 "logs (under "
121 << ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
122 signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
123 return ms::Status::OK();
124 } else {
125 // Child process will continue from here if daemonize and parent has already exited.
126 // If we are running in the foreground, none of the code in block below will be run.
127 pid_t sid;
128 umask(0);
129 sid = setsid();
130 if (sid < 0) {
131 std::string errMsg = "Failed to setsid(). Errno = " + std::to_string(errno);
132 return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
133 }
134 (void)close(STDIN_FILENO);
135 (void)close(STDOUT_FILENO);
136 (void)close(STDERR_FILENO);
137 }
138 }
139
140 // Create the instance with some sanity checks built in
141 rc = builder.Build();
142 // Dump the summary
143 MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
144 MS_LOG(INFO) << builder << std::endl;
145 if (rc.IsOk()) {
146 if (daemonize && !rc.ToString().empty()) {
147 // If we have adjusted the number of workers provided by users, use the message queue to send the warning
148 // message if this is the child daemon.
149 (void)msg.SendStatus(rc);
150 }
151 // If all goes well, kick off the threads. Loop forever and never return unless error.
152 ds::CacheServer &cs = ds::CacheServer::GetInstance();
153 rc = cs.Run(msg.GetMsgQueueId());
154 } else if (daemonize) {
155 // If we didn't pass the sanity check to at least create the instance, use
156 // the message queue to return the error message if this is the child daemon.
157 return msg.SendStatus(rc);
158 }
159 return rc;
160 }
161
main(int argc,char ** argv)162 int main(int argc, char **argv) {
163 // Create the common path for all users
164 ds::Path common_dir = ds::Path(ds::kDefaultCommonPath);
165 ms::Status rc = common_dir.CreateCommonDirectories();
166 if (rc.IsError()) {
167 std::cerr << rc.ToString() << std::endl;
168 return 1;
169 }
170 // This executable is not to be called directly, and should be invoked by cache_admin executable.
171 rc = StartServer(argc, argv);
172 // Check result
173 if (rc.IsError()) {
174 auto errCode = rc.StatusCode();
175 auto errMsg = rc.ToString();
176 std::cerr << errMsg << std::endl;
177 return static_cast<int>(errCode);
178 }
179 return 0;
180 }
181