• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_CONSTANTS_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_CONSTANTS_H_
19 
#include <arpa/inet.h>
#include <csignal>
#include <functional>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
26 
27 #include "include/backend/distributed/constants.h"
28 
29 namespace mindspore {
30 namespace distributed {
31 namespace rpc {
// Callback types.
// DeleteCallBack: plain function pointer that receives the `from` and `to` strings.
using DeleteCallBack = void (*)(const std::string &from, const std::string &to);
// ConnectionCallBack: std::function that receives an untyped connection pointer.
using ConnectionCallBack = std::function<void(void *connection)>;

// Lengths of the I/O vectors used on the send and receive paths.
constexpr int SEND_MSG_IO_VEC_LEN{5};
constexpr int RECV_MSG_IO_VEC_LEN{4};

// Number of bytes in the magic identifier stored in MessageHeader::magic.
constexpr unsigned int MAGICID_LEN{4};
// Send-queue length limit and the code reported for a dropped message
// (NOTE(review): semantics inferred from the names; confirm at the use sites).
constexpr int SENDMSG_QUEUELEN{1024};
constexpr int SENDMSG_DROPED{-1};

// Upper bounds, in bytes, for the from/to/name/body parts of a message
// (NOTE(review): enforcement is outside this header; confirm at the use sites).
constexpr size_t MAX_KMSG_FROM_LEN{1024};
constexpr size_t MAX_KMSG_TO_LEN{1024};
constexpr size_t MAX_KMSG_NAME_LEN{1024};
constexpr size_t MAX_KMSG_BODY_LEN{static_cast<size_t>(1) << 30};  // 1073741824 bytes
46 
// Kind of traffic handled by a parser.
enum ParseType {
  kTcpMsg = 1,
  kHttpReq = 2,
  kHttpRsp = 3,
  kUnknown = 4
};

// Part of a message currently being processed: the header or the body.
enum State {
  kMsgHeader = 0,
  kBody = 1
};

// Lifecycle stage of a connection.
enum ConnectionState {
  kInit = 1,
  kConnecting = 2,
  kConnected = 3,
  kDisconnecting = 4,
  kClose = 5
};

// Transport used by a connection.
enum ConnectionType {
  kTcp = 1,
  kSSL = 2
};

// Priority level of a connection.
enum ConnectionPriority {
  kPriorityLow = 1,
  kPriorityHigh = 2
};
52 
// Compile-time flag value for HTTP kmsg support
// (NOTE(review): the meaning of -1 is not visible in this header; confirm at the use sites).
constexpr int g_httpKmsgEnable = -1;

// Queue-backed containers for integer and string metrics.
using IntTypeMetrics = std::queue<int>;
using StringTypeMetrics = std::queue<std::string>;
57 
58 static MessageBase *const NULL_MSG = nullptr;
59 
// Listen backlog for the server socket.
constexpr int SOCKET_LISTEN_BACKLOG = 2048;

// Keepalive switch value (presumably passed as the SO_KEEPALIVE option value; confirm at the call site).
constexpr int SOCKET_KEEPALIVE = 1;

// Idle time, in seconds, before the first keepalive probe is sent.
constexpr int SOCKET_KEEPIDLE = 600;

// Interval between the following keepalive probes (presumably in seconds; confirm where it is applied).
constexpr int SOCKET_KEEPINTERVAL = 10;

// Number of keepalive probes sent without a reply after which the socket
// is considered to be in an error state.
constexpr int SOCKET_KEEPCOUNT = 30;
74 
// Magic identifier copied into MessageHeader::magic.
constexpr char RPC_MAGICID[] = "RPC0";
// Names for the receive and send event-loop threads.
constexpr char TCP_RECV_EVLOOP_THREADNAME[] = "RECV_EVENT_LOOP";
constexpr char TCP_SEND_EVLOOP_THREADNAME[] = "SEND_EVENT_LOOP";
78 
// Result codes for RPC operations.
constexpr int RPC_OK{0};
constexpr int RPC_ERROR{-1};

// Result codes for I/O read/write operations.
constexpr int IO_RW_OK{1};
constexpr int IO_RW_ERROR{-1};

// Maximum length of an IP address string buffer
// (NOTE(review): unit and whether the terminator is included are not visible here; confirm at the use sites).
constexpr int IP_LEN_MAX{128};
86 
// Name of the environment variable holding the timeout window (in seconds) for compute graph
// nodes to receive a message from other nodes, and the default value of 300 seconds.
constexpr char kEnvReceiveMsgTimeOut[]{"MS_RECEIVE_MSG_TIMEOUT"};
constexpr size_t kDefaultReceiveMsgTimeOut{300};
90 
91 // Kill the process for safe exiting.
KillProcess(const std::string & ret)92 inline void KillProcess(const std::string &ret) {
93   MS_LOG(ERROR) << ret;
94   (void)raise(SIGKILL);
95 }
96 
97 /*
98  * The MessageHeader contains the stats info about the message body.
99  */
100 struct MessageHeader {
MessageHeaderMessageHeader101   MessageHeader() {
102     for (unsigned int i = 0; i < MAGICID_LEN; ++i) {
103       if (i < sizeof(RPC_MAGICID) - 1) {
104         magic[i] = RPC_MAGICID[i];
105       } else {
106         magic[i] = '\0';
107       }
108     }
109   }
110 
111   char magic[MAGICID_LEN];
112   uint32_t name_len{0};
113   uint32_t to_len{0};
114   uint32_t from_len{0};
115   uint32_t body_len{0};
116 };
117 
118 // Fill the message header using the given message.
FillMessageHeader(const MessageBase & message,MessageHeader * header)119 __attribute__((unused)) static void FillMessageHeader(const MessageBase &message, MessageHeader *header) {
120   std::string send_to = message.to;
121   std::string send_from = message.from;
122   header->name_len = htonl(static_cast<uint32_t>(message.name.size()));
123   header->to_len = htonl(static_cast<uint32_t>(send_to.size()));
124   header->from_len = htonl(static_cast<uint32_t>(send_from.size()));
125   if (message.data != nullptr) {
126     header->body_len = htonl(static_cast<uint32_t>(message.size));
127   } else {
128     header->body_len = htonl(static_cast<uint32_t>(message.body.size()));
129   }
130 }
131 
132 // Compute and return the byte size of the whole message.
GetMessageSize(const MessageBase & message)133 __attribute__((unused)) static size_t GetMessageSize(const MessageBase &message) {
134   std::string send_to = message.to;
135   std::string send_from = message.from;
136   size_t size = message.name.size() + send_to.size() + send_from.size() + message.body.size() + sizeof(MessageHeader);
137   return size;
138 }
139 
// When `expression` evaluates to false, format a message with the expression text, file and
// line, and pass it to KillProcess. `expression` is evaluated exactly once.
#define RPC_ASSERT(expression)                                                      \
  do {                                                                              \
    if (!(expression)) {                                                            \
      std::stringstream assert_msg;                                                 \
      assert_msg << "Assertion failed: " << #expression << ", file: " << __FILE__; \
      assert_msg << ", line: " << __LINE__;                                         \
      KillProcess(assert_msg.str());                                                \
    }                                                                               \
  } while (false)
148 
// Format "<ret>  ( file: <file>, line: <line> )." and pass it to KillProcess.
// `ret` may be any value streamable into a std::ostream and is evaluated exactly once.
// The stream local has a macro-specific name so that a caller expression mentioning its own
// variable `ss` (e.g. RPC_EXIT(ss.str())) is not silently captured by the macro's stream.
#define RPC_EXIT(ret)                                                                         \
  do {                                                                                        \
    std::ostringstream rpc_exit_stream_;                                                      \
    rpc_exit_stream_ << (ret) << "  ( file: " << __FILE__ << ", line: " << __LINE__ << " )."; \
    KillProcess(rpc_exit_stream_.str());                                                      \
  } while (0)
155 }  // namespace rpc
156 }  // namespace distributed
157 }  // namespace mindspore
158 #endif
159