• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2020 Huawei Technologies Co., Ltd

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_
18 
19 #include <sys/types.h>
20 #include <sys/ipc.h>
21 #include <sys/shm.h>
22 #include <sys/msg.h>
23 #include <string>
24 #include <utility>
25 #include "minddata/dataset/engine/cache/cache_common.h"
26 #include "minddata/dataset/util/status.h"
27 
28 namespace mindspore {
29 namespace dataset {
30 /// A message queue structure between the parent and the child process
31 struct CacheMsgBuf {
32   int64_t mtype;
33   union {
34     char mtext[1];
35     struct {
36       int32_t err_code;
37       char err_msg[kSharedMessageSize];
38     } status;
39   } body;
40 };
41 
42 class BaseIPC {
43  public:
BaseIPC()44   BaseIPC() : remove_ipc_on_exit_(false) {}
~BaseIPC()45   virtual ~BaseIPC() {}
46   /// Indicate if we should remove the ipc resource on exit. Usually this is done by parent process.
RemoveResourcesOnExit()47   void RemoveResourcesOnExit() { remove_ipc_on_exit_ = true; }
48   /// Copy constructors
BaseIPC(const BaseIPC & rhs)49   BaseIPC(const BaseIPC &rhs) : remove_ipc_on_exit_(false) {}
50   BaseIPC &operator=(const BaseIPC &rhs) {
51     if (&rhs != this) {
52       remove_ipc_on_exit_ = false;
53     }
54     return *this;
55   }
56   /// Move constructors
BaseIPC(BaseIPC && rhs)57   BaseIPC(BaseIPC &&rhs) noexcept : remove_ipc_on_exit_(rhs.remove_ipc_on_exit_) { rhs.remove_ipc_on_exit_ = false; }
58   BaseIPC &operator=(BaseIPC &&rhs) noexcept {
59     if (&rhs != this) {
60       remove_ipc_on_exit_ = rhs.remove_ipc_on_exit_;
61       rhs.remove_ipc_on_exit_ = false;
62     }
63     return *this;
64   }
65 
66  protected:
67   bool remove_ipc_on_exit_;
68 };
69 
70 /// \brief This wraps a shared message for the communication between processes. It is used primarily
71 /// for starting and stopping a server.
72 class SharedMessage : public BaseIPC {
73  public:
74   using queue_id_t = int;
SharedMessage()75   SharedMessage() : msg_qid_(-1) {}
SharedMessage(queue_id_t qid)76   explicit SharedMessage(queue_id_t qid) : msg_qid_(qid) {}
77   ~SharedMessage() override;
78 
79   /// Copy constructors
SharedMessage(const SharedMessage & rhs)80   SharedMessage(const SharedMessage &rhs) : BaseIPC(rhs), msg_qid_(rhs.msg_qid_) {}
81   SharedMessage &operator=(const SharedMessage &rhs) {
82     if (&rhs != this) {
83       msg_qid_ = rhs.msg_qid_;
84       BaseIPC::operator=(rhs);
85     }
86     return *this;
87   }
88   /// Move constructors
SharedMessage(SharedMessage && rhs)89   SharedMessage(SharedMessage &&rhs) noexcept : BaseIPC(std::move(rhs)) {
90     msg_qid_ = rhs.msg_qid_;
91     rhs.msg_qid_ = -1;
92   }
93   SharedMessage &operator=(SharedMessage &&rhs) noexcept {
94     if (&rhs != this) {
95       msg_qid_ = rhs.msg_qid_;
96       rhs.msg_qid_ = -1;
97       BaseIPC::operator=(std::move(rhs));
98     }
99     return *this;
100   }
101 
102   /// Return the private id
GetMsgQueueId()103   queue_id_t GetMsgQueueId() const { return msg_qid_; }
104 
105   /// \brief Create a private message queue
106   Status Create();
107 
108   /// Send a Status object
109   Status SendStatus(const Status &rc);
110 
111   /// Retrieve a Status object
112   Status ReceiveStatus(Status *rc);
113 
114  private:
115   queue_id_t msg_qid_;
116 };
117 
118 /// \brief This wraps a shared memory for the communication between processes. It is used primarily
119 /// for transporting large tensor rows.
120 class SharedMemory : public BaseIPC {
121  public:
122   using shm_key_t = int;
123   using shm_id_t = int;
SharedMemory()124   SharedMemory() : shm_id_(-1), shm_key_(-1), shmat_addr_(nullptr) {}
SharedMemory(shm_key_t public_key)125   explicit SharedMemory(shm_key_t public_key) : shm_id_(-1), shm_key_(public_key), shmat_addr_(nullptr) {}
126   ~SharedMemory() override;
127   /// Copy constructors
SharedMemory(const SharedMemory & rhs)128   SharedMemory(const SharedMemory &rhs)
129       : BaseIPC(rhs), shm_id_(rhs.shm_id_), shm_key_(rhs.shm_key_), shmat_addr_(rhs.shmat_addr_) {}
130   SharedMemory &operator=(const SharedMemory &rhs) {
131     if (&rhs != this) {
132       shm_id_ = rhs.shm_id_;
133       shm_key_ = rhs.shm_key_;
134       shmat_addr_ = rhs.shmat_addr_;
135       BaseIPC::operator=(rhs);
136     }
137     return *this;
138   }
139   /// Move constructors
SharedMemory(SharedMemory && rhs)140   SharedMemory(SharedMemory &&rhs) noexcept : BaseIPC(std::move(rhs)) {
141     shm_id_ = rhs.shm_id_;
142     shm_key_ = rhs.shm_key_;
143     shmat_addr_ = rhs.shmat_addr_;
144     rhs.shm_id_ = -1;
145     rhs.shm_key_ = -1;
146     rhs.shmat_addr_ = nullptr;
147   }
148   SharedMemory &operator=(SharedMemory &&rhs) noexcept {
149     if (&rhs != this) {
150       shm_id_ = rhs.shm_id_;
151       shm_key_ = rhs.shm_key_;
152       shmat_addr_ = rhs.shmat_addr_;
153       rhs.shm_id_ = -1;
154       rhs.shm_key_ = -1;
155       rhs.shmat_addr_ = nullptr;
156       BaseIPC::operator=(std::move(rhs));
157     }
158     return *this;
159   }
160   /// \brief Set the public key
SetPublicKey(key_t public_key)161   void SetPublicKey(key_t public_key) { shm_key_ = public_key; }
162 
163   /// \brief Retrieve the key
GetKey()164   shm_key_t GetKey() const { return shm_key_; }
165 
166   /// \brief This returns where we attach to the shared memory.
167   /// \return Base address of the shared memory.
SharedMemoryBaseAddr()168   const void *SharedMemoryBaseAddr() const { return shmat_addr_; }
SharedMemoryBaseAddr()169   void *SharedMemoryBaseAddr() { return shmat_addr_; }
170 
171   /// \brief Attach to shared memory
172   /// \return Status object
173   Status Attach();
174 
175   /// Detach from shared memory
176   /// \return Status object
177   Status Detach();
178 
179   /// Create shared memory
180   /// \return Status object
181   Status Create(int64_t sz);
182 
183   /// Destroy shared memory
184   /// \return Status object
185   Status Destroy();
186 
187   /// \brief Return the shared memory id
GetSharedMemoryId()188   shm_id_t GetSharedMemoryId() const { return shm_id_; }
189 
190   /// \brief Get number of processes attached to the shared memory
191   /// \return Status object
192   Status GetNumAttached(int32_t *num);
193 
194  private:
195   shm_id_t shm_id_;
196   shm_key_t shm_key_;
197   void *shmat_addr_;
198 };
199 
200 /// \brief Generate a shared memory key using the tcp/ip port.
201 /// \note It must be called after the cache server generates the unix socket or ftok will fail.
202 /// \note Caller must check the return value. -1 means ftok failed.
203 /// \param[in] port
204 /// \param[out] err. If not null and ftok fails, this will contain the value of errno
205 /// \return key
206 Status PortToFtok(int port, SharedMemory::shm_key_t *);
207 
208 }  // namespace dataset
209 }  // namespace mindspore
210 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_
211