/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <chrono>
#include <limits>
#include "minddata/dataset/engine/cache/cache_grpc_server.h"
#include "minddata/dataset/engine/cache/cache_server.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/log_adapter.h"

namespace mindspore {
namespace dataset {
CacheServerGreeterImpl::CacheServerGreeterImpl(int32_t port) : port_(port) {
  // Set up a path for the unix socket.
  unix_socket_ = PortToUnixSocketPath(port);
  // We can't generate the ftok key until the unix_socket_ file has been created.
}

void CacheServerGreeterImpl::Shutdown() {
  if (server_) {
    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    server_->Shutdown(deadline);
  }
  // Always shut down the completion queue after the server.
  if (cq_) {
    cq_->Shutdown();
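    // Drain any events still left in the queue and free their request tags.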
    void *tag;
    bool success;
    while (cq_->Next(&tag, &success)) {
      delete reinterpret_cast<CacheServerRequest *>(tag);
    }
  }
}

CacheServerGreeterImpl::~CacheServerGreeterImpl() { Shutdown(); }

Status CacheServerGreeterImpl::Run() {
  // To listen on all interfaces, use 0.0.0.0.
  // In the future, allow the user to choose the listening interface. For now, default to localhost.
  std::string host("127.0.0.1");
  std::string server_address = host + ":" + std::to_string(port_);
  grpc::ServerBuilder builder;
  // The default gRPC message size is 4MB. Increase it to 2GB - 1.
  builder.SetMaxReceiveMessageSize(std::numeric_limits<int32_t>::max());
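  // gRPC reports the port it actually bound through these out parameters; the value stays 0 if the bind fails.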
  int port_tcpip = 0;
#ifdef CACHE_LOCAL_CLIENT
  int port_local = 0;
  // As an optimization, also serve local clients on the same machine through a unix socket.
  builder.AddListeningPort("unix://" + unix_socket_, grpc::InsecureServerCredentials(), &port_local);
#endif
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_tcpip);
  builder.RegisterService(&svc_);
  cq_ = builder.AddCompletionQueue();
  server_ = builder.BuildAndStart();
  if (server_) {
    MS_LOG(INFO) << "Server listening on " << server_address;
  } else {
    std::string errMsg = "Failed to start server. ";
    if (port_tcpip != port_) {
      errMsg += "Unable to bind to tcpip port " + std::to_string(port_) + ".";
    }
#ifdef CACHE_LOCAL_CLIENT
    if (port_local == 0) {
      errMsg += " Unable to create unix socket " + unix_socket_ + ".";
    }
#endif
    RETURN_STATUS_UNEXPECTED(errMsg);
  }
  return Status::OK();
}

Status CacheServerGreeterImpl::HandleRequest(int32_t worker_id) {
  bool success;
  void *tag;
  // We loop over the grpc completion queue. Each successful event comes back
  // with our own tag, which is an instance of CacheServerRequest, and we simply
  // invoke its functor. But first we need to create such an instance and
  // inject it into the grpc queue.
  CacheServerRequest *p;
  // Get a free tag from my free list.
  RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&p));
  RETURN_IF_NOT_OK((*p)(&svc_, cq_.get()));
  do {
    auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(1);
    // Wait with a one second timeout so we can periodically check for interrupt and exit early.
    auto r = cq_->AsyncNext(&tag, &success, deadline);
    if (r == grpc::CompletionQueue::NextStatus::GOT_EVENT) {
      auto rq = static_cast<CacheServerRequest *>(tag);
      if (success) {
        if (rq->st_ == CacheServerRequest::STATE::PROCESS) {
          RETURN_IF_NOT_OK((*rq)(&svc_, cq_.get()));
        } else if (rq->st_ == CacheServerRequest::STATE::FINISH) {
          MS_LOG(DEBUG) << *rq << " Finished.";
          if (rq->type_ == BaseRequest::RequestType::kStopService) {
            // For cache_admin --stop, ProcessRequest just acknowledges that we received the request.
            // Now we call the real function.
            auto &cs = CacheServer::GetInstance();
            cs.GlobalShutdown();
          }
          RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
        }
      } else {
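        // The event did not complete successfully (e.g. the server is shutting down), so just recycle the tag.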
        RETURN_IF_NOT_OK(CacheServer::ReturnRequestTag(rq));
      }
    } else if (r == grpc::CompletionQueue::NextStatus::TIMEOUT) {
      // If we are interrupted, exit. Otherwise wait again.
    } else {
      // Queue is drained.
      break;
    }
  } while (!this_thread::is_interrupted());
  return Status::OK();
}

Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq) {
  if (st_ == STATE::CREATE) {
    st_ = STATE::PROCESS;
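    // Register this tag with grpc so that the next incoming request is delivered to it on the completion queue.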
    svc->RequestCacheServerRequest(&ctx_, &rq_, &responder_, cq, cq, this);
  } else if (st_ == STATE::PROCESS) {
    auto &cs = CacheServer::GetInstance();
    // Get a new tag and register for the next request before we serve the current request.
    // The tag will be recycled when its state is changed to FINISH.
    // The number of free list queues is the same as the number of grpc threads.
    // It doesn't matter which free list we take the tag from (as long as we return it to the right queue).
    // We could round robin, use the qid or even use the worker id. We will use the free list queue
    // where the current request comes from.
    CacheServerRequest *next_rq;
    RETURN_IF_NOT_OK(CacheServer::GetFreeRequestTag(&next_rq));
    RETURN_IF_NOT_OK((*next_rq)(svc, cq));
    // Now we continue with the current request.
    // The first thing we need to do is extract the type from the incoming request.
    // When this object was first created (i.e. STATE::CREATE), we set the type to UNKNOWN.
    type_ = static_cast<RequestType>(rq_.type());
    // Now we pass the address of this instance to CacheServer's main loop.
    MS_LOG(DEBUG) << "Handle request " << *this;
    // We distribute the requests evenly (or randomly) over all the numa nodes.
    // The exceptions are BatchFetch and BatchCache, which we need to pre-process here.
    // Some other requests are urgent, so we also process them here.
    if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows ||
        type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock ||
        type_ == BaseRequest::RequestType::kFreeSharedBlock) {
      RETURN_IF_NOT_OK(cs.ProcessRequest(this));
      // WARNING. After we call ProcessRequest, the memory of 'this' is recycled by ReturnRequestTag
      // asynchronously. Any further access of 'this' is unpredictable.
    } else {
      RETURN_IF_NOT_OK(cs.PushRequest(cs.GetRandomWorker(), this));
    }
  } else if (st_ == STATE::FINISH) {
    // There is no logic here; the FINISH state is handled by the caller (HandleRequest).
  }
  return Status::OK();
}

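// Print identifying information about the request (session id and crc, or connection id) for logging.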
void CacheServerRequest::Print(std::ostream &out) const {
  if (rq_.has_connection_info()) {
    out << "Session Id: " << rq_.connection_info().session_id() << " CRC: " << rq_.connection_info().crc();
  } else {
    out << "Connection Id: " << rq_.connection_id();
  }
  out << " ";
  BaseRequest::Print(out);
}

Status CacheServerGreeterImpl::MonitorUnixSocket() {
  TaskManager::FindMe()->Post();
#ifdef CACHE_LOCAL_CLIENT
  Path p(unix_socket_);
  do {
    RETURN_IF_INTERRUPTED();
    // If the unix socket is recreated for whatever reason, this server instance will be stale and
    // no other process can communicate with us. In this case we need to shut ourselves down.
    if (p.Exists()) {
      auto &cs = CacheServer::GetInstance();
      SharedMemory::shm_key_t key;
      RETURN_IF_NOT_OK(PortToFtok(port_, &key));
      auto shm_key = cs.GetKey();
      if (key != shm_key) {
        std::string errMsg = "Detected that the unix socket has changed. Previous key " + std::to_string(shm_key) +
                             ". New key " + std::to_string(key) + ". Shutting down server.";
        MS_LOG(ERROR) << errMsg;
        RETURN_STATUS_UNEXPECTED(errMsg);
      }
    } else {
      MS_LOG(WARNING) << "Unix socket has been removed.";
      TaskManager::WakeUpWatchDog();
    }
    std::this_thread::sleep_for(std::chrono::seconds(kMonitorIntervalInSec));
  } while (true);
#endif
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore