1 /** 2 * Copyright 2020 Huawei Technologies Co., Ltd 3 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 8 * http://www.apache.org/licenses/LICENSE-2.0 9 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_ 18 19 #include <sys/types.h> 20 #include <sys/ipc.h> 21 #include <sys/shm.h> 22 #include <sys/msg.h> 23 #include <string> 24 #include <utility> 25 #include "minddata/dataset/engine/cache/cache_common.h" 26 #include "minddata/dataset/util/status.h" 27 28 namespace mindspore { 29 namespace dataset { 30 /// A message queue structure between the parent and the child process 31 struct CacheMsgBuf { 32 int64_t mtype; 33 union { 34 char mtext[1]; 35 struct { 36 int32_t err_code; 37 char err_msg[kSharedMessageSize]; 38 } status; 39 } body; 40 }; 41 42 class BaseIPC { 43 public: BaseIPC()44 BaseIPC() : remove_ipc_on_exit_(false) {} ~BaseIPC()45 virtual ~BaseIPC() {} 46 /// Indicate if we should remove the ipc resource on exit. Usually this is done by parent process. RemoveResourcesOnExit()47 void RemoveResourcesOnExit() { remove_ipc_on_exit_ = true; } 48 /// Copy constructors BaseIPC(const BaseIPC & rhs)49 BaseIPC(const BaseIPC &rhs) : remove_ipc_on_exit_(false) {} 50 BaseIPC &operator=(const BaseIPC &rhs) { 51 if (&rhs != this) { 52 remove_ipc_on_exit_ = false; 53 } 54 return *this; 55 } 56 /// Move constructors BaseIPC(BaseIPC && rhs)57 BaseIPC(BaseIPC &&rhs) noexcept : remove_ipc_on_exit_(rhs.remove_ipc_on_exit_) { rhs.remove_ipc_on_exit_ = false; } 58 BaseIPC &operator=(BaseIPC &&rhs) noexcept { 59 if (&rhs != this) { 60 remove_ipc_on_exit_ = rhs.remove_ipc_on_exit_; 61 rhs.remove_ipc_on_exit_ = false; 62 } 63 return *this; 64 } 65 66 protected: 67 bool remove_ipc_on_exit_; 68 }; 69 70 /// \brief This wraps a shared message for the communication between processes. It is used primarily 71 /// for starting and stopping a server. 72 class SharedMessage : public BaseIPC { 73 public: 74 using queue_id_t = int; SharedMessage()75 SharedMessage() : msg_qid_(-1) {} SharedMessage(queue_id_t qid)76 explicit SharedMessage(queue_id_t qid) : msg_qid_(qid) {} 77 ~SharedMessage() override; 78 79 /// Copy constructors SharedMessage(const SharedMessage & rhs)80 SharedMessage(const SharedMessage &rhs) : BaseIPC(rhs), msg_qid_(rhs.msg_qid_) {} 81 SharedMessage &operator=(const SharedMessage &rhs) { 82 if (&rhs != this) { 83 msg_qid_ = rhs.msg_qid_; 84 BaseIPC::operator=(rhs); 85 } 86 return *this; 87 } 88 /// Move constructors SharedMessage(SharedMessage && rhs)89 SharedMessage(SharedMessage &&rhs) noexcept : BaseIPC(std::move(rhs)) { 90 msg_qid_ = rhs.msg_qid_; 91 rhs.msg_qid_ = -1; 92 } 93 SharedMessage &operator=(SharedMessage &&rhs) noexcept { 94 if (&rhs != this) { 95 msg_qid_ = rhs.msg_qid_; 96 rhs.msg_qid_ = -1; 97 BaseIPC::operator=(std::move(rhs)); 98 } 99 return *this; 100 } 101 102 /// Return the private id GetMsgQueueId()103 queue_id_t GetMsgQueueId() const { return msg_qid_; } 104 105 /// \brief Create a private message queue 106 Status Create(); 107 108 /// Send a Status object 109 Status SendStatus(const Status &rc); 110 111 /// Retrieve a Status object 112 Status ReceiveStatus(Status *rc); 113 114 private: 115 queue_id_t msg_qid_; 116 }; 117 118 /// \brief This wraps a shared memory for the communication between processes. It is used primarily 119 /// for transporting large tensor rows. 120 class SharedMemory : public BaseIPC { 121 public: 122 using shm_key_t = int; 123 using shm_id_t = int; SharedMemory()124 SharedMemory() : shm_id_(-1), shm_key_(-1), shmat_addr_(nullptr) {} SharedMemory(shm_key_t public_key)125 explicit SharedMemory(shm_key_t public_key) : shm_id_(-1), shm_key_(public_key), shmat_addr_(nullptr) {} 126 ~SharedMemory() override; 127 /// Copy constructors SharedMemory(const SharedMemory & rhs)128 SharedMemory(const SharedMemory &rhs) 129 : BaseIPC(rhs), shm_id_(rhs.shm_id_), shm_key_(rhs.shm_key_), shmat_addr_(rhs.shmat_addr_) {} 130 SharedMemory &operator=(const SharedMemory &rhs) { 131 if (&rhs != this) { 132 shm_id_ = rhs.shm_id_; 133 shm_key_ = rhs.shm_key_; 134 shmat_addr_ = rhs.shmat_addr_; 135 BaseIPC::operator=(rhs); 136 } 137 return *this; 138 } 139 /// Move constructors SharedMemory(SharedMemory && rhs)140 SharedMemory(SharedMemory &&rhs) noexcept : BaseIPC(std::move(rhs)) { 141 shm_id_ = rhs.shm_id_; 142 shm_key_ = rhs.shm_key_; 143 shmat_addr_ = rhs.shmat_addr_; 144 rhs.shm_id_ = -1; 145 rhs.shm_key_ = -1; 146 rhs.shmat_addr_ = nullptr; 147 } 148 SharedMemory &operator=(SharedMemory &&rhs) noexcept { 149 if (&rhs != this) { 150 shm_id_ = rhs.shm_id_; 151 shm_key_ = rhs.shm_key_; 152 shmat_addr_ = rhs.shmat_addr_; 153 rhs.shm_id_ = -1; 154 rhs.shm_key_ = -1; 155 rhs.shmat_addr_ = nullptr; 156 BaseIPC::operator=(std::move(rhs)); 157 } 158 return *this; 159 } 160 /// \brief Set the public key SetPublicKey(key_t public_key)161 void SetPublicKey(key_t public_key) { shm_key_ = public_key; } 162 163 /// \brief Retrieve the key GetKey()164 shm_key_t GetKey() const { return shm_key_; } 165 166 /// \brief This returns where we attach to the shared memory. 167 /// \return Base address of the shared memory. SharedMemoryBaseAddr()168 const void *SharedMemoryBaseAddr() const { return shmat_addr_; } SharedMemoryBaseAddr()169 void *SharedMemoryBaseAddr() { return shmat_addr_; } 170 171 /// \brief Attach to shared memory 172 /// \return Status object 173 Status Attach(); 174 175 /// Detach from shared memory 176 /// \return Status object 177 Status Detach(); 178 179 /// Create shared memory 180 /// \return Status object 181 Status Create(int64_t sz); 182 183 /// Destroy shared memory 184 /// \return Status object 185 Status Destroy(); 186 187 /// \brief Return the shared memory id GetSharedMemoryId()188 shm_id_t GetSharedMemoryId() const { return shm_id_; } 189 190 /// \brief Get number of processes attached to the shared memory 191 /// \return Status object 192 Status GetNumAttached(int32_t *num); 193 194 private: 195 shm_id_t shm_id_; 196 shm_key_t shm_key_; 197 void *shmat_addr_; 198 }; 199 200 /// \brief Generate a shared memory key using the tcp/ip port. 201 /// \note It must be called after the cache server generates the unix socket or ftok will fail. 202 /// \note Caller must check the return value. -1 means ftok failed. 203 /// \param[in] port 204 /// \param[out] err. If not null and ftok fails, this will contain the value of errno 205 /// \return key 206 Status PortToFtok(int port, SharedMemory::shm_key_t *); 207 208 } // namespace dataset 209 } // namespace mindspore 210 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_IPC_H_ 211