1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <chrono>
17 #include <limits>
18 #include "minddata/dataset/engine/cache/cache_grpc_server.h"
19 #include "minddata/dataset/engine/cache/cache_server.h"
20 #include "minddata/dataset/util/path.h"
21 #include "minddata/dataset/util/task_manager.h"
22 #include "minddata/dataset/util/log_adapter.h"
23
24 namespace mindspore {
25 namespace dataset {
CacheServerGreeterImpl(int32_t port)26 CacheServerGreeterImpl::CacheServerGreeterImpl(int32_t port) : port_(port) {
27 // Setup a path for unix socket.
28 unix_socket_ = PortToUnixSocketPath(port);
29 // We can't generate the ftok key yet until the unix_socket_ is created
30 }
31
Shutdown()32 void CacheServerGreeterImpl::Shutdown() {
33 if (server_) {
34 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
35 server_->Shutdown(deadline);
36 }
37 // Always shutdown the completion queue after the server.
38 if (cq_) {
39 cq_->Shutdown();
40 void *tag;
41 bool success;
42 while (cq_->Next(&tag, &success)) {
43 delete reinterpret_cast<CacheServerRequest *>(tag);
44 }
45 }
46 }
47
~CacheServerGreeterImpl()48 CacheServerGreeterImpl::~CacheServerGreeterImpl() { Shutdown(); }
49
Run()50 Status CacheServerGreeterImpl::Run() {
51 // To listen on all interfaces, use 0.0.0.0
52 // Future, allow the user to choose listening interface. For now, default to localhost
53 std::string host("127.0.0.1");
54 std::string server_address = host + ":" + std::to_string(port_);
55 grpc::ServerBuilder builder;
56 // Default message size for gRPC is 4MB. Increase it to 2g-1
57 builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
58 #ifdef CACHE_LOCAL_CLIENT
59 int port_local = 0;
60 // We also optimize on local clients on the same machine using unix socket
61 builder.AddListeningPort("unix://" + unix_socket_, grpc::InsecureServerCredentials(), &port_local);
62 #endif
63 builder.RegisterService(&svc_);
64 cq_ = builder.AddCompletionQueue();
65 server_ = builder.BuildAndStart();
66 if (server_) {
67 MS_LOG(INFO) << "Server listening on " << server_address;
68 } else {
69 std::string errMsg = "Fail to start server. ";
70 #ifdef CACHE_LOCAL_CLIENT
71 if (port_local == 0) {
72 errMsg += " Unable to create unix socket " + unix_socket_ + ".";
73 }
74 #endif
75 RETURN_STATUS_UNEXPECTED(errMsg);
76 }
77 return Status::OK();
78 }
79
HandleRequest(int32_t worker_id)80 Status CacheServerGreeterImpl::HandleRequest(int32_t worker_id) {
81 bool success;
82 void *tag;
83 // We loop through the grpc queue. Each connection if successful
84 // will come back with our own tag which is an instance of CacheServerRequest
85 // and we simply call its functor. But first we need to create these instances
86 // and inject them into the grpc queue.
87 CacheServerRequest *p;
88 // Get a free tag from my free list.
89 RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&p));
90 RETURN_IF_NOT_OK((*p)(&svc_, cq_.get()));
91 do {
92 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
93 // Set a timeout for one second. Check for interrupt if we need to do early exit.
94 auto r = cq_->AsyncNext(&tag, &success, deadline);
95 if (r == grpc::CompletionQueue::NextStatus::GOT_EVENT) {
96 auto rq = static_cast<CacheServerRequest *>(tag);
97 if (success) {
98 if (rq->st_ == CacheServerRequest::STATE::PROCESS) {
99 RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get()));
100 } else if (rq->st_ == CacheServerRequest::STATE::FINISH) {
101 MS_LOG(DEBUG) << *rq << " Finished.";
102 if (rq->type_ == BaseRequest::RequestType::kStopService) {
103 // For cache_admin --stop, ProcessRequest is just acknowledging we receive the request. Now
104 // we call the real function.
105 auto &cs = CacheServer::GetInstance();
106 cs.GlobalShutdown();
107 }
108 RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
109 }
110 } else {
111 RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
112 }
113 } else if (r == grpc::CompletionQueue::NextStatus::TIMEOUT) {
114 // If we are interrupted, exit. Otherwise wait again.
115 } else {
116 // Queue is drained.
117 break;
118 }
119 } while (!this_thread::is_interrupted());
120 return Status::OK();
121 }
122
operator ()(CacheServerGreeter::AsyncService * svc,grpc::ServerCompletionQueue * cq)123 Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq) {
124 if (st_ == STATE::CREATE) {
125 st_ = STATE::PROCESS;
126 svc->RequestCacheServerRequest(&ctx_, &rq_, &responder_, cq, cq, this);
127 } else if (st_ == STATE::PROCESS) {
128 auto &cs = CacheServer::GetInstance();
129 // Get a new tag and handle the next request before we serve the current request.
130 // The tag will be recycled when its state is changed to FINISH.
131 // The number of free list queues is the same as the number of grpc threads.
132 // Where we get the free list it doesn't matter (as long we return it back to the right queue).
133 // We can round robin, use the qid or even use the worker id. We will use the free list queue
134 // where the current request comes from.
135 CacheServerRequest *next_rq;
136 RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&next_rq));
137 RETURN_IF_NOT_OK((*next_rq)(svc, cq));
138 // Now we continue with the current request.
139 // First thing we need to extract the type from the incoming request.
140 // When this object was first created (i.e. STATE::CREATE), we set the type to UNKNOWN.
141 type_ = static_cast<RequestType>(rq_.type());
142 // Now we pass the address of this instance to CacheServer's main loop.
143 MS_LOG(DEBUG) << "Handle request " << *this;
144 // We will distribute the request evenly (or randomly) over all the numa nodes.
145 // The exception is BatchFetch and BatchCache which we need to pre-process here.
146 // Also some requests are urgent that we want to process them here too.
147 if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows ||
148 type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock ||
149 type_ == BaseRequest::RequestType::kFreeSharedBlock) {
150 RETURN_IF_NOT_OK(cs.ProcessRequest(this));
151 // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag
152 // asynchronously. Further access of 'this' is unpredictable.
153 } else {
154 RETURN_IF_NOT_OK(cs.PushRequest(cs.GetRandomWorker(), this));
155 }
156 } else if (st_ == STATE::FINISH) {
157 // We don't have logic here but moved to the caller.
158 }
159 return Status::OK();
160 }
161
Print(std::ostream & out) const162 void CacheServerRequest::Print(std::ostream &out) const {
163 if (rq_.has_connection_info()) {
164 out << "Session Id: " << rq_.connection_info().session_id() << " CRC: " << rq_.connection_info().crc();
165 } else {
166 out << "Connection Id: " << rq_.connection_id();
167 }
168 out << " ";
169 BaseRequest::Print(out);
170 }
171
MonitorUnixSocket()172 Status CacheServerGreeterImpl::MonitorUnixSocket() {
173 TaskManager::FindMe()->Post();
174 #ifdef CACHE_LOCAL_CLIENT
175 Path p(unix_socket_);
176 do {
177 RETURN_IF_INTERRUPTED();
178 // If the unix socket is recreated for whatever reason, this server instance will be stale and
179 // no other process and communicate with us. In this case we need to shutdown ourselves.
180 if (p.Exists()) {
181 auto &cs = CacheServer::GetInstance();
182 SharedMemory::shm_key_t key;
183 RETURN_IF_NOT_OK(PortToFtok(port_, &key));
184 auto shm_key = cs.GetKey();
185 if (key != shm_key) {
186 std::string errMsg = "Detecting unix socket has changed. Previous key " + std::to_string(shm_key) +
187 ". New key " + std::to_string(key) + ". Shutting down server";
188 MS_LOG(ERROR) << errMsg;
189 RETURN_STATUS_UNEXPECTED(errMsg);
190 }
191 } else {
192 MS_LOG(WARNING) << "Unix socket is removed.";
193 TaskManager::WakeUpWatchDog();
194 }
195 std::this_thread::sleep_for(std::chrono::seconds(kMonitorIntervalInSec));
196 } while (true);
197 #endif
198 return Status::OK();
199 }
200 } // namespace dataset
201 } // namespace mindspore
202