• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  *
10  *  Data Channel Benchmarking tool.
11  *
12  *  Create a server using: ./data_channel_benchmark --server --port 12345
13  *  Start the flow of data from the server to a client using:
14  *  ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196
15  *  The throughput is reported on the server console.
16  *
17  *  The negotiation does not require a 3rd party server and is done over a gRPC
18  *  transport. No TURN server is configured, so both peers need to be reachable
19  *  using STUN only.
20  */
21 #include <inttypes.h>
22 
23 #include <charconv>
24 
25 #include "absl/cleanup/cleanup.h"
26 #include "absl/flags/flag.h"
27 #include "absl/flags/parse.h"
28 #include "rtc_base/event.h"
29 #include "rtc_base/ssl_adapter.h"
30 #include "rtc_base/thread.h"
31 #include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
32 #include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
33 #include "system_wrappers/include/field_trial.h"
34 
35 ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)");
36 ABSL_FLAG(bool, server, false, "Server mode");
37 ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client");
38 ABSL_FLAG(std::string, address, "localhost", "Connect to server address");
39 ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)");
40 ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)");
41 ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size");
42 ABSL_FLAG(std::string,
43           force_fieldtrials,
44           "",
45           "Field trials control experimental feature code which can be forced. "
46           "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/"
47           " will assign the group Enable to field trial WebRTC-FooFeature.");
48 
49 struct SetupMessage {
50   size_t packet_size;
51   size_t transfer_size;
52 
ToStringSetupMessage53   std::string ToString() {
54     char buffer[64];
55     rtc::SimpleStringBuilder sb(buffer);
56     sb << packet_size << "," << transfer_size;
57 
58     return sb.str();
59   }
60 
FromStringSetupMessage61   static SetupMessage FromString(absl::string_view sv) {
62     SetupMessage result;
63     auto parameters = rtc::split(sv, ',');
64     std::from_chars(parameters[0].data(),
65                     parameters[0].data() + parameters[0].size(),
66                     result.packet_size, 10);
67     std::from_chars(parameters[1].data(),
68                     parameters[1].data() + parameters[1].size(),
69                     result.transfer_size, 10);
70     return result;
71   }
72 };
73 
74 class DataChannelObserverImpl : public webrtc::DataChannelObserver {
75  public:
DataChannelObserverImpl(webrtc::DataChannelInterface * dc)76   explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc)
77       : dc_(dc), bytes_received_(0) {}
OnStateChange()78   void OnStateChange() override {
79     RTC_LOG(LS_INFO) << "State changed to " << dc_->state();
80     switch (dc_->state()) {
81       case webrtc::DataChannelInterface::DataState::kOpen:
82         open_event_.Set();
83         break;
84       case webrtc::DataChannelInterface::DataState::kClosed:
85         closed_event_.Set();
86         break;
87       default:
88         break;
89     }
90   }
OnMessage(const webrtc::DataBuffer & buffer)91   void OnMessage(const webrtc::DataBuffer& buffer) override {
92     bytes_received_ += buffer.data.size();
93     if (bytes_received_threshold_ &&
94         bytes_received_ >= bytes_received_threshold_) {
95       bytes_received_event_.Set();
96     }
97 
98     if (setup_message_.empty() && !buffer.binary) {
99       setup_message_.assign(buffer.data.cdata<char>(), buffer.data.size());
100       setup_message_event_.Set();
101     }
102   }
OnBufferedAmountChange(uint64_t sent_data_size)103   void OnBufferedAmountChange(uint64_t sent_data_size) override {
104     if (dc_->buffered_amount() <
105         webrtc::DataChannelInterface::MaxSendQueueSize() / 2)
106       low_buffered_threshold_event_.Set();
107     else
108       low_buffered_threshold_event_.Reset();
109   }
110 
WaitForOpenState()111   bool WaitForOpenState() {
112     return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen ||
113            open_event_.Wait(rtc::Event::kForever);
114   }
WaitForClosedState()115   bool WaitForClosedState() {
116     return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed ||
117            closed_event_.Wait(rtc::Event::kForever);
118   }
119 
120   // Set how many received bytes are required until
121   // WaitForBytesReceivedThreshold return true.
SetBytesReceivedThreshold(uint64_t bytes_received_threshold)122   void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) {
123     bytes_received_threshold_ = bytes_received_threshold;
124     if (bytes_received_ >= bytes_received_threshold_)
125       bytes_received_event_.Set();
126   }
127   // Wait until the received byte count reaches the desired value.
WaitForBytesReceivedThreshold()128   bool WaitForBytesReceivedThreshold() {
129     return (bytes_received_threshold_ &&
130             bytes_received_ >= bytes_received_threshold_) ||
131            bytes_received_event_.Wait(rtc::Event::kForever);
132   }
133 
WaitForLowbufferedThreshold()134   bool WaitForLowbufferedThreshold() {
135     return low_buffered_threshold_event_.Wait(rtc::Event::kForever);
136   }
SetupMessage()137   std::string SetupMessage() { return setup_message_; }
WaitForSetupMessage()138   bool WaitForSetupMessage() {
139     return setup_message_event_.Wait(rtc::Event::kForever);
140   }
141 
142  private:
143   webrtc::DataChannelInterface* dc_;
144   rtc::Event open_event_;
145   rtc::Event closed_event_;
146   rtc::Event bytes_received_event_;
147   absl::optional<uint64_t> bytes_received_threshold_;
148   uint64_t bytes_received_;
149   rtc::Event low_buffered_threshold_event_;
150   std::string setup_message_;
151   rtc::Event setup_message_event_;
152 };
153 
RunServer()154 int RunServer() {
155   bool oneshot = absl::GetFlag(FLAGS_oneshot);
156   uint16_t port = absl::GetFlag(FLAGS_port);
157 
158   auto signaling_thread = rtc::Thread::Create();
159   signaling_thread->Start();
160   {
161     auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
162         signaling_thread.get());
163 
164     auto grpc_server = webrtc::GrpcSignalingServerInterface::Create(
165         [factory = rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>(
166              factory)](webrtc::SignalingInterface* signaling) {
167           webrtc::PeerConnectionClient client(factory.get(), signaling);
168           client.StartPeerConnection();
169           auto peer_connection = client.peerConnection();
170 
171           // Set up the data channel
172           auto dc_or_error =
173               peer_connection->CreateDataChannelOrError("benchmark", nullptr);
174           auto data_channel = dc_or_error.MoveValue();
175           auto data_channel_observer =
176               std::make_unique<DataChannelObserverImpl>(data_channel.get());
177           data_channel->RegisterObserver(data_channel_observer.get());
178           absl::Cleanup unregister_observer(
179               [data_channel] { data_channel->UnregisterObserver(); });
180 
181           // Wait for a first message from the remote peer.
182           // It configures how much data should be sent and how big the packets
183           // should be.
184           // First message is "packet_size,transfer_size".
185           data_channel_observer->WaitForSetupMessage();
186           auto parameters =
187               SetupMessage::FromString(data_channel_observer->SetupMessage());
188 
189           // Wait for the sender and receiver peers to stabilize (send all ACKs)
190           // This makes it easier to isolate the sending part when profiling.
191           absl::SleepFor(absl::Seconds(1));
192 
193           std::string data(parameters.packet_size, '0');
194           size_t remaining_data = parameters.transfer_size;
195 
196           auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
197 
198           while (remaining_data) {
199             if (remaining_data < data.size())
200               data.resize(remaining_data);
201 
202             rtc::CopyOnWriteBuffer buffer(data);
203             webrtc::DataBuffer data_buffer(buffer, true);
204             if (!data_channel->Send(data_buffer)) {
205               // If the send() call failed, the buffers are full.
206               // We wait until there's more room.
207               data_channel_observer->WaitForLowbufferedThreshold();
208               continue;
209             }
210             remaining_data -= buffer.size();
211             fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n",
212                     (parameters.transfer_size - remaining_data),
213                     parameters.transfer_size,
214                     (100 - remaining_data * 100 / parameters.transfer_size));
215           }
216 
217           // Receiver signals the data channel close event when it has received
218           // all the data it requested.
219           data_channel_observer->WaitForClosedState();
220 
221           auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
222           auto duration_ms = (end_time - begin_time).ms<size_t>();
223           double throughput = (parameters.transfer_size / 1024. / 1024.) /
224                               (duration_ms / 1000.);
225           printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput);
226         },
227         port, oneshot);
228     grpc_server->Start();
229 
230     printf("Server listening on port %d\n", grpc_server->SelectedPort());
231     grpc_server->Wait();
232   }
233 
234   signaling_thread->Quit();
235   return 0;
236 }
237 
RunClient()238 int RunClient() {
239   uint16_t port = absl::GetFlag(FLAGS_port);
240   std::string server_address = absl::GetFlag(FLAGS_address);
241   size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024;
242   size_t packet_size = absl::GetFlag(FLAGS_packet_size);
243 
244   auto signaling_thread = rtc::Thread::Create();
245   signaling_thread->Start();
246   {
247     auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
248         signaling_thread.get());
249     auto grpc_client = webrtc::GrpcSignalingClientInterface::Create(
250         server_address + ":" + std::to_string(port));
251     webrtc::PeerConnectionClient client(factory.get(),
252                                         grpc_client->signaling_client());
253 
254     // Set up the callback to receive the data channel from the sender.
255     rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel;
256     rtc::Event got_data_channel;
257     client.SetOnDataChannel(
258         [&data_channel, &got_data_channel](
259             rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
260           data_channel = channel;
261           got_data_channel.Set();
262         });
263 
264     // Connect to the server.
265     if (!grpc_client->Start()) {
266       fprintf(stderr, "Failed to connect to server\n");
267       return 1;
268     }
269 
270     // Wait for the data channel to be received
271     got_data_channel.Wait(rtc::Event::kForever);
272 
273     // DataChannel needs an observer to start draining the read queue
274     DataChannelObserverImpl observer(data_channel.get());
275     observer.SetBytesReceivedThreshold(transfer_size);
276     data_channel->RegisterObserver(&observer);
277     absl::Cleanup unregister_observer(
278         [data_channel] { data_channel->UnregisterObserver(); });
279 
280     // Send a configuration string to the server to tell it to send
281     // 'packet_size' bytes packets and send a total of 'transfer_size' MB.
282     observer.WaitForOpenState();
283     SetupMessage setup_message = {
284         .packet_size = packet_size,
285         .transfer_size = transfer_size,
286     };
287     if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) {
288       fprintf(stderr, "Failed to send parameter string\n");
289       return 1;
290     }
291 
292     // Wait until we have received all the data
293     observer.WaitForBytesReceivedThreshold();
294 
295     // Close the data channel, signaling to the server we have received
296     // all the requested data.
297     data_channel->Close();
298   }
299 
300   signaling_thread->Quit();
301 
302   return 0;
303 }
304 
main(int argc,char ** argv)305 int main(int argc, char** argv) {
306   rtc::InitializeSSL();
307   absl::ParseCommandLine(argc, argv);
308 
309   // Make sure that higher severity number means more logs by reversing the
310   // rtc::LoggingSeverity values.
311   auto logging_severity =
312       std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose));
313   rtc::LogMessage::LogToDebug(
314       static_cast<rtc::LoggingSeverity>(logging_severity));
315 
316   bool is_server = absl::GetFlag(FLAGS_server);
317   std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials);
318 
319   webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str());
320 
321   return is_server ? RunServer() : RunClient();
322 }
323