1 /**
2 * Copyright 2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
19
#include <urpc.h>

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <limits>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>
25
26 #include "utils/dlopen_macro.h"
27 #include "include/backend/distributed/constants.h"
28
29 namespace mindspore {
30 namespace distributed {
31 namespace rpc {
32 // Parse url string with format ip:port.
ParseURL(const std::string & url,std::string * ip,uint16_t * port)33 inline bool ParseURL(const std::string &url, std::string *ip, uint16_t *port) {
34 if (ip == nullptr || port == nullptr) {
35 MS_LOG(ERROR) << "Output ip or port is nullptr";
36 return false;
37 }
38
39 size_t index1 = url.find(URL_PROTOCOL_IP_SEPARATOR);
40 if (index1 == std::string::npos) {
41 index1 = 0;
42 } else {
43 index1 = index1 + sizeof(URL_PROTOCOL_IP_SEPARATOR) - 1;
44 }
45
46 size_t index2 = url.rfind(':');
47 if (index2 == std::string::npos) {
48 MS_LOG(ERROR) << "Couldn't find the character colon.";
49 return false;
50 }
51
52 *ip = url.substr(index1, index2 - index1);
53 if (ip->empty()) {
54 MS_LOG(ERROR) << "Couldn't find ip in url: " << url.c_str();
55 return false;
56 }
57
58 size_t idx = index2 + sizeof(URL_IP_PORT_SEPARATOR) - 1;
59 if (idx >= url.size()) {
60 MS_LOG(ERROR) << "The size of url is invalid";
61 return false;
62 }
63 try {
64 *port = static_cast<uint16_t>(std::stoul(url.substr(idx)));
65 } catch (const std::system_error &e) {
66 MS_LOG(ERROR) << "Couldn't find port in url: " << url.c_str();
67 return false;
68 }
69
70 MS_LOG(INFO) << "Parse URL for " << url << ". IP: " << *ip << ". Port: " << *port;
71 return true;
72 }
73
LoadURPC()74 inline void *LoadURPC() {
75 static void *urpc_handle = nullptr;
76 if (urpc_handle == nullptr) {
77 urpc_handle = dlopen("liburpc.so", RTLD_LAZY | RTLD_LOCAL);
78 if (urpc_handle == nullptr) {
79 auto err = GetDlErrorMsg();
80 MS_LOG(EXCEPTION) << "dlopen liburpc.so failed. Error message: " << err;
81 }
82 }
83 return urpc_handle;
84 }
85 inline const void *kURPCHandle = LoadURPC();
86
87 #define REG_URPC_METHOD(name, return_type, ...) \
88 constexpr const char *k##name##Name = #name; \
89 using name##FunObj = std::function<return_type(__VA_ARGS__)>; \
90 using name##FunPtr = return_type (*)(__VA_ARGS__); \
91 const name##FunPtr name##_func = DlsymFuncObj(name, const_cast<void *>(kURPCHandle));
92
93 // The symbols of liburpc.so to be dynamically loaded.
94 REG_URPC_METHOD(urpc_init, int, struct urpc_config *)
95 REG_URPC_METHOD(urpc_uninit, void)
96 REG_URPC_METHOD(urpc_connect, urpc_session_t *, const char *, uint16_t, urma_jfs_t *)
97 REG_URPC_METHOD(urpc_close, void, urpc_session_t *)
98 REG_URPC_METHOD(urpc_register_memory, int, void *, int)
99 REG_URPC_METHOD(urpc_register_serdes, int, const char *, const urpc_serdes_t *, urpc_tx_cb_t, void *)
100 REG_URPC_METHOD(urpc_register_handler, int, urpc_handler_info_t *, uint32_t *)
101 REG_URPC_METHOD(urpc_register_raw_handler_explicit, int, urpc_raw_handler_t, void *, urpc_tx_cb_t, void *, uint32_t)
102 REG_URPC_METHOD(urpc_unregister_handler, void, const char *, uint32_t)
103 REG_URPC_METHOD(urpc_query_capability, int, struct urpc_cap *)
104 REG_URPC_METHOD(urpc_send_request, int, urpc_session_t *, struct urpc_send_wr *, struct urpc_send_option *)
105 REG_URPC_METHOD(urpc_call, int, urpc_session_t *, const char *, void *, void **, struct urpc_send_option *)
106 REG_URPC_METHOD(urpc_call_sgl, int, urpc_session_t *, const char *, void *, void **, struct urpc_send_option *)
107 REG_URPC_METHOD(urpc_get_default_allocator, struct urpc_buffer_allocator *)
108
// Value that URPC calls such as urpc_init return on success.
constexpr int kURPCSuccess{0};
// ID of the inter-process data handler (presumably used when registering URPC handlers; confirm).
constexpr uint32_t kInterProcessDataHandleID{0};

// Thread counts passed to urpc_init: worker_num for the server side, polling_num for the client side.
constexpr uint32_t kServerWorkingThreadNum{4};
constexpr uint32_t kClientPollingThreadNum{4};

// URPC buffer sizes: a single 1GB (1 << 30) entry.
inline uint32_t kLargeBuffSizes[1] = {1U << 30U};
117
// URPC is globally unique: each process may initialize it only once.
// This flag records whether URPC has already been initialized in this process.
inline bool kURPCInited{false};
121 // Initialize urpc configuration according to dev_name, ip_addr and port.
InitializeURPC(const std::string & dev_name,const std::string & ip_addr,uint16_t port)122 inline bool InitializeURPC(const std::string &dev_name, const std::string &ip_addr, uint16_t port) {
123 if (!kURPCInited) {
124 // Init URPC if necessary.
125 struct urpc_config urpc_cfg = {};
126 urpc_cfg.mode = URPC_MODE_SERVER_CLIENT;
127 urpc_cfg.model = URPC_THREAD_MODEL_R2C;
128 urpc_cfg.sfeature = URPC_FEATURE_REQ_DISPATCH;
129 urpc_cfg.cfeature = URPC_FEATURE_REQ_DISPATCH;
130 urpc_cfg.worker_num = kServerWorkingThreadNum;
131 urpc_cfg.polling_num = kClientPollingThreadNum;
132 urpc_cfg.transport.dev_name = const_cast<char *>(dev_name.c_str());
133 urpc_cfg.transport.ip_addr = const_cast<char *>(ip_addr.c_str());
134 urpc_cfg.transport.port = port;
135 urpc_cfg.transport.max_sge = 0;
136 urpc_cfg.allocator = nullptr;
137 if (urpc_init_func(&urpc_cfg) != kURPCSuccess) {
138 MS_LOG(WARNING) << "Failed to call urpc_init. Device name: " << dev_name << ", ip address: " << ip_addr
139 << ", port: " << port << ". Please refer to URPC log directory: /var/log/umdk/urpc.";
140 return false;
141 } else {
142 MS_LOG(INFO) << "URPC is successfully initialized. Device name: " << dev_name << ", ip address: " << ip_addr
143 << ", port: " << port;
144 kURPCInited = true;
145 }
146 } else {
147 MS_LOG(INFO)
148 << "This process has already initialized URPC. Please check whether 'URPC is successfully initialized' is "
149 "printed in MindSpore INFO log";
150 }
151 return true;
152 }
153
154 // The map whose key is the server url, value is the URPC session returned by connecting function.
155 // The represents whether this process has already connected to the server.
156 // RDMA clients connecting the same server should reuse URPC session.
157 inline std::map<std::string, urpc_session_t *> kConnectedSession = {};
158
// Callback arguments for URPC sending operations.
// Every member has a default initializer, so a default-constructed req_cb_arg never carries an
// indeterminate flag or garbage pointers. Aggregate initialization such as
// req_cb_arg{false, data, allocator, &mtx, &cv} keeps working (C++14 and later).
struct req_cb_arg {
  // Set when the request has been successfully received by the peer.
  bool rsp_received{false};
  // URPC sending data that needs to be freed after it is sent; nullptr when there is nothing to free.
  void *data_to_free{nullptr};
  // URPC allocator used to release data_to_free.
  struct urpc_buffer_allocator *allocator{nullptr};
  // Synchronization primitives for waiting in async scenarios (presumably owned by the caller; confirm).
  std::mutex *mtx{nullptr};
  std::condition_variable *cv{nullptr};
};
171 } // namespace rpc
172 } // namespace distributed
173 } // namespace mindspore
174 #endif // MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
175