1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <chrono>
17 #include <limits>
18 #include "minddata/dataset/engine/cache/cache_grpc_server.h"
19 #include "minddata/dataset/engine/cache/cache_server.h"
20 #include "minddata/dataset/util/path.h"
21 #include "minddata/dataset/util/task_manager.h"
22 #include "minddata/dataset/util/log_adapter.h"
23
24 namespace mindspore {
25 namespace dataset {
CacheServerGreeterImpl(int32_t port)26 CacheServerGreeterImpl::CacheServerGreeterImpl(int32_t port) : port_(port) {
27 // Setup a path for unix socket.
28 unix_socket_ = PortToUnixSocketPath(port);
29 // We can't generate the ftok key yet until the unix_socket_ is created
30 }
31
Shutdown()32 void CacheServerGreeterImpl::Shutdown() {
33 if (server_) {
34 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
35 server_->Shutdown(deadline);
36 }
37 // Always shutdown the completion queue after the server.
38 if (cq_) {
39 cq_->Shutdown();
40 void *tag;
41 bool success;
42 while (cq_->Next(&tag, &success)) {
43 delete reinterpret_cast<CacheServerRequest *>(tag);
44 }
45 }
46 }
47
~CacheServerGreeterImpl()48 CacheServerGreeterImpl::~CacheServerGreeterImpl() { Shutdown(); }
49
Run()50 Status CacheServerGreeterImpl::Run() {
51 // To listen on all interfaces, use 0.0.0.0
52 // Future, allow the user to choose listening interface. For now, default to localhost
53 std::string host("127.0.0.1");
54 std::string server_address = host + ":" + std::to_string(port_);
55 grpc::ServerBuilder builder;
56 // Default message size for gRPC is 4MB. Increase it to 2g-1
57 builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
58 int port_tcpip = 0;
59 #ifdef CACHE_LOCAL_CLIENT
60 int port_local = 0;
61 // We also optimize on local clients on the same machine using unix socket
62 builder.AddListeningPort("unix://" + unix_socket_, grpc::InsecureServerCredentials(), &port_local);
63 #endif
64 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_tcpip);
65 builder.RegisterService(&svc_);
66 cq_ = builder.AddCompletionQueue();
67 server_ = builder.BuildAndStart();
68 if (server_) {
69 MS_LOG(INFO) << "Server listening on " << server_address;
70 } else {
71 std::string errMsg = "Fail to start server. ";
72 if (port_tcpip != port_) {
73 errMsg += "Unable to bind to tcpip port " + std::to_string(port_) + ".";
74 }
75 #ifdef CACHE_LOCAL_CLIENT
76 if (port_local == 0) {
77 errMsg += " Unable to create unix socket " + unix_socket_ + ".";
78 }
79 #endif
80 RETURN_STATUS_UNEXPECTED(errMsg);
81 }
82 return Status::OK();
83 }
84
HandleRequest(int32_t worker_id)85 Status CacheServerGreeterImpl::HandleRequest(int32_t worker_id) {
86 bool success;
87 void *tag;
88 // We loop through the grpc queue. Each connection if successful
89 // will come back with our own tag which is an instance of CacheServerRequest
90 // and we simply call its functor. But first we need to create these instances
91 // and inject them into the grpc queue.
92 CacheServerRequest *p;
93 // Get a free tag from my free list.
94 RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&p));
95 RETURN_IF_NOT_OK((*p)(&svc_, cq_.get()));
96 do {
97 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
98 // Set a timeout for one second. Check for interrupt if we need to do early exit.
99 auto r = cq_->AsyncNext(&tag, &success, deadline);
100 if (r == grpc::CompletionQueue::NextStatus::GOT_EVENT) {
101 auto rq = static_cast<CacheServerRequest *>(tag);
102 if (success) {
103 if (rq->st_ == CacheServerRequest::STATE::PROCESS) {
104 RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get()));
105 } else if (rq->st_ == CacheServerRequest::STATE::FINISH) {
106 MS_LOG(DEBUG) << *rq << " Finished.";
107 if (rq->type_ == BaseRequest::RequestType::kStopService) {
108 // For cache_admin --stop, ProcessRequest is just acknowledging we receive the request. Now
109 // we call the real function.
110 auto &cs = CacheServer::GetInstance();
111 cs.GlobalShutdown();
112 }
113 RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
114 }
115 } else {
116 RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
117 }
118 } else if (r == grpc::CompletionQueue::NextStatus::TIMEOUT) {
119 // If we are interrupted, exit. Otherwise wait again.
120 } else {
121 // Queue is drained.
122 break;
123 }
124 } while (!this_thread::is_interrupted());
125 return Status::OK();
126 }
127
operator ()(CacheServerGreeter::AsyncService * svc,grpc::ServerCompletionQueue * cq)128 Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq) {
129 if (st_ == STATE::CREATE) {
130 st_ = STATE::PROCESS;
131 svc->RequestCacheServerRequest(&ctx_, &rq_, &responder_, cq, cq, this);
132 } else if (st_ == STATE::PROCESS) {
133 auto &cs = CacheServer::GetInstance();
134 // Get a new tag and handle the next request before we serve the current request.
135 // The tag will be recycled when its state is changed to FINISH.
136 // The number of free list queues is the same as the number of grpc threads.
137 // Where we get the free list it doesn't matter (as long we return it back to the right queue).
138 // We can round robin, use the qid or even use the worker id. We will use the free list queue
139 // where the current request comes from.
140 CacheServerRequest *next_rq;
141 RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&next_rq));
142 RETURN_IF_NOT_OK((*next_rq)(svc, cq));
143 // Now we continue with the current request.
144 // First thing we need to extract the type from the incoming request.
145 // When this object was first created (i.e. STATE::CREATE), we set the type to UNKNOWN.
146 type_ = static_cast<RequestType>(rq_.type());
147 // Now we pass the address of this instance to CacheServer's main loop.
148 MS_LOG(DEBUG) << "Handle request " << *this;
149 // We will distribute the request evenly (or randomly) over all the numa nodes.
150 // The exception is BatchFetch and BatchCache which we need to pre-process here.
151 // Also some requests are urgent that we want to process them here too.
152 if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows ||
153 type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock ||
154 type_ == BaseRequest::RequestType::kFreeSharedBlock) {
155 RETURN_IF_NOT_OK(cs.ProcessRequest(this));
156 // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag
157 // asynchronously. Further access of 'this' is unpredictable.
158 } else {
159 RETURN_IF_NOT_OK(cs.PushRequest(cs.GetRandomWorker(), this));
160 }
161 } else if (st_ == STATE::FINISH) {
162 // We don't have logic here but moved to the caller.
163 }
164 return Status::OK();
165 }
166
Print(std::ostream & out) const167 void CacheServerRequest::Print(std::ostream &out) const {
168 if (rq_.has_connection_info()) {
169 out << "Session Id: " << rq_.connection_info().session_id() << " CRC: " << rq_.connection_info().crc();
170 } else {
171 out << "Connection Id: " << rq_.connection_id();
172 }
173 out << " ";
174 BaseRequest::Print(out);
175 }
176
MonitorUnixSocket()177 Status CacheServerGreeterImpl::MonitorUnixSocket() {
178 TaskManager::FindMe()->Post();
179 #ifdef CACHE_LOCAL_CLIENT
180 Path p(unix_socket_);
181 do {
182 RETURN_IF_INTERRUPTED();
183 // If the unix socket is recreated for whatever reason, this server instance will be stale and
184 // no other process and communicate with us. In this case we need to shutdown ourselves.
185 if (p.Exists()) {
186 auto &cs = CacheServer::GetInstance();
187 SharedMemory::shm_key_t key;
188 RETURN_IF_NOT_OK(PortToFtok(port_, &key));
189 auto shm_key = cs.GetKey();
190 if (key != shm_key) {
191 std::string errMsg = "Detecting unix socket has changed. Previous key " + std::to_string(shm_key) +
192 ". New key " + std::to_string(key) + ". Shutting down server";
193 MS_LOG(ERROR) << errMsg;
194 RETURN_STATUS_UNEXPECTED(errMsg);
195 }
196 } else {
197 MS_LOG(WARNING) << "Unix socket is removed.";
198 TaskManager::WakeUpWatchDog();
199 }
200 std::this_thread::sleep_for(std::chrono::seconds(kMonitorIntervalInSec));
201 } while (true);
202 #endif
203 return Status::OK();
204 }
205 } // namespace dataset
206 } // namespace mindspore
207