1 /**
2 * Copyright 2021-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_CONSTANTS_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_CONSTANTS_H_
19
20 #include <arpa/inet.h>
21 #include <string>
22 #include <csignal>
23 #include <queue>
24 #include <memory>
25 #include <functional>
26
27 #include "include/backend/distributed/constants.h"
28
29 namespace mindspore {
30 namespace distributed {
31 namespace rpc {
// Plain function pointer type taking two strings named `from` and `to` and returning nothing.
// NOTE(review): the name suggests it is invoked when something is deleted — confirm against callers.
using DeleteCallBack = void (*)(const std::string &from, const std::string &to);
// Type-erased callable that receives an untyped pointer named `connection`.
// NOTE(review): the concrete type behind the `void *` is not visible in this header — confirm against callers.
using ConnectionCallBack = std::function<void(void *connection)>;
34
// Lengths associated with the send and receive I/O vectors.
// NOTE(review): presumably the number of iovec entries used per message — confirm against the connection code.
constexpr int SEND_MSG_IO_VEC_LEN = 5;
constexpr int RECV_MSG_IO_VEC_LEN = 4;

// Number of chars in the `magic` array of MessageHeader.
constexpr unsigned int MAGICID_LEN = 4;
// NOTE(review): presumably the capacity of the pending-send queue and the status reported for a dropped
// message — confirm against the sender implementation.
constexpr int SENDMSG_QUEUELEN = 1024;
constexpr int SENDMSG_DROPED = -1;

// Upper bounds for the individual message fields; MAX_KMSG_BODY_LEN equals 2^30 (1024 * 1024 * 1024).
// NOTE(review): presumably expressed in bytes and enforced while parsing incoming messages — confirm.
constexpr size_t MAX_KMSG_FROM_LEN = 1024;
constexpr size_t MAX_KMSG_TO_LEN = 1024;
constexpr size_t MAX_KMSG_NAME_LEN = 1024;
constexpr size_t MAX_KMSG_BODY_LEN = 1073741824;
46
// Kinds of parsed content. Numbering starts at 1 (kTcpMsg) and the remaining enumerators follow in order.
enum ParseType { kTcpMsg = 1, kHttpReq, kHttpRsp, kUnknown };
// NOTE(review): presumably the parser position inside a message (header vs. body) — confirm.
enum State { kMsgHeader, kBody };
// States of a connection, numbered from 1 (kInit) to 5 (kClose).
enum ConnectionState { kInit = 1, kConnecting, kConnected, kDisconnecting, kClose };
// Transport kinds of a connection, numbered from 1.
enum ConnectionType { kTcp = 1, kSSL };
// Two priority levels for a connection, numbered from 1.
enum ConnectionPriority { kPriorityLow = 1, kPriorityHigh };
52
// NOTE(review): looks like a switch for HTTP "kmsg" handling, with -1 presumably meaning disabled — confirm.
static const int g_httpKmsgEnable = -1;

// FIFO containers for integer-valued and string-valued metrics.
using IntTypeMetrics = std::queue<int>;
using StringTypeMetrics = std::queue<std::string>;

// Typed null pointer constant for MessageBase.
static MessageBase *const NULL_MSG = nullptr;
59
// Server socket listen backlog.
static const int SOCKET_LISTEN_BACKLOG = 2048;

// NOTE(review): presumably the value passed for the keepalive socket option (1 = enabled) — confirm.
static const int SOCKET_KEEPALIVE = 1;

// Send the first keepalive probe after the connection has been idle for this many seconds.
static const int SOCKET_KEEPIDLE = 600;

// Send the next probes after this interval (presumably in seconds, like SOCKET_KEEPIDLE — confirm).
static const int SOCKET_KEEPINTERVAL = 10;

// Consider the socket in error state after this many (30) probes have been sent
// without getting a reply.
static const int SOCKET_KEEPCOUNT = 30;
74
// Magic identifier; its characters are copied into MessageHeader::magic by the MessageHeader constructor.
static const char RPC_MAGICID[] = "RPC0";
// NOTE(review): presumably the names assigned to the receive and send event-loop threads — confirm.
static const char TCP_RECV_EVLOOP_THREADNAME[] = "RECV_EVENT_LOOP";
static const char TCP_SEND_EVLOOP_THREADNAME[] = "SEND_EVENT_LOOP";

// Generic RPC status codes: success and failure.
constexpr int RPC_OK = 0;
constexpr int RPC_ERROR = -1;

// Status codes for I/O read/write operations. Note that success is 1 here, not 0 as for RPC_OK.
constexpr int IO_RW_OK = 1;
constexpr int IO_RW_ERROR = -1;

// NOTE(review): presumably the maximum length of a buffer holding a textual IP address — confirm.
constexpr int IP_LEN_MAX = 128;
86
// Name of the environment variable holding the timeout (in seconds) window for compute graph nodes to receive
// messages from other nodes. The default defined here is 300 seconds.
constexpr char kEnvReceiveMsgTimeOut[] = "MS_RECEIVE_MSG_TIMEOUT";
static const size_t kDefaultReceiveMsgTimeOut = 300;
90
91 // Kill the process for safe exiting.
KillProcess(const std::string & ret)92 inline void KillProcess(const std::string &ret) {
93 MS_LOG(ERROR) << ret;
94 (void)raise(SIGKILL);
95 }
96
97 /*
98 * The MessageHeader contains the stats info about the message body.
99 */
100 struct MessageHeader {
MessageHeaderMessageHeader101 MessageHeader() {
102 for (unsigned int i = 0; i < MAGICID_LEN; ++i) {
103 if (i < sizeof(RPC_MAGICID) - 1) {
104 magic[i] = RPC_MAGICID[i];
105 } else {
106 magic[i] = '\0';
107 }
108 }
109 }
110
111 char magic[MAGICID_LEN];
112 uint32_t name_len{0};
113 uint32_t to_len{0};
114 uint32_t from_len{0};
115 uint32_t body_len{0};
116 };
117
118 // Fill the message header using the given message.
FillMessageHeader(const MessageBase & message,MessageHeader * header)119 __attribute__((unused)) static void FillMessageHeader(const MessageBase &message, MessageHeader *header) {
120 std::string send_to = message.to;
121 std::string send_from = message.from;
122 header->name_len = htonl(static_cast<uint32_t>(message.name.size()));
123 header->to_len = htonl(static_cast<uint32_t>(send_to.size()));
124 header->from_len = htonl(static_cast<uint32_t>(send_from.size()));
125 if (message.data != nullptr) {
126 header->body_len = htonl(static_cast<uint32_t>(message.size));
127 } else {
128 header->body_len = htonl(static_cast<uint32_t>(message.body.size()));
129 }
130 }
131
132 // Compute and return the byte size of the whole message.
GetMessageSize(const MessageBase & message)133 __attribute__((unused)) static size_t GetMessageSize(const MessageBase &message) {
134 std::string send_to = message.to;
135 std::string send_from = message.from;
136 size_t size = message.name.size() + send_to.size() + send_from.size() + message.body.size() + sizeof(MessageHeader);
137 return size;
138 }
139
// Evaluate `expression` once; when it is false, build the diagnostic
// "Assertion failed: <expression text>, file: <file>, line: <line>" and pass it to KillProcess.
// Nothing happens when the expression is true.
// NOTE(review): the expansion needs std::stringstream, but this header does not include <sstream>
// directly; presumably it arrives through another include — confirm.
#define RPC_ASSERT(expression)                                                              \
  do {                                                                                      \
    if (!(expression)) {                                                                    \
      std::stringstream assert_stream;                                                      \
      assert_stream << "Assertion failed: " #expression ", file: " __FILE__ ", line: "      \
                    << __LINE__;                                                            \
      KillProcess(assert_stream.str());                                                     \
    }                                                                                       \
  } while (0)
148
// Build the diagnostic "<ret> ( file: <file>, line: <line> )." and pass it to KillProcess.
// `ret` may be any expression that can be streamed into a std::ostream.
// The local stream uses a macro-specific name because it is declared before `(ret)` is evaluated:
// a generic name such as `ss` would capture a caller expression that mentions its own `ss`.
// NOTE(review): the expansion needs std::stringstream, but this header does not include <sstream>
// directly; presumably it arrives through another include — confirm.
#define RPC_EXIT(ret)                                                                        \
  do {                                                                                       \
    std::stringstream rpc_exit_stream_;                                                      \
    rpc_exit_stream_ << (ret) << " ( file: " << __FILE__ << ", line: " << __LINE__ << " )."; \
    KillProcess(rpc_exit_stream_.str());                                                     \
  } while (0)
155 } // namespace rpc
156 } // namespace distributed
157 } // namespace mindspore
158 #endif
159