• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_
18 
19 #include <atomic>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 #include "minddata/dataset/engine/cache/cache_common.h"
26 #include "minddata/dataset/engine/cache/cache_ipc.h"
27 #include "minddata/dataset/util/allocator.h"
28 #include "minddata/dataset/util/arena.h"
29 #include "minddata/dataset/util/status.h"
30 #include "minddata/dataset/util/task_manager.h"
31 
32 namespace mindspore {
33 namespace dataset {
34 /// \brief Server side view of BaseRequest. Incoming request are in the form of protobuf objects
35 /// and this class is used to translate from protobuf to structures understood by CacheService class.
36 /// \see CacheService
37 class CacheServerRequest : public BaseRequest {
38  public:
39   friend class CacheServer;
40   friend class CacheService;
41   friend class CacheServerGreeterImpl;
42   enum class STATE : int8_t { CREATE = 1, PROCESS = 2, FINISH = 3 };
CacheServerRequest()43   CacheServerRequest()
44       : BaseRequest::BaseRequest(BaseRequest::RequestType::kRequestUnknown), st_(STATE::CREATE), responder_(&ctx_) {}
45 
46   ~CacheServerRequest() override = default;
47 
48   /// \brief Functor. Used mainly by CacheServerGreeterImpl class to tag each incoming request and this
49   /// functor will translate each protobuf into some form understood by by CacheService class.
50   /// \param svc Async service
51   /// \param cq Completion queue
52   /// \return Status object
53   Status operator()(CacheServerGreeter::AsyncService *svc, grpc::ServerCompletionQueue *cq);
54 
55   /// \brief Override the base class Print method
56   /// \param out
57   void Print(std::ostream &out) const override;
58 
59  private:
60   Status rc_;
61   STATE st_;
62   grpc::ServerContext ctx_;
63   grpc::ServerAsyncResponseWriter<CacheReply> responder_;
64 };
65 
66 /// \brief Implementation of CacheServerGreeter
67 /// \note It is an async server
68 /// \see cache_grpc.proto
69 class CacheServerGreeterImpl final {
70   friend class CacheServer;
71 
72  public:
73   constexpr static int32_t kMonitorIntervalInSec = 5;
74   explicit CacheServerGreeterImpl(int32_t port);
75   virtual ~CacheServerGreeterImpl();
76   /// \brief Brings up gRPC server
77   /// \return none
78   Status Run();
79   /// \brief Entry function to handle cache server request
80   Status HandleRequest(int32_t worker_id);
81 
82   /// \brief Montor the status of the unix socket in case it is gone.
83   Status MonitorUnixSocket();
84 
85   /// \brief This shutdown down the comm layer
86   void Shutdown();
87 
88  private:
89   int32_t port_;
90   std::string unix_socket_;
91   CacheServerGreeter::AsyncService svc_;
92   std::unique_ptr<grpc::ServerCompletionQueue> cq_;
93   std::unique_ptr<grpc::Server> server_;
94 };
95 }  // namespace dataset
96 }  // namespace mindspore
97 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_GRPC_SERVER_H_
98