1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 8 * http://www.apache.org/licenses/LICENSE-2.0 9 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_ 18 19 #include <atomic> 20 #include <memory> 21 #include <mutex> 22 #include <string> 23 #include <utility> 24 #include <vector> 25 #include "minddata/dataset/engine/cache/cache_common.h" 26 #include "minddata/dataset/engine/cache/cache_ipc.h" 27 #include "minddata/dataset/util/allocator.h" 28 #include "minddata/dataset/util/arena.h" 29 #include "minddata/dataset/util/status.h" 30 #include "minddata/dataset/util/task_manager.h" 31 32 namespace mindspore { 33 namespace dataset { 34 /// \brief Server side view of BaseRequest. Incoming request are in the form of protobuf objects 35 /// and this class is used to translate from protobuf to structures understood by CacheService class. 36 /// \see CacheService 37 class CacheServerRequest : public BaseRequest { 38 public: 39 friend class CacheServer; 40 friend class CacheService; 41 friend class CacheServerGreeterImpl; 42 enum class STATE : int8_t { CREATE = 1, PROCESS = 2, FINISH = 3 }; CacheServerRequest()43 CacheServerRequest() 44 : BaseRequest::BaseRequest(BaseRequest::RequestType::kRequestUnknown), st_(STATE::CREATE), responder_(&ctx_) {} 45 46 ~CacheServerRequest() override = default; 47 48 /// \brief Functor. Used mainly by CacheServerGreeterImpl class to tag each incoming request and this 49 /// functor will translate each protobuf into some form understood by by CacheService class. 50 /// \param svc Async service 51 /// \param cq Completion queue 52 /// \return Status object 53 Status operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq); 54 55 /// \brief Override the base class Print method 56 /// \param out 57 void Print(std::ostream &out) const override; 58 59 private: 60 Status rc_; 61 STATE st_; 62 grpc::ServerContext ctx_; 63 grpc::ServerAsyncResponseWriter<CacheReply> responder_; 64 }; 65 66 /// \brief Implementation of CacheServerGreeter 67 /// \note It is an async server 68 /// \see cache_grpc.proto 69 class CacheServerGreeterImpl final { 70 friend class CacheServer; 71 72 public: 73 constexpr static int32_t kMonitorIntervalInSec = 5; 74 explicit CacheServerGreeterImpl(int32_t port); 75 virtual ~CacheServerGreeterImpl(); 76 /// \brief Brings up gRPC server 77 /// \return none 78 Status Run(); 79 /// \brief Entry function to handle cache server request 80 Status HandleRequest(int32_t worker_id); 81 82 /// \brief Montor the status of the unix socket in case it is gone. 83 Status MonitorUnixSocket(); 84 85 /// \brief This shutdown down the comm layer 86 void Shutdown(); 87 88 private: 89 int32_t port_; 90 std::string unix_socket_; 91 CacheServerGreeter::AsyncService svc_; 92 std::unique_ptr<grpc::ServerCompletionQueue> cq_; 93 std::unique_ptr<grpc::Server> server_; 94 }; 95 } // namespace dataset 96 } // namespace mindspore 97 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_ 98