/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <chrono>
#include <limits>
#include "minddata/dataset/engine/cache/cache_grpc_server.h"
#include "minddata/dataset/engine/cache/cache_server.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/log_adapter.h"

namespace mindspore {
namespace dataset {
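// CacheServerGreeterImpl wraps the asynchronous gRPC service used by the cache server.
// From the code in this file, it owns the grpc::Server, its completion queue, and
// (optionally) a unix domain socket listener for local clients; see cache_grpc_server.h
// for the authoritative class declaration.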
CacheServerGreeterImpl::CacheServerGreeterImpl(int32_t port) : port_(port) {
  // Setup a path for unix socket.
  unix_socket_ = PortToUnixSocketPath(port);
  // We can't generate the ftok key yet until the unix_socket_ is created
}

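// Shutdown order matters: stop the grpc::Server first (with a short deadline so in-flight
// calls can drain), then shut down the completion queue and delete any request tags that
// are still pending on it.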
void CacheServerGreeterImpl::Shutdown() {
  if (server_) {
    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    server_->Shutdown(deadline);
  }
  // Always shut down the completion queue after the server.
  if (cq_) {
    cq_->Shutdown();
    void *tag;
    bool success;
    while (cq_->Next(&tag, &success)) {
      delete reinterpret_cast<CacheServerRequest *>(tag);
    }
  }
}

CacheServerGreeterImpl::~CacheServerGreeterImpl() { Shutdown(); }

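// Run() builds and starts the grpc server: it registers the async service, creates the
// completion queue, and listens on localhost:port_ (plus, in local-client builds, on the
// unix domain socket so clients on the same machine can avoid going through TCP).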
Status CacheServerGreeterImpl::Run() {
  // To listen on all interfaces, use 0.0.0.0.
  // In the future, allow the user to choose the listening interface. For now, default to localhost.
  std::string host("127.0.0.1");
  std::string server_address = host + ":" + std::to_string(port_);
  grpc::ServerBuilder builder;
  // The default maximum message size for gRPC is 4MB. Increase it to 2GB - 1 (INT32_MAX).
  builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
#ifdef CACHE_LOCAL_CLIENT
  int port_local = 0;
  // Also optimize for local clients on the same machine by listening on a unix socket.
  builder.AddListeningPort("unix://" + unix_socket_, grpc::InsecureServerCredentials(), &port_local);
#endif
  builder.RegisterService(&svc_);
  cq_ = builder.AddCompletionQueue();
  server_ = builder.BuildAndStart();
  if (server_) {
    MS_LOG(INFO) << "Server listening on " << server_address;
  } else {
    std::string errMsg = "Failed to start server. ";
#ifdef CACHE_LOCAL_CLIENT
    if (port_local == 0) {
      errMsg += " Unable to create unix socket " + unix_socket_ + ".";
    }
#endif
    RETURN_STATUS_UNEXPECTED(errMsg);
  }
  return Status::OK();
}

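// HandleRequest() is the event loop each gRPC worker runs against the shared completion
// queue, following the standard grpc async-server pattern:
//   1. Seed the queue with a free CacheServerRequest tag (its functor posts the
//      RequestCacheServerRequest call while in STATE::CREATE).
//   2. Wait on AsyncNext() with a one-second deadline so we can poll for interrupts.
//   3. When a tag comes back, dispatch on its state: PROCESS handles the incoming call
//      (and re-arms the queue with a fresh tag), FINISH recycles the tag.
// How many workers call this function is decided by CacheServer and is not shown in this file.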
Status CacheServerGreeterImpl::HandleRequest(int32_t worker_id) {
  bool success;
  void *tag;
  // We loop over the grpc completion queue. Each successful event comes back with our own
  // tag, which is an instance of CacheServerRequest, and we simply call its functor.
  // But first we need to create these instances and inject them into the grpc queue.
  CacheServerRequest *p;
  // Get a free tag from my free list.
  RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&p));
  RETURN_IF_NOT_OK((*p)(&svc_, cq_.get()));
  do {
    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    // Set a timeout of one second so we can check for interrupts and exit early if needed.
    auto r = cq_->AsyncNext(&tag, &success, deadline);
    if (r == grpc::CompletionQueue::NextStatus::GOT_EVENT) {
      auto rq = static_cast<CacheServerRequest *>(tag);
      if (success) {
        if (rq->st_ == CacheServerRequest::STATE::PROCESS) {
          RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get()));
        } else if (rq->st_ == CacheServerRequest::STATE::FINISH) {
          MS_LOG(DEBUG) << *rq << " Finished.";
          if (rq->type_ == BaseRequest::RequestType::kStopService) {
            // For cache_admin --stop, ProcessRequest only acknowledges that we received the
            // request. Now we call the real shutdown.
            auto &cs = CacheServer::GetInstance();
            cs.GlobalShutdown();
          }
          RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
        }
      } else {
        RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
      }
    } else if (r == grpc::CompletionQueue::NextStatus::TIMEOUT) {
      // If we are interrupted, exit. Otherwise wait again.
    } else {
      // The queue is drained.
      break;
    }
  } while (!this_thread::is_interrupted());
  return Status::OK();
}

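// The functor below implements the per-request state machine used by the async service:
//   CREATE  -> post RequestCacheServerRequest() and wait for a client call to arrive.
//   PROCESS -> a call has arrived; re-arm the queue with a fresh tag, then either handle
//              the request inline (urgent or pre-processing cases) or push it to a worker.
//   FINISH  -> nothing to do here; the caller (HandleRequest) recycles the tag.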
Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq) {
  if (st_ == STATE::CREATE) {
    st_ = STATE::PROCESS;
    svc->RequestCacheServerRequest(&ctx_, &rq_, &responder_, cq, cq, this);
  } else if (st_ == STATE::PROCESS) {
    auto &cs = CacheServer::GetInstance();
    // Get a new tag and handle the next request before we serve the current request.
    // The tag will be recycled when its state is changed to FINISH.
    // The number of free list queues is the same as the number of grpc threads.
    // Which free list we take the tag from doesn't matter (as long as we return it to the
    // right queue). We can round robin, use the qid, or even use the worker id. We will use
    // the free list queue where the current request came from.
    CacheServerRequest *next_rq;
    RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&next_rq));
    RETURN_IF_NOT_OK((*next_rq)(svc, cq));
    // Now we continue with the current request.
    // The first thing we need to do is extract the type from the incoming request.
    // When this object was first created (i.e. STATE::CREATE), we set the type to UNKNOWN.
    type_ = static_cast<RequestType>(rq_.type());
    // Now we pass the address of this instance to CacheServer's main loop.
    MS_LOG(DEBUG) << "Handle request " << *this;
    // We will distribute the requests evenly (or randomly) over all the numa nodes.
    // The exceptions are BatchFetch and BatchCache, which we need to pre-process here.
    // Some requests are also urgent, so we process them here too.
    if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows ||
        type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock ||
        type_ == BaseRequest::RequestType::kFreeSharedBlock) {
      RETURN_IF_NOT_OK(cs.ProcessRequest(this));
      // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag
      // asynchronously. Further access of 'this' is unpredictable.
    } else {
      RETURN_IF_NOT_OK(cs.PushRequest(cs.GetRandomWorker(), this));
    }
  } else if (st_ == STATE::FINISH) {
    // There is no logic here; the work has been moved to the caller.
  }
  return Status::OK();
}

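// Print() gives the short identity of a request for logging: the session id and CRC for
// connection-info requests, otherwise the connection id, followed by the base request info.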
void CacheServerRequest::Print(std::ostream &out) const {
  if (rq_.has_connection_info()) {
    out << "Session Id: " << rq_.connection_info().session_id() << " CRC: " << rq_.connection_info().crc();
  } else {
    out << "Connection Id: " << rq_.connection_id();
  }
  out << " ";
  BaseRequest::Print(out);
}

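// MonitorUnixSocket() runs as a background task (only active when CACHE_LOCAL_CLIENT is
// defined). Every kMonitorIntervalInSec seconds it recomputes the ftok key for this port
// and compares it with the key the CacheServer is holding. If the unix socket file still
// exists but the key has changed, the socket was recreated by another instance, so this
// stale server returns an error and shuts down; if the file has been removed, the watchdog
// is woken up instead.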
Status CacheServerGreeterImpl::MonitorUnixSocket() {
  TaskManager::FindMe()->Post();
#ifdef CACHE_LOCAL_CLIENT
  Path p(unix_socket_);
  do {
    RETURN_IF_INTERRUPTED();
    // If the unix socket is recreated for whatever reason, this server instance becomes stale and
    // no other process can communicate with us. In this case we need to shut ourselves down.
    if (p.Exists()) {
      auto &cs = CacheServer::GetInstance();
      SharedMemory::shm_key_t key;
      RETURN_IF_NOT_OK(PortToFtok(port_, &key));
      auto shm_key = cs.GetKey();
      if (key != shm_key) {
        std::string errMsg = "Detected that the unix socket has changed. Previous key " + std::to_string(shm_key) +
                             ". New key " + std::to_string(key) + ". Shutting down the server.";
        MS_LOG(ERROR) << errMsg;
        RETURN_STATUS_UNEXPECTED(errMsg);
      }
    } else {
      MS_LOG(WARNING) << "Unix socket is removed.";
      TaskManager::WakeUpWatchDog();
    }
    std::this_thread::sleep_for(std::chrono::seconds(kMonitorIntervalInSec));
  } while (true);
#endif
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore