1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/cache/cache_ipc.h"
17 #include <sys/stat.h>
18
19 namespace mindspore {
20 namespace dataset {
PortToFtok(int port,SharedMemory::shm_key_t * out)21 Status PortToFtok(int port, SharedMemory::shm_key_t *out) {
22 RETURN_UNEXPECTED_IF_NULL(out);
23 key_t shmkey = -1;
24 const std::string unix_path = PortToUnixSocketPath(port);
25 shmkey = ftok(unix_path.data(), 'a');
26 if (shmkey == (key_t)-1) {
27 std::string errMsg = "Unable to create a ftok token. Errno = " + std::to_string(errno);
28 return Status(errno == ENOENT ? StatusCode::kMDFileNotExist : StatusCode::kMDUnexpectedError, errMsg);
29 }
30 *out = shmkey;
31 return Status::OK();
32 }
33
~SharedMessage()34 SharedMessage::~SharedMessage() {
35 // Only remove the queue if we are asked to.
36 if (remove_ipc_on_exit_ && msg_qid_ != -1) {
37 // Remove the message que and never mind about the return code.
38 (void)msgctl(msg_qid_, IPC_RMID, nullptr);
39 msg_qid_ = -1;
40 }
41 }
42
Create()43 Status SharedMessage::Create() {
44 CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ == -1, "Message queue already created");
45 auto access_mode = S_IRUSR | S_IWUSR;
46 msg_qid_ = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
47 if (msg_qid_ == -1) {
48 std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
49 RETURN_STATUS_UNEXPECTED(errMsg);
50 }
51 return Status::OK();
52 }
53
SendStatus(const Status & rc)54 Status SharedMessage::SendStatus(const Status &rc) {
55 CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ != -1, "Invalid message queue id");
56 CacheMsgBuf msg{
57 1,
58 };
59 msg.body.status.err_code = static_cast<int32_t>(rc.StatusCode());
60 auto err = memcpy_s(msg.body.status.err_msg, kSharedMessageSize, rc.ToString().data(), rc.ToString().size());
61 CHECK_FAIL_RETURN_UNEXPECTED(err == EOK, "memcpy_s failed. err = " + std::to_string(err));
62 msg.body.status.err_msg[rc.ToString().size()] = '\0';
63 err = msgsnd(msg_qid_, reinterpret_cast<void *>(&msg), sizeof(msg.body.status), IPC_NOWAIT);
64 if (err == -1) {
65 std::string errMsg = "Failed to call msgsnd. Errno = " + std::to_string(errno);
66 RETURN_STATUS_UNEXPECTED(errMsg);
67 }
68 return Status::OK();
69 }
70
ReceiveStatus(Status * rc)71 Status SharedMessage::ReceiveStatus(Status *rc) {
72 RETURN_UNEXPECTED_IF_NULL(rc);
73 CHECK_FAIL_RETURN_UNEXPECTED(msg_qid_ != -1, "Invalid message queue id");
74 struct CacheMsgBuf msg {};
75 auto err = msgrcv(msg_qid_, reinterpret_cast<void *>(&msg), sizeof(msg.body.status), 0, MSG_NOERROR);
76 if (err == -1) {
77 std::string errMsg = "Failed to call msgrcv. Errno = " + std::to_string(errno);
78 RETURN_STATUS_UNEXPECTED(errMsg);
79 }
80
81 Status rc_recv(static_cast<StatusCode>(msg.body.status.err_code), msg.body.status.err_msg);
82 *rc = std::move(rc_recv);
83 return Status::OK();
84 }
85
~SharedMemory()86 SharedMemory::~SharedMemory() {
87 if (shmat_addr_) {
88 (void)Detach();
89 }
90 if (remove_ipc_on_exit_ && shm_id_ != -1) {
91 // Remove the shared memory and never mind about the return code.
92 Status rc = Destroy();
93 if (rc.IsError()) {
94 MS_LOG(ERROR) << rc.ToString();
95 }
96 }
97 shm_id_ = -1;
98 shmat_addr_ = nullptr;
99 }
100
Create(int64_t sz)101 Status SharedMemory::Create(int64_t sz) {
102 auto access_mode = S_IRUSR | S_IWUSR;
103 shm_id_ = shmget(shm_key_, sz, IPC_CREAT | IPC_EXCL | access_mode);
104 if (shm_id_ == -1) {
105 RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno));
106 } else {
107 shmat_addr_ = shmat(shm_id_, nullptr, 0);
108 if (shmat_addr_ == reinterpret_cast<void *>(-1)) {
109 RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
110 }
111 }
112 return Status::OK();
113 }
114
Attach()115 Status SharedMemory::Attach() {
116 shm_id_ = shmget(shm_key_, 0, 0);
117 if (shm_id_ == -1) {
118 RETURN_STATUS_UNEXPECTED("Shmget failed. Errno " + std::to_string(errno));
119 }
120 shmat_addr_ = shmat(shm_id_, nullptr, 0);
121 if (shmat_addr_ == reinterpret_cast<void *>(-1)) {
122 RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
123 }
124 return Status::OK();
125 }
126
Detach()127 Status SharedMemory::Detach() {
128 if (shmat_addr_) {
129 auto err = shmdt(shmat_addr_);
130 if (err == -1) {
131 RETURN_STATUS_UNEXPECTED("Shared memory detach failed. Errno " + std::to_string(errno));
132 }
133 }
134 shmat_addr_ = nullptr;
135 return Status::OK();
136 }
137
Destroy()138 Status SharedMemory::Destroy() {
139 // Remove the shared memory and never mind about the return code.
140 auto err = shmctl(shm_id_, IPC_RMID, nullptr);
141 if (err == -1) {
142 std::string errMsg = "Unable to remove shared memory with id " + std::to_string(shm_id_);
143 errMsg += ". Errno :" + std::to_string(errno);
144 errMsg += "\nPlesae remove it manually using ipcrm -m command";
145 RETURN_STATUS_UNEXPECTED(errMsg);
146 }
147 return Status::OK();
148 }
149
GetNumAttached(int32_t * num)150 Status SharedMemory::GetNumAttached(int32_t *num) {
151 RETURN_UNEXPECTED_IF_NULL(num);
152 struct shmid_ds ds {};
153 auto err = shmctl(shm_id_, IPC_STAT, &ds);
154 if (err == -1) {
155 std::string errMsg = "Unable to query shared memory with id " + std::to_string(shm_id_);
156 errMsg += "\nPlease remove it manually using ipcrm -m command";
157 RETURN_STATUS_UNEXPECTED(errMsg);
158 }
159 *num = ds.shm_nattch;
160 return Status::OK();
161 }
162 } // namespace dataset
163 } // namespace mindspore
164