• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 
17 #include "minddata/dataset/engine/cache/cache_server.h"
18 #include <sys/types.h>
19 #include <unistd.h>
20 #include <cstdlib>
21 #include <thread>
22 #include <chrono>
23 #include "minddata/dataset/engine/cache/cache_common.h"
24 #include "minddata/dataset/engine/cache/cache_ipc.h"
25 #include "minddata/dataset/include/dataset/constants.h"
26 #include "mindspore/core/utils/log_adapter.h"
27 namespace ms = mindspore;
28 namespace ds = mindspore::dataset;
29 
namespace {
/// Positions of the command line arguments the cache server executable expects.
/// The values are used directly as indices into argv.
enum ArgIndex : uint8_t {
  kProcessName = 0,   ///< argv[0]; handed to the logging initialization when daemonizing
  kRootDir,           ///< 1: root directory given to the server builder
  kNumWorkers,        ///< 2: number of workers (decimal integer)
  kPort,              ///< 3: port number (decimal integer)
  kSharedMemorySize,  ///< 4: shared memory size in GB (decimal integer)
  kLogLevel,          ///< 5: log level (decimal integer)
  kDemonize,          ///< 6: "true"/"TRUE"/"t"/"T" requests running as a daemon
  kMemoryCapRatio     ///< 7: memory cap ratio (floating point)
};

/// Exact value argc must have: the process name plus the seven arguments above.
constexpr int32_t kTotalArgs = 8;
}  // namespace
43 
44 /// Start the server
45 /// \param argv
46 /// \return Status object
StartServer(int argc,char ** argv)47 ms::Status StartServer(int argc, char **argv) {
48   ms::Status rc;
49   ds::CacheServer::Builder builder;
50   if (argc != kTotalArgs) {
51     return ms::Status(ms::StatusCode::kMDSyntaxError);
52   }
53 
54   int32_t port = static_cast<int32_t>(strtol(argv[ArgIndex::kPort], nullptr, ds::kDecimal));
55   builder.SetRootDirectory(argv[ArgIndex::kRootDir])
56     .SetNumWorkers(static_cast<int32_t>(strtol(argv[ArgIndex::kNumWorkers], nullptr, ds::kDecimal)))
57     .SetPort(port)
58     .SetSharedMemorySizeInGB(static_cast<int32_t>(strtol(argv[ArgIndex::kSharedMemorySize], nullptr, ds::kDecimal)))
59     .SetLogLevel(static_cast<int8_t>((strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal))))
60     .SetMemoryCapRatio(strtof(argv[ArgIndex::kMemoryCapRatio], nullptr));
61 
62   auto daemonize_string = argv[ArgIndex::kDemonize];
63   bool daemonize = strcmp(daemonize_string, "true") == 0 || strcmp(daemonize_string, "TRUE") == 0 ||
64                    strcmp(daemonize_string, "t") == 0 || strcmp(daemonize_string, "T") == 0;
65 
66   // We always change directory to / on unix rather than using the directory where the cache_server
67   // is called. This is a standard procedure for daemonize a process on unix.
68   if (chdir("/") == -1) {
69     std::string errMsg = "Unable to change directory to /. Errno = " + std::to_string(errno);
70     return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
71   }
72 
73   // A message queue for communication between parent and child (if we fork).
74   ds::SharedMessage msg;
75   if (daemonize) {
76 #ifdef USE_GLOG
77 #define google mindspore_private
78     FLAGS_logtostderr = false;
79     FLAGS_log_dir = ds::DefaultLogDir();
80     // Create cache server default log dir
81     ds::Path log_dir = ds::Path(FLAGS_log_dir);
82     rc = log_dir.CreateDirectories();
83     if (rc.IsError()) {
84       return rc;
85     }
86     ms::g_ms_submodule_log_levels[SUBMODULE_ID] =
87       static_cast<int>(strtol(argv[ArgIndex::kLogLevel], nullptr, ds::kDecimal));
88     google::InitGoogleLogging(argv[ArgIndex::kProcessName]);
89 #undef google
90 #endif
91     rc = msg.Create();
92     if (rc.IsError()) {
93       return rc;
94     }
95     pid_t pid = fork();
96     // failed to fork
97     if (pid < 0) {
98       std::string errMsg = "Failed to fork process for cache server. Errno = " + std::to_string(errno);
99       return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
100     } else if (pid > 0) {
101       // Parent and will be responsible for remove the queue on exit.
102       msg.RemoveResourcesOnExit();
103       // Sleep one second and we attach to the msg que
104       std::this_thread::sleep_for(std::chrono::seconds(1));
105       ms::Status child_rc;
106       rc = msg.ReceiveStatus(&child_rc);
107       std::string warning_string;
108       if (rc.IsError()) {
109         return rc;
110       }
111       if (child_rc.IsError()) {
112         return child_rc;
113       }
114       warning_string = child_rc.ToString();
115       std::cout << "Cache server startup completed successfully!\n";
116       std::cout << "The cache server daemon has been created as process id " << pid << " and listening on port " << port
117                 << ".\n";
118       if (!warning_string.empty()) std::cout << "WARNING: " << warning_string;
119       std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
120                    "logs (under "
121                 << ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
122       signal(SIGCHLD, SIG_IGN);  // ignore sig child signal.
123       return ms::Status::OK();
124     } else {
125       // Child process will continue from here if daemonize and parent has already exited.
126       // If we are running in the foreground, none of the code in block below will be run.
127       pid_t sid;
128       umask(0);
129       sid = setsid();
130       if (sid < 0) {
131         std::string errMsg = "Failed to setsid(). Errno = " + std::to_string(errno);
132         return ms::Status(ms::StatusCode::kMDUnexpectedError, __LINE__, __FILE__, errMsg);
133       }
134       (void)close(STDIN_FILENO);
135       (void)close(STDOUT_FILENO);
136       (void)close(STDERR_FILENO);
137     }
138   }
139 
140   // Create the instance with some sanity checks built in
141   rc = builder.Build();
142   // Dump the summary
143   MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
144   MS_LOG(INFO) << builder << std::endl;
145   if (rc.IsOk()) {
146     if (daemonize && !rc.ToString().empty()) {
147       // If we have adjusted the number of workers provided by users, use the message queue to send the warning
148       // message if this is the child daemon.
149       (void)msg.SendStatus(rc);
150     }
151     // If all goes well, kick off the threads. Loop forever and never return unless error.
152     ds::CacheServer &cs = ds::CacheServer::GetInstance();
153     rc = cs.Run(msg.GetMsgQueueId());
154   } else if (daemonize) {
155     // If we didn't pass the sanity check to at least create the instance, use
156     // the message queue to return the error message if this is the child daemon.
157     return msg.SendStatus(rc);
158   }
159   return rc;
160 }
161 
main(int argc,char ** argv)162 int main(int argc, char **argv) {
163   // Create the common path for all users
164   ds::Path common_dir = ds::Path(ds::kDefaultCommonPath);
165   ms::Status rc = common_dir.CreateCommonDirectories();
166   if (rc.IsError()) {
167     std::cerr << rc.ToString() << std::endl;
168     return 1;
169   }
170   // This executable is not to be called directly, and should be invoked by cache_admin executable.
171   rc = StartServer(argc, argv);
172   // Check result
173   if (rc.IsError()) {
174     auto errCode = rc.StatusCode();
175     auto errMsg = rc.ToString();
176     std::cerr << errMsg << std::endl;
177     return static_cast<int>(errCode);
178   }
179   return 0;
180 }
181