• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3 
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7 
8  * http://www.apache.org/licenses/LICENSE-2.0
9 
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/cache_ipc.h"
17 #include <sys/stat.h>
18 
19 namespace mindspore {
20 namespace dataset {
PortToFtok(int port,SharedMemory::shm_key_t * out)21 Status PortToFtok(int port, SharedMemory::shm_key_t *out) {
22   RETURN_UNEXPECTED_IF_NULL(out);
23   key_t shmkey = -1;
24   const std::string unix_path = PortToUnixSocketPath(port);
25   shmkey = ftok(unix_path.data(), 'a');
26   if (shmkey == (key_t)-1) {
27     std::string errMsg = "Unable to create a ftok token. Errno = " + std::to_string(errno);
28     return Status(errno == ENOENT ? StatusCode::kMDFileNotExist : StatusCode::kMDUnexpectedError, errMsg);
29   }
30   *out = shmkey;
31   return Status::OK();
32 }
33 
~SharedMessage()34 SharedMessage::~SharedMessage() {
35   // Only remove the queue if we are asked to.
36   if (remove_ipc_on_exit_ && msg_qid_ != -1) {
37     // Remove the message que and never mind about the return code.
38     (void)msgctl(msg_qid_, IPC_RMID, nullptr);
39     msg_qid_ = -1;
40   }
41 }
42 
Create()43 Status SharedMessage::Create() {
44   CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ == -1, "Message queue already created");
45   auto access_mode = S_IRUSR | S_IWUSR;
46   msg_qid_ = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
47   if (msg_qid_ == -1) {
48     std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
49     RETURN_STATUS_UNEXPECTED(errMsg);
50   }
51   return Status::OK();
52 }
53 
SendStatus(const Status & rc)54 Status SharedMessage::SendStatus(const Status &rc) {
55   CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ != -1, "Invalid message queue id");
56   CacheMsgBuf msg{
57     1,
58   };
59   msg.body.status.err_code = static_cast<int32_t>(rc.StatusCode());
60   auto err = memcpy_s(msg.body.status.err_msg, kSharedMessageSize, rc.ToString().data(), rc.ToString().size());
61   CHECK_FAIL_RETURN_UNEXPECTED(err == EOK, "memcpy_s failed. err = " + std::to_string(err));
62   msg.body.status.err_msg[rc.ToString().size()] = '\0';
63   err = msgsnd(msg_qid_, reinterpret_cast<void *>(&msg), sizeof(msg.body.status), IPC_NOWAIT);
64   if (err == -1) {
65     std::string errMsg = "Failed to call msgsnd. Errno = " + std::to_string(errno);
66     RETURN_STATUS_UNEXPECTED(errMsg);
67   }
68   return Status::OK();
69 }
70 
ReceiveStatus(Status * rc)71 Status SharedMessage::ReceiveStatus(Status *rc) {
72   RETURN_UNEXPECTED_IF_NULL(rc);
73   CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ != -1, "Invalid message queue id");
74   struct CacheMsgBuf msg {};
75   auto err = msgrcv(msg_qid_, reinterpret_cast<void *>(&msg), sizeof(msg.body.status), 0, MSG_NOERROR);
76   if (err == -1) {
77     std::string errMsg = "Failed to call msgrcv. Errno = " + std::to_string(errno);
78     RETURN_STATUS_UNEXPECTED(errMsg);
79   }
80 
81   Status rc_recv(static_cast<StatusCode>(msg.body.status.err_code), msg.body.status.err_msg);
82   *rc = std::move(rc_recv);
83   return Status::OK();
84 }
85 
~SharedMemory()86 SharedMemory::~SharedMemory() {
87   if (shmat_addr_) {
88     (void)Detach();
89   }
90   if (remove_ipc_on_exit_ && shm_id_ != -1) {
91     // Remove the shared memory and never mind about the return code.
92     Status rc = Destroy();
93     if (rc.IsError()) {
94       MS_LOG(ERROR) << rc.ToString();
95     }
96   }
97   shm_id_ = -1;
98   shmat_addr_ = nullptr;
99 }
100 
Create(int64_t sz)101 Status SharedMemory::Create(int64_t sz) {
102   auto access_mode = S_IRUSR | S_IWUSR;
103   shm_id_ = shmget(shm_key_, sz, IPC_CREAT | IPC_EXCL | access_mode);
104   if (shm_id_ == -1) {
105     RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno));
106   } else {
107     shmat_addr_ = shmat(shm_id_, nullptr, 0);
108     if (shmat_addr_ == reinterpret_cast<void *>(-1)) {
109       RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
110     }
111   }
112   return Status::OK();
113 }
114 
Attach()115 Status SharedMemory::Attach() {
116   shm_id_ = shmget(shm_key_, 0, 0);
117   if (shm_id_ == -1) {
118     RETURN_STATUS_UNEXPECTED("Shmget failed. Errno " + std::to_string(errno));
119   }
120   shmat_addr_ = shmat(shm_id_, nullptr, 0);
121   if (shmat_addr_ == reinterpret_cast<void *>(-1)) {
122     RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
123   }
124   return Status::OK();
125 }
126 
Detach()127 Status SharedMemory::Detach() {
128   if (shmat_addr_) {
129     auto err = shmdt(shmat_addr_);
130     if (err == -1) {
131       RETURN_STATUS_UNEXPECTED("Shared memory detach failed. Errno " + std::to_string(errno));
132     }
133   }
134   shmat_addr_ = nullptr;
135   return Status::OK();
136 }
137 
Destroy()138 Status SharedMemory::Destroy() {
139   // Remove the shared memory and never mind about the return code.
140   auto err = shmctl(shm_id_, IPC_RMID, nullptr);
141   if (err == -1) {
142     std::string errMsg = "Unable to remove shared memory with id " + std::to_string(shm_id_);
143     errMsg += ". Errno :" + std::to_string(errno);
144     errMsg += "\nPlesae remove it manually using ipcrm -m command";
145     RETURN_STATUS_UNEXPECTED(errMsg);
146   }
147   return Status::OK();
148 }
149 
GetNumAttached(int32_t * num)150 Status SharedMemory::GetNumAttached(int32_t *num) {
151   RETURN_UNEXPECTED_IF_NULL(num);
152   struct shmid_ds ds {};
153   auto err = shmctl(shm_id_, IPC_STAT, &ds);
154   if (err == -1) {
155     std::string errMsg = "Unable to query shared memory with id " + std::to_string(shm_id_);
156     errMsg += "\nPlease remove it manually using ipcrm -m command";
157     RETURN_STATUS_UNEXPECTED(errMsg);
158   }
159   *num = ds.shm_nattch;
160   return Status::OK();
161 }
162 }  // namespace dataset
163 }  // namespace mindspore
164