• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
19 
20 #include <urpc.h>
21 #include <map>
22 #include <mutex>
23 #include <string>
24 #include <condition_variable>
25 
26 #include "utils/dlopen_macro.h"
27 #include "include/backend/distributed/constants.h"
28 
29 namespace mindspore {
30 namespace distributed {
31 namespace rpc {
32 // Parse url string with format ip:port.
ParseURL(const std::string & url,std::string * ip,uint16_t * port)33 inline bool ParseURL(const std::string &url, std::string *ip, uint16_t *port) {
34   if (ip == nullptr || port == nullptr) {
35     MS_LOG(ERROR) << "Output ip or port is nullptr";
36     return false;
37   }
38 
39   size_t index1 = url.find(URL_PROTOCOL_IP_SEPARATOR);
40   if (index1 == std::string::npos) {
41     index1 = 0;
42   } else {
43     index1 = index1 + sizeof(URL_PROTOCOL_IP_SEPARATOR) - 1;
44   }
45 
46   size_t index2 = url.rfind(':');
47   if (index2 == std::string::npos) {
48     MS_LOG(ERROR) << "Couldn't find the character colon.";
49     return false;
50   }
51 
52   *ip = url.substr(index1, index2 - index1);
53   if (ip->empty()) {
54     MS_LOG(ERROR) << "Couldn't find ip in url: " << url.c_str();
55     return false;
56   }
57 
58   size_t idx = index2 + sizeof(URL_IP_PORT_SEPARATOR) - 1;
59   if (idx >= url.size()) {
60     MS_LOG(ERROR) << "The size of url is invalid";
61     return false;
62   }
63   try {
64     *port = static_cast<uint16_t>(std::stoul(url.substr(idx)));
65   } catch (const std::system_error &e) {
66     MS_LOG(ERROR) << "Couldn't find port in url: " << url.c_str();
67     return false;
68   }
69 
70   MS_LOG(INFO) << "Parse URL for " << url << ". IP: " << *ip << ". Port: " << *port;
71   return true;
72 }
73 
LoadURPC()74 inline void *LoadURPC() {
75   static void *urpc_handle = nullptr;
76   if (urpc_handle == nullptr) {
77     urpc_handle = dlopen("liburpc.so", RTLD_LAZY | RTLD_LOCAL);
78     if (urpc_handle == nullptr) {
79       auto err = GetDlErrorMsg();
80       MS_LOG(EXCEPTION) << "dlopen liburpc.so failed. Error message: " << err;
81     }
82   }
83   return urpc_handle;
84 }
85 inline const void *kURPCHandle = LoadURPC();
86 
87 #define REG_URPC_METHOD(name, return_type, ...)                 \
88   constexpr const char *k##name##Name = #name;                  \
89   using name##FunObj = std::function<return_type(__VA_ARGS__)>; \
90   using name##FunPtr = return_type (*)(__VA_ARGS__);            \
91   const name##FunPtr name##_func = DlsymFuncObj(name, const_cast<void *>(kURPCHandle));
92 
93 // The symbols of liburpc.so to be dynamically loaded.
94 REG_URPC_METHOD(urpc_init, int, struct urpc_config *)
95 REG_URPC_METHOD(urpc_uninit, void)
96 REG_URPC_METHOD(urpc_connect, urpc_session_t *, const char *, uint16_t, urma_jfs_t *)
97 REG_URPC_METHOD(urpc_close, void, urpc_session_t *)
98 REG_URPC_METHOD(urpc_register_memory, int, void *, int)
99 REG_URPC_METHOD(urpc_register_serdes, int, const char *, const urpc_serdes_t *, urpc_tx_cb_t, void *)
100 REG_URPC_METHOD(urpc_register_handler, int, urpc_handler_info_t *, uint32_t *)
101 REG_URPC_METHOD(urpc_register_raw_handler_explicit, int, urpc_raw_handler_t, void *, urpc_tx_cb_t, void *, uint32_t)
102 REG_URPC_METHOD(urpc_unregister_handler, void, const char *, uint32_t)
103 REG_URPC_METHOD(urpc_query_capability, int, struct urpc_cap *)
104 REG_URPC_METHOD(urpc_send_request, int, urpc_session_t *, struct urpc_send_wr *, struct urpc_send_option *)
105 REG_URPC_METHOD(urpc_call, int, urpc_session_t *, const char *, void *, void **, struct urpc_send_option *)
106 REG_URPC_METHOD(urpc_call_sgl, int, urpc_session_t *, const char *, void *, void **, struct urpc_send_option *)
107 REG_URPC_METHOD(urpc_get_default_allocator, struct urpc_buffer_allocator *)
108 
// Value that URPC calls are compared against to detect success.
constexpr int kURPCSuccess{0};
// Handle ID for inter-process data (presumably the ID the data handler is registered under - confirm at call sites).
constexpr uint32_t kInterProcessDataHandleID{0};

// Number of worker threads and polling threads written into the URPC configuration.
constexpr uint32_t kServerWorkingThreadNum{4};
constexpr uint32_t kClientPollingThreadNum{4};

// Set URPC buffer to 1GB (1 << 30 bytes).
inline uint32_t kLargeBuffSizes[1] = {1U << 30};
117 
118 // URPC is glabally unique because for each process it could be only initialized once.
119 // We need this flag to judge whether URPC is initialized.
120 inline bool kURPCInited = false;
121 // Initialize urpc configuration according to dev_name, ip_addr and port.
InitializeURPC(const std::string & dev_name,const std::string & ip_addr,uint16_t port)122 inline bool InitializeURPC(const std::string &dev_name, const std::string &ip_addr, uint16_t port) {
123   if (!kURPCInited) {
124     // Init URPC if necessary.
125     struct urpc_config urpc_cfg = {};
126     urpc_cfg.mode = URPC_MODE_SERVER_CLIENT;
127     urpc_cfg.model = URPC_THREAD_MODEL_R2C;
128     urpc_cfg.sfeature = URPC_FEATURE_REQ_DISPATCH;
129     urpc_cfg.cfeature = URPC_FEATURE_REQ_DISPATCH;
130     urpc_cfg.worker_num = kServerWorkingThreadNum;
131     urpc_cfg.polling_num = kClientPollingThreadNum;
132     urpc_cfg.transport.dev_name = const_cast<char *>(dev_name.c_str());
133     urpc_cfg.transport.ip_addr = const_cast<char *>(ip_addr.c_str());
134     urpc_cfg.transport.port = port;
135     urpc_cfg.transport.max_sge = 0;
136     urpc_cfg.allocator = nullptr;
137     if (urpc_init_func(&urpc_cfg) != kURPCSuccess) {
138       MS_LOG(WARNING) << "Failed to call urpc_init. Device name: " << dev_name << ", ip address: " << ip_addr
139                       << ", port: " << port << ". Please refer to URPC log directory: /var/log/umdk/urpc.";
140       return false;
141     } else {
142       MS_LOG(INFO) << "URPC is successfully initialized. Device name: " << dev_name << ", ip address: " << ip_addr
143                    << ", port: " << port;
144       kURPCInited = true;
145     }
146   } else {
147     MS_LOG(INFO)
148       << "This process has already initialized URPC. Please check whether 'URPC is successfully initialized' is "
149          "printed in MindSpore INFO log";
150   }
151   return true;
152 }
153 
154 // The map whose key is the server url, value is the URPC session returned by connecting function.
155 // The represents whether this process has already connected to the server.
156 // RDMA clients connecting the same server should reuse URPC session.
157 inline std::map<std::string, urpc_session_t *> kConnectedSession = {};
158 
// Callback arguments for URPC sending operations.
// Every member has a default value, so a default-constructed instance never carries indeterminate values.
// The struct remains an aggregate, so brace initialization with explicit values keeps working.
struct req_cb_arg {
  // The flag represents request is successfully received by peer.
  bool rsp_received = false;
  // Pointer to URPC sending data which needs to be freed after it's sent.
  void *data_to_free = nullptr;
  // URPC allocator which is used to release data.
  struct urpc_buffer_allocator *allocator = nullptr;
  // Variables for synchronizing in async scenario.
  std::mutex *mtx = nullptr;
  std::condition_variable *cv = nullptr;
};
171 }  // namespace rpc
172 }  // namespace distributed
173 }  // namespace mindspore
174 #endif  // MINDSPORE_CCSRC_DISTRIBUTED_RPC_RDMA_CONSTANTS_H_
175