1 /*
2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 *
10 * Data Channel Benchmarking tool.
11 *
12 * Create a server using: ./data_channel_benchmark --server --port 12345
13 * Start the flow of data from the server to a client using:
14 * ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196
15 * The throughput is reported on the server console.
16 *
17 * The negotiation does not require a 3rd party server and is done over a gRPC
18 * transport. No TURN server is configured, so both peers need to be reachable
19 * using STUN only.
20 */
21 #include <inttypes.h>
22
23 #include <charconv>
24
25 #include "absl/cleanup/cleanup.h"
26 #include "absl/flags/flag.h"
27 #include "absl/flags/parse.h"
28 #include "rtc_base/event.h"
29 #include "rtc_base/ssl_adapter.h"
30 #include "rtc_base/thread.h"
31 #include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
32 #include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
33 #include "system_wrappers/include/field_trial.h"
34
35 ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)");
36 ABSL_FLAG(bool, server, false, "Server mode");
37 ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client");
38 ABSL_FLAG(std::string, address, "localhost", "Connect to server address");
39 ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)");
40 ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)");
41 ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size");
42 ABSL_FLAG(std::string,
43 force_fieldtrials,
44 "",
45 "Field trials control experimental feature code which can be forced. "
46 "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/"
47 " will assign the group Enable to field trial WebRTC-FooFeature.");
48
49 struct SetupMessage {
50 size_t packet_size;
51 size_t transfer_size;
52
ToStringSetupMessage53 std::string ToString() {
54 char buffer[64];
55 rtc::SimpleStringBuilder sb(buffer);
56 sb << packet_size << "," << transfer_size;
57
58 return sb.str();
59 }
60
FromStringSetupMessage61 static SetupMessage FromString(absl::string_view sv) {
62 SetupMessage result;
63 auto parameters = rtc::split(sv, ',');
64 std::from_chars(parameters[0].data(),
65 parameters[0].data() + parameters[0].size(),
66 result.packet_size, 10);
67 std::from_chars(parameters[1].data(),
68 parameters[1].data() + parameters[1].size(),
69 result.transfer_size, 10);
70 return result;
71 }
72 };
73
74 class DataChannelObserverImpl : public webrtc::DataChannelObserver {
75 public:
DataChannelObserverImpl(webrtc::DataChannelInterface * dc)76 explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc)
77 : dc_(dc), bytes_received_(0) {}
OnStateChange()78 void OnStateChange() override {
79 RTC_LOG(LS_INFO) << "State changed to " << dc_->state();
80 switch (dc_->state()) {
81 case webrtc::DataChannelInterface::DataState::kOpen:
82 open_event_.Set();
83 break;
84 case webrtc::DataChannelInterface::DataState::kClosed:
85 closed_event_.Set();
86 break;
87 default:
88 break;
89 }
90 }
OnMessage(const webrtc::DataBuffer & buffer)91 void OnMessage(const webrtc::DataBuffer& buffer) override {
92 bytes_received_ += buffer.data.size();
93 if (bytes_received_threshold_ &&
94 bytes_received_ >= bytes_received_threshold_) {
95 bytes_received_event_.Set();
96 }
97
98 if (setup_message_.empty() && !buffer.binary) {
99 setup_message_.assign(buffer.data.cdata<char>(), buffer.data.size());
100 setup_message_event_.Set();
101 }
102 }
OnBufferedAmountChange(uint64_t sent_data_size)103 void OnBufferedAmountChange(uint64_t sent_data_size) override {
104 if (dc_->buffered_amount() <
105 webrtc::DataChannelInterface::MaxSendQueueSize() / 2)
106 low_buffered_threshold_event_.Set();
107 else
108 low_buffered_threshold_event_.Reset();
109 }
110
WaitForOpenState()111 bool WaitForOpenState() {
112 return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen ||
113 open_event_.Wait(rtc::Event::kForever);
114 }
WaitForClosedState()115 bool WaitForClosedState() {
116 return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed ||
117 closed_event_.Wait(rtc::Event::kForever);
118 }
119
120 // Set how many received bytes are required until
121 // WaitForBytesReceivedThreshold return true.
SetBytesReceivedThreshold(uint64_t bytes_received_threshold)122 void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) {
123 bytes_received_threshold_ = bytes_received_threshold;
124 if (bytes_received_ >= bytes_received_threshold_)
125 bytes_received_event_.Set();
126 }
127 // Wait until the received byte count reaches the desired value.
WaitForBytesReceivedThreshold()128 bool WaitForBytesReceivedThreshold() {
129 return (bytes_received_threshold_ &&
130 bytes_received_ >= bytes_received_threshold_) ||
131 bytes_received_event_.Wait(rtc::Event::kForever);
132 }
133
WaitForLowbufferedThreshold()134 bool WaitForLowbufferedThreshold() {
135 return low_buffered_threshold_event_.Wait(rtc::Event::kForever);
136 }
SetupMessage()137 std::string SetupMessage() { return setup_message_; }
WaitForSetupMessage()138 bool WaitForSetupMessage() {
139 return setup_message_event_.Wait(rtc::Event::kForever);
140 }
141
142 private:
143 webrtc::DataChannelInterface* dc_;
144 rtc::Event open_event_;
145 rtc::Event closed_event_;
146 rtc::Event bytes_received_event_;
147 absl::optional<uint64_t> bytes_received_threshold_;
148 uint64_t bytes_received_;
149 rtc::Event low_buffered_threshold_event_;
150 std::string setup_message_;
151 rtc::Event setup_message_event_;
152 };
153
RunServer()154 int RunServer() {
155 bool oneshot = absl::GetFlag(FLAGS_oneshot);
156 uint16_t port = absl::GetFlag(FLAGS_port);
157
158 auto signaling_thread = rtc::Thread::Create();
159 signaling_thread->Start();
160 {
161 auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
162 signaling_thread.get());
163
164 auto grpc_server = webrtc::GrpcSignalingServerInterface::Create(
165 [factory = rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>(
166 factory)](webrtc::SignalingInterface* signaling) {
167 webrtc::PeerConnectionClient client(factory.get(), signaling);
168 client.StartPeerConnection();
169 auto peer_connection = client.peerConnection();
170
171 // Set up the data channel
172 auto dc_or_error =
173 peer_connection->CreateDataChannelOrError("benchmark", nullptr);
174 auto data_channel = dc_or_error.MoveValue();
175 auto data_channel_observer =
176 std::make_unique<DataChannelObserverImpl>(data_channel.get());
177 data_channel->RegisterObserver(data_channel_observer.get());
178 absl::Cleanup unregister_observer(
179 [data_channel] { data_channel->UnregisterObserver(); });
180
181 // Wait for a first message from the remote peer.
182 // It configures how much data should be sent and how big the packets
183 // should be.
184 // First message is "packet_size,transfer_size".
185 data_channel_observer->WaitForSetupMessage();
186 auto parameters =
187 SetupMessage::FromString(data_channel_observer->SetupMessage());
188
189 // Wait for the sender and receiver peers to stabilize (send all ACKs)
190 // This makes it easier to isolate the sending part when profiling.
191 absl::SleepFor(absl::Seconds(1));
192
193 std::string data(parameters.packet_size, '0');
194 size_t remaining_data = parameters.transfer_size;
195
196 auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
197
198 while (remaining_data) {
199 if (remaining_data < data.size())
200 data.resize(remaining_data);
201
202 rtc::CopyOnWriteBuffer buffer(data);
203 webrtc::DataBuffer data_buffer(buffer, true);
204 if (!data_channel->Send(data_buffer)) {
205 // If the send() call failed, the buffers are full.
206 // We wait until there's more room.
207 data_channel_observer->WaitForLowbufferedThreshold();
208 continue;
209 }
210 remaining_data -= buffer.size();
211 fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n",
212 (parameters.transfer_size - remaining_data),
213 parameters.transfer_size,
214 (100 - remaining_data * 100 / parameters.transfer_size));
215 }
216
217 // Receiver signals the data channel close event when it has received
218 // all the data it requested.
219 data_channel_observer->WaitForClosedState();
220
221 auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
222 auto duration_ms = (end_time - begin_time).ms<size_t>();
223 double throughput = (parameters.transfer_size / 1024. / 1024.) /
224 (duration_ms / 1000.);
225 printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput);
226 },
227 port, oneshot);
228 grpc_server->Start();
229
230 printf("Server listening on port %d\n", grpc_server->SelectedPort());
231 grpc_server->Wait();
232 }
233
234 signaling_thread->Quit();
235 return 0;
236 }
237
RunClient()238 int RunClient() {
239 uint16_t port = absl::GetFlag(FLAGS_port);
240 std::string server_address = absl::GetFlag(FLAGS_address);
241 size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024;
242 size_t packet_size = absl::GetFlag(FLAGS_packet_size);
243
244 auto signaling_thread = rtc::Thread::Create();
245 signaling_thread->Start();
246 {
247 auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
248 signaling_thread.get());
249 auto grpc_client = webrtc::GrpcSignalingClientInterface::Create(
250 server_address + ":" + std::to_string(port));
251 webrtc::PeerConnectionClient client(factory.get(),
252 grpc_client->signaling_client());
253
254 // Set up the callback to receive the data channel from the sender.
255 rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel;
256 rtc::Event got_data_channel;
257 client.SetOnDataChannel(
258 [&data_channel, &got_data_channel](
259 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
260 data_channel = channel;
261 got_data_channel.Set();
262 });
263
264 // Connect to the server.
265 if (!grpc_client->Start()) {
266 fprintf(stderr, "Failed to connect to server\n");
267 return 1;
268 }
269
270 // Wait for the data channel to be received
271 got_data_channel.Wait(rtc::Event::kForever);
272
273 // DataChannel needs an observer to start draining the read queue
274 DataChannelObserverImpl observer(data_channel.get());
275 observer.SetBytesReceivedThreshold(transfer_size);
276 data_channel->RegisterObserver(&observer);
277 absl::Cleanup unregister_observer(
278 [data_channel] { data_channel->UnregisterObserver(); });
279
280 // Send a configuration string to the server to tell it to send
281 // 'packet_size' bytes packets and send a total of 'transfer_size' MB.
282 observer.WaitForOpenState();
283 SetupMessage setup_message = {
284 .packet_size = packet_size,
285 .transfer_size = transfer_size,
286 };
287 if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) {
288 fprintf(stderr, "Failed to send parameter string\n");
289 return 1;
290 }
291
292 // Wait until we have received all the data
293 observer.WaitForBytesReceivedThreshold();
294
295 // Close the data channel, signaling to the server we have received
296 // all the requested data.
297 data_channel->Close();
298 }
299
300 signaling_thread->Quit();
301
302 return 0;
303 }
304
main(int argc,char ** argv)305 int main(int argc, char** argv) {
306 rtc::InitializeSSL();
307 absl::ParseCommandLine(argc, argv);
308
309 // Make sure that higher severity number means more logs by reversing the
310 // rtc::LoggingSeverity values.
311 auto logging_severity =
312 std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose));
313 rtc::LogMessage::LogToDebug(
314 static_cast<rtc::LoggingSeverity>(logging_severity));
315
316 bool is_server = absl::GetFlag(FLAGS_server);
317 std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials);
318
319 webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str());
320
321 return is_server ? RunServer() : RunClient();
322 }
323