• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 
17 #include "minddata/dataset/engine/cache/cache_server.h"
18 
19 #include <sys/types.h>
20 #include <unistd.h>
21 #include <cstdlib>
22 #include <thread>
23 #include <chrono>
24 
25 #include "minddata/dataset/engine/cache/cache_common.h"
26 #include "minddata/dataset/engine/cache/cache_ipc.h"
27 #include "minddata/dataset/include/dataset/constants.h"
28 #include "minddata/dataset/util/log_adapter.h"
29 
30 namespace ms = mindspore;
31 namespace ds = mindspore::dataset;
32 
33 namespace {
// Number of entries expected in argv, the process name included. Must match the number of ArgIndex entries.
constexpr int32_t kTotalArgs = 8;

// Position of each command line argument inside argv. The numbering starts at 0 and is contiguous.
enum ArgIndex : uint8_t {
  kProcessName = 0,   // argv[0]
  kRootDir,           // 1: root directory handed to the server builder
  kNumWorkers,        // 2: number of workers
  kPort,              // 3: port number
  kSharedMemorySize,  // 4: shared memory size, passed to SetSharedMemorySizeInGB
  kLogLevel,          // 5: log level
  kDemonize,          // 6: "true"/"TRUE"/"t"/"T" to run the server as a daemon
  kMemoryCapRatio     // 7: memory cap ratio, parsed as a float
};
45 
BuildServer(ds::CacheServer::Builder * builder,ds::SharedMessage * msg,int32_t port,bool daemonize)46 ms::Status BuildServer(ds::CacheServer::Builder *builder, ds::SharedMessage *msg, int32_t port, bool daemonize) {
47   if (msg == nullptr) {
48     RETURN_STATUS_UNEXPECTED("msg pointer is nullptr.");
49   }
50   if (builder == nullptr) {
51     RETURN_STATUS_UNEXPECTED("builder pointer is nullptr.");
52   }
53   // Create the instance with some sanity checks built in
54   ms::Status rc = builder->Build();
55   // Dump the summary
56   MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
57   MS_LOG(INFO) << builder << std::endl;
58   if (rc.IsOk()) {
59     if (daemonize && !rc.ToString().empty()) {
60       // If we have adjusted the number of workers provided by users, use the message queue to send the warning
61       // message if this is the child daemon.
62       (void)msg->SendStatus(rc);
63     }
64     // If all goes well, kick off the threads. Loop forever and never return unless error.
65     ds::CacheServer &cs = ds::CacheServer::GetInstance();
66     rc = cs.Run(msg->GetMsgQueueId());
67   } else if (daemonize) {
68     // If we didn't pass the sanity check to at least create the instance, use
69     // the message queue to return the error message if this is the child daemon.
70     return msg->SendStatus(rc);
71   }
72   return rc;
73 }
74 
Fork(ds::CacheServer::Builder * builder,ds::SharedMessage * msg,int32_t port)75 ms::Status Fork(ds::CacheServer::Builder *builder, ds::SharedMessage *msg, int32_t port) {
76   if (msg == nullptr) {
77     RETURN_STATUS_UNEXPECTED("msg pointer is nullptr.");
78   }
79   pid_t pid = fork();
80   if (pid > 0) {
81     // Parent and will be responsible for remove the queue on exit.
82     msg->RemoveResourcesOnExit();
83     // Sleep one second and we attach to the msg que
84     std::this_thread::sleep_for(std::chrono::seconds(1));
85     ms::Status child_rc;
86     auto rc = msg->ReceiveStatus(&child_rc);
87     std::string warning_string;
88     if (rc.IsError()) {
89       return rc;
90     }
91     if (child_rc.IsError()) {
92       return child_rc;
93     }
94     warning_string = child_rc.ToString();
95     std::cout << "Cache server startup completed successfully!\n";
96     std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
97               << ".\n";
98     if (!warning_string.empty() && warning_string != ms::Status::OK().ToString()) {
99       std::cout << "WARNING: " << warning_string;
100     }
101     std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
102                  "logs (under "
103               << ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
104     signal(SIGCHLD, SIG_IGN);  // ignore sig child signal.
105     return ms::Status::OK();
106   } else if (pid == 0) {
107     // Child process will continue from here if daemonize and parent has already exited.
108     // If we are running in the foreground, none of the code in block below will be run.
109     pid_t sid;
110     umask(0);
111     sid = setsid();
112     if (sid < 0) {
113       std::string errMsg = "Failed to setsid(). Errno = " + std::to_string(errno);
114       RETURN_STATUS_UNEXPECTED(errMsg);
115     }
116     (void)close(STDIN_FILENO);
117     (void)close(STDOUT_FILENO);
118     (void)close(STDERR_FILENO);
119     return BuildServer(builder, msg, port, true);
120   }
121   // failed to fork
122   std::string errMsg = "Failed to fork process for cache server. Errno = " + std::to_string(errno);
123   RETURN_STATUS_UNEXPECTED(errMsg);
124 }
125 }  // namespace
126 
127 /// Start the server
128 /// \param argv
129 /// \return Status object
StartServer(int argc,char ** argv)130 ms::Status StartServer(int argc, char **argv) {
131   ms::Status rc;
132   ds::CacheServer::Builder builder;
133   if (argc != kTotalArgs) {
134     return ms::Status(ms::StatusCode::kMDSyntaxError);
135   }
136 
137   int32_t port = static_cast<int32_t>(strtol(argv[ArgIndex::kPort], nullptr, ds::kDecimal));
138   builder.SetRootDirectory(argv[ArgIndex::kRootDir])
139     .SetNumWorkers(static_cast<int32_t>(strtol(argv[ArgIndex::kNumWorkers], nullptr, ds::kDecimal)))
140     .SetPort(port)
141     .SetSharedMemorySizeInGB(static_cast<int32_t>(strtol(argv[ArgIndex::kSharedMemorySize], nullptr, ds::kDecimal)))
142     .SetLogLevel(static_cast<int8_t>((strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal))))
143     .SetMemoryCapRatio(strtof(argv[ArgIndex::kMemoryCapRatio], nullptr));
144 
145   auto daemonize_string = argv[ArgIndex::kDemonize];
146   bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 ||
147                    strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0;
148 
149   // We always change directory to / on unix rather than using the directory where the cache_server
150   // is called. This is a standard procedure for daemonize a process on unix.
151   if (chdir("/") == -1) {
152     std::string errMsg = "Unable to change directory to /. Errno = " + std::to_string(errno);
153     RETURN_STATUS_UNEXPECTED(errMsg);
154   }
155 
156   // A message queue for communication between parent and child (if we fork).
157   ds::SharedMessage msg;
158   if (daemonize) {
159 #ifdef USE_GLOG
160 #define google mindspore_private
161     FLAGS_logtostderr = false;
162     FLAGS_log_dir = ds::DefaultLogDir();
163     // Create cache server default log dir
164     ds::Path log_dir = ds::Path(FLAGS_log_dir);
165     rc = log_dir.CreateDirectories();
166     if (rc.IsError()) {
167       return rc;
168     }
169     ms::g_ms_submodule_log_levels[SUBMODULE_ID] =
170       static_cast<int>(strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal));
171 #undef google
172 #endif
173     rc = msg.Create();
174     if (rc.IsError()) {
175       return rc;
176     }
177     return Fork(&builder, &msg, port);
178   }
179 
180   return BuildServer(&builder, &msg, port, daemonize);
181 }
182 
main(int argc,char ** argv)183 int main(int argc, char **argv) {
184   // Create the common path for all users
185   ds::Path common_dir = ds::Path(ds::kDefaultCommonPath);
186   ms::Status rc = common_dir.CreateCommonDirectories();
187   if (rc.IsError()) {
188     std::cerr << rc.ToString() << std::endl;
189     return 1;
190   }
191   // This executable is not to be called directly, and should be invoked by cache_admin executable.
192   rc = StartServer(argc, argv);
193   // Check result
194   if (rc.IsError()) {
195     auto errCode = rc.StatusCode();
196     auto errMsg = rc.ToString();
197     std::cerr << errMsg << std::endl;
198     return static_cast<int>(errCode);
199   }
200   return 0;
201 }
202