1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 8 * http://www.apache.org/licenses/LICENSE-2.0 9 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_CLIENT_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_CLIENT_H_ 18 19 #include <atomic> 20 #include <map> 21 #include <memory> 22 #include <mutex> 23 #include <string> 24 #include <utility> 25 #include "minddata/dataset/engine/cache/cache_common.h" 26 #include "minddata/dataset/engine/cache/cache_ipc.h" 27 #include "minddata/dataset/util/service.h" 28 #include "minddata/dataset/util/task_manager.h" 29 namespace mindspore { 30 namespace dataset { 31 /// \brief A client view of gRPC request 32 /// Like the class CacheServerRequest, this is used as a tag to inject into the gRPC 33 /// completion queue. The thread that makes the rpc request will wait on a wait post 34 /// area for the reply to come back. Since this tag will be deleted from memory and 35 /// we thus we need to work on a shared pointer of the BaseRequest such that its 36 /// use count is at least two. Otherwise either thread will be referencing stale memory. 37 /// \see CacheServerRequest 38 class CacheClientRequestTag { 39 public: 40 friend class CacheClientGreeter; CacheClientRequestTag(std::shared_ptr<BaseRequest> rq,int64_t seqNo)41 explicit CacheClientRequestTag(std::shared_ptr<BaseRequest> rq, int64_t seqNo) 42 : base_rq_(std::move(rq)), seq_no_(seqNo) {} 43 ~CacheClientRequestTag() = default; 44 45 /// \brief Notify the client that a result has come back from the server Notify()46 void Notify() { base_rq_->wp_.Set(); } 47 48 private: 49 std::shared_ptr<BaseRequest> base_rq_; 50 grpc::Status rc_; 51 grpc::ClientContext ctx_; 52 std::unique_ptr<grpc::ClientAsyncResponseReader<CacheReply>> rpc_; 53 int64_t seq_no_; 54 }; 55 56 /// \brief A GRPC layer to convert BaseRequest into protobuf and send to the cache server using gRPC 57 /// \see BaseRequest 58 class CacheClientGreeter : public Service { 59 friend class CacheClient; 60 61 public: 62 constexpr static int32_t kRequestTimeoutDeadlineInSec = 60; 63 constexpr static int32_t kWaitForNewEventDeadlineInSec = 1; 64 explicit CacheClientGreeter(const std::string &hostname, int32_t port, int32_t num_connections); 65 ~CacheClientGreeter(); 66 67 /// Override base Service class 68 Status DoServiceStart() override; 69 Status DoServiceStop() override; 70 71 /// \brief Send the request to the server 72 /// \return Status object 73 Status HandleRequest(std::shared_ptr<BaseRequest> rq); 74 75 /// \brief A handful of threads will be handling async reply from the server 76 /// \return 77 Status WorkerEntry(); 78 79 /// \brief Kick off threads to receive reply from the server 80 Status DispatchWorkers(int32_t num_workers); 81 82 /// \brief Attach to shared memory for local client 83 /// \note Called after we have established a connection. 84 /// \return Status object. 85 Status AttachToSharedMemory(bool *local_bypass); 86 87 /// \brief This returns where we attach to the shared memory. 88 /// \return Base address of the shared memory. SharedMemoryBaseAddr()89 const void *SharedMemoryBaseAddr() const { return mem_.SharedMemoryBaseAddr(); } 90 GetHostname()91 std::string GetHostname() const { return hostname_; } GetPort()92 int32_t GetPort() const { return port_; } 93 94 private: 95 std::shared_ptr<grpc::Channel> channel_; 96 std::unique_ptr<CacheServerGreeter::Stub> stub_; 97 grpc::CompletionQueue cq_; 98 TaskGroup vg_; 99 int32_t num_connections_; 100 std::atomic<int64_t> request_cnt_; 101 mutable std::mutex mux_; 102 std::map<int64_t, std::unique_ptr<CacheClientRequestTag>> req_; 103 SharedMemory mem_; 104 std::string hostname_; 105 int32_t port_; 106 }; 107 } // namespace dataset 108 } // namespace mindspore 109 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_CLIENT_H_ 110