/*
 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10 #include "test/peer_scenario/peer_scenario_client.h"
11
12 #include <limits>
13 #include <memory>
14 #include <utility>
15
16 #include "absl/memory/memory.h"
17 #include "api/audio_codecs/builtin_audio_decoder_factory.h"
18 #include "api/audio_codecs/builtin_audio_encoder_factory.h"
19 #include "api/rtc_event_log/rtc_event_log_factory.h"
20 #include "api/task_queue/default_task_queue_factory.h"
21 #include "api/test/create_time_controller.h"
22 #include "api/transport/field_trial_based_config.h"
23 #include "api/video_codecs/builtin_video_decoder_factory.h"
24 #include "api/video_codecs/builtin_video_encoder_factory.h"
25 #include "media/engine/webrtc_media_engine.h"
26 #include "modules/audio_device/include/test_audio_device.h"
27 #include "p2p/client/basic_port_allocator.h"
28 #include "test/fake_decoder.h"
29 #include "test/fake_vp8_encoder.h"
30 #include "test/frame_generator_capturer.h"
31
32 namespace webrtc {
33 namespace test {
34
35 namespace {
36
37 constexpr char kCommonStreamId[] = "stream_id";
38
CreateEndpoints(NetworkEmulationManager * net,std::map<int,EmulatedEndpointConfig> endpoint_configs)39 std::map<int, EmulatedEndpoint*> CreateEndpoints(
40 NetworkEmulationManager* net,
41 std::map<int, EmulatedEndpointConfig> endpoint_configs) {
42 std::map<int, EmulatedEndpoint*> endpoints;
43 for (const auto& kv : endpoint_configs)
44 endpoints[kv.first] = net->CreateEndpoint(kv.second);
45 return endpoints;
46 }
47
48 class LambdaPeerConnectionObserver final : public PeerConnectionObserver {
49 public:
LambdaPeerConnectionObserver(PeerScenarioClient::CallbackHandlers * handlers)50 explicit LambdaPeerConnectionObserver(
51 PeerScenarioClient::CallbackHandlers* handlers)
52 : handlers_(handlers) {}
OnSignalingChange(PeerConnectionInterface::SignalingState new_state)53 void OnSignalingChange(
54 PeerConnectionInterface::SignalingState new_state) override {
55 for (const auto& handler : handlers_->on_signaling_change)
56 handler(new_state);
57 }
OnDataChannel(rtc::scoped_refptr<DataChannelInterface> data_channel)58 void OnDataChannel(
59 rtc::scoped_refptr<DataChannelInterface> data_channel) override {
60 for (const auto& handler : handlers_->on_data_channel)
61 handler(data_channel);
62 }
OnRenegotiationNeeded()63 void OnRenegotiationNeeded() override {
64 for (const auto& handler : handlers_->on_renegotiation_needed)
65 handler();
66 }
OnStandardizedIceConnectionChange(PeerConnectionInterface::IceConnectionState new_state)67 void OnStandardizedIceConnectionChange(
68 PeerConnectionInterface::IceConnectionState new_state) override {
69 for (const auto& handler : handlers_->on_standardized_ice_connection_change)
70 handler(new_state);
71 }
OnConnectionChange(PeerConnectionInterface::PeerConnectionState new_state)72 void OnConnectionChange(
73 PeerConnectionInterface::PeerConnectionState new_state) override {
74 for (const auto& handler : handlers_->on_connection_change)
75 handler(new_state);
76 }
OnIceGatheringChange(PeerConnectionInterface::IceGatheringState new_state)77 void OnIceGatheringChange(
78 PeerConnectionInterface::IceGatheringState new_state) override {
79 for (const auto& handler : handlers_->on_ice_gathering_change)
80 handler(new_state);
81 }
OnIceCandidate(const IceCandidateInterface * candidate)82 void OnIceCandidate(const IceCandidateInterface* candidate) override {
83 for (const auto& handler : handlers_->on_ice_candidate)
84 handler(candidate);
85 }
OnIceCandidateError(const std::string & address,int port,const std::string & url,int error_code,const std::string & error_text)86 void OnIceCandidateError(const std::string& address,
87 int port,
88 const std::string& url,
89 int error_code,
90 const std::string& error_text) override {
91 for (const auto& handler : handlers_->on_ice_candidate_error)
92 handler(address, port, url, error_code, error_text);
93 }
OnIceCandidatesRemoved(const std::vector<cricket::Candidate> & candidates)94 void OnIceCandidatesRemoved(
95 const std::vector<cricket::Candidate>& candidates) override {
96 for (const auto& handler : handlers_->on_ice_candidates_removed)
97 handler(candidates);
98 }
OnAddTrack(rtc::scoped_refptr<RtpReceiverInterface> receiver,const std::vector<rtc::scoped_refptr<MediaStreamInterface>> & streams)99 void OnAddTrack(rtc::scoped_refptr<RtpReceiverInterface> receiver,
100 const std::vector<rtc::scoped_refptr<MediaStreamInterface> >&
101 streams) override {
102 for (const auto& handler : handlers_->on_add_track)
103 handler(receiver, streams);
104 }
OnTrack(rtc::scoped_refptr<RtpTransceiverInterface> transceiver)105 void OnTrack(
106 rtc::scoped_refptr<RtpTransceiverInterface> transceiver) override {
107 for (const auto& handler : handlers_->on_track)
108 handler(transceiver);
109 }
OnRemoveTrack(rtc::scoped_refptr<RtpReceiverInterface> receiver)110 void OnRemoveTrack(
111 rtc::scoped_refptr<RtpReceiverInterface> receiver) override {
112 for (const auto& handler : handlers_->on_remove_track)
113 handler(receiver);
114 }
115
116 private:
117 PeerScenarioClient::CallbackHandlers* handlers_;
118 };
119
120 class LambdaCreateSessionDescriptionObserver
121 : public CreateSessionDescriptionObserver {
122 public:
LambdaCreateSessionDescriptionObserver(std::function<void (std::unique_ptr<SessionDescriptionInterface> desc)> on_success)123 explicit LambdaCreateSessionDescriptionObserver(
124 std::function<void(std::unique_ptr<SessionDescriptionInterface> desc)>
125 on_success)
126 : on_success_(on_success) {}
OnSuccess(SessionDescriptionInterface * desc)127 void OnSuccess(SessionDescriptionInterface* desc) override {
128 // Takes ownership of answer, according to CreateSessionDescriptionObserver
129 // convention.
130 on_success_(absl::WrapUnique(desc));
131 }
OnFailure(RTCError error)132 void OnFailure(RTCError error) override {
133 RTC_DCHECK_NOTREACHED() << error.message();
134 }
135
136 private:
137 std::function<void(std::unique_ptr<SessionDescriptionInterface> desc)>
138 on_success_;
139 };
140
141 class LambdaSetLocalDescriptionObserver
142 : public SetLocalDescriptionObserverInterface {
143 public:
LambdaSetLocalDescriptionObserver(std::function<void (RTCError)> on_complete)144 explicit LambdaSetLocalDescriptionObserver(
145 std::function<void(RTCError)> on_complete)
146 : on_complete_(on_complete) {}
OnSetLocalDescriptionComplete(RTCError error)147 void OnSetLocalDescriptionComplete(RTCError error) override {
148 on_complete_(error);
149 }
150
151 private:
152 std::function<void(RTCError)> on_complete_;
153 };
154
155 class LambdaSetRemoteDescriptionObserver
156 : public SetRemoteDescriptionObserverInterface {
157 public:
LambdaSetRemoteDescriptionObserver(std::function<void (RTCError)> on_complete)158 explicit LambdaSetRemoteDescriptionObserver(
159 std::function<void(RTCError)> on_complete)
160 : on_complete_(on_complete) {}
OnSetRemoteDescriptionComplete(RTCError error)161 void OnSetRemoteDescriptionComplete(RTCError error) override {
162 on_complete_(error);
163 }
164
165 private:
166 std::function<void(RTCError)> on_complete_;
167 };
168
169 class FakeVideoEncoderFactory : public VideoEncoderFactory {
170 public:
FakeVideoEncoderFactory(Clock * clock)171 FakeVideoEncoderFactory(Clock* clock) : clock_(clock) {}
GetSupportedFormats() const172 std::vector<SdpVideoFormat> GetSupportedFormats() const override {
173 return {SdpVideoFormat("VP8")};
174 }
CreateVideoEncoder(const SdpVideoFormat & format)175 std::unique_ptr<VideoEncoder> CreateVideoEncoder(
176 const SdpVideoFormat& format) override {
177 RTC_CHECK_EQ(format.name, "VP8");
178 return std::make_unique<FakeVp8Encoder>(clock_);
179 }
180
181 private:
182 Clock* const clock_;
183 };
184 class FakeVideoDecoderFactory : public VideoDecoderFactory {
185 public:
GetSupportedFormats() const186 std::vector<SdpVideoFormat> GetSupportedFormats() const override {
187 return {SdpVideoFormat("VP8")};
188 }
CreateVideoDecoder(const SdpVideoFormat & format)189 std::unique_ptr<VideoDecoder> CreateVideoDecoder(
190 const SdpVideoFormat& format) override {
191 return std::make_unique<FakeDecoder>();
192 }
193 };
194 } // namespace
195
PeerScenarioClient(NetworkEmulationManager * net,rtc::Thread * signaling_thread,std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,PeerScenarioClient::Config config)196 PeerScenarioClient::PeerScenarioClient(
197 NetworkEmulationManager* net,
198 rtc::Thread* signaling_thread,
199 std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
200 PeerScenarioClient::Config config)
201 : endpoints_(CreateEndpoints(net, config.endpoints)),
202 task_queue_factory_(net->time_controller()->GetTaskQueueFactory()),
203 signaling_thread_(signaling_thread),
204 log_writer_factory_(std::move(log_writer_factory)),
205 worker_thread_(net->time_controller()->CreateThread("worker")),
206 handlers_(config.handlers),
207 observer_(new LambdaPeerConnectionObserver(&handlers_)) {
208 handlers_.on_track.push_back(
209 [this](rtc::scoped_refptr<RtpTransceiverInterface> transceiver) {
210 auto track = transceiver->receiver()->track().get();
211 if (track->kind() == MediaStreamTrackInterface::kVideoKind) {
212 auto* video = static_cast<VideoTrackInterface*>(track);
213 RTC_DCHECK_RUN_ON(signaling_thread_);
214 for (auto* sink : track_id_to_video_sinks_[track->id()]) {
215 video->AddOrUpdateSink(sink, rtc::VideoSinkWants());
216 }
217 }
218 });
219 handlers_.on_signaling_change.push_back(
220 [this](PeerConnectionInterface::SignalingState state) {
221 RTC_DCHECK_RUN_ON(signaling_thread_);
222 if (state == PeerConnectionInterface::SignalingState::kStable &&
223 peer_connection_->current_remote_description()) {
224 for (const auto& candidate : pending_ice_candidates_) {
225 RTC_CHECK(peer_connection_->AddIceCandidate(candidate.get()));
226 }
227 pending_ice_candidates_.clear();
228 }
229 });
230
231 std::vector<EmulatedEndpoint*> endpoints_vector;
232 for (const auto& kv : endpoints_)
233 endpoints_vector.push_back(kv.second);
234 auto* manager = net->CreateEmulatedNetworkManagerInterface(endpoints_vector);
235
236 PeerConnectionFactoryDependencies pcf_deps;
237 pcf_deps.network_thread = manager->network_thread();
238 pcf_deps.signaling_thread = signaling_thread_;
239 pcf_deps.worker_thread = worker_thread_.get();
240 pcf_deps.call_factory =
241 CreateTimeControllerBasedCallFactory(net->time_controller());
242 pcf_deps.task_queue_factory =
243 net->time_controller()->CreateTaskQueueFactory();
244 pcf_deps.event_log_factory =
245 std::make_unique<RtcEventLogFactory>(task_queue_factory_);
246 pcf_deps.trials = std::make_unique<FieldTrialBasedConfig>();
247
248 cricket::MediaEngineDependencies media_deps;
249 media_deps.task_queue_factory = task_queue_factory_;
250 media_deps.adm = TestAudioDeviceModule::Create(
251 task_queue_factory_,
252 TestAudioDeviceModule::CreatePulsedNoiseCapturer(
253 config.audio.pulsed_noise->amplitude *
254 std::numeric_limits<int16_t>::max(),
255 config.audio.sample_rate, config.audio.channels),
256 TestAudioDeviceModule::CreateDiscardRenderer(config.audio.sample_rate));
257
258 media_deps.audio_processing = AudioProcessingBuilder().Create();
259 if (config.video.use_fake_codecs) {
260 media_deps.video_encoder_factory =
261 std::make_unique<FakeVideoEncoderFactory>(
262 net->time_controller()->GetClock());
263 media_deps.video_decoder_factory =
264 std::make_unique<FakeVideoDecoderFactory>();
265 } else {
266 media_deps.video_encoder_factory = CreateBuiltinVideoEncoderFactory();
267 media_deps.video_decoder_factory = CreateBuiltinVideoDecoderFactory();
268 }
269 media_deps.audio_encoder_factory = CreateBuiltinAudioEncoderFactory();
270 media_deps.audio_decoder_factory = CreateBuiltinAudioDecoderFactory();
271 media_deps.trials = pcf_deps.trials.get();
272
273 pcf_deps.media_engine = cricket::CreateMediaEngine(std::move(media_deps));
274 pcf_deps.fec_controller_factory = nullptr;
275 pcf_deps.network_controller_factory = nullptr;
276 pcf_deps.network_state_predictor_factory = nullptr;
277
278 pc_factory_ = CreateModularPeerConnectionFactory(std::move(pcf_deps));
279 PeerConnectionFactoryInterface::Options pc_options;
280 pc_options.disable_encryption = config.disable_encryption;
281 pc_factory_->SetOptions(pc_options);
282
283 PeerConnectionDependencies pc_deps(observer_.get());
284 pc_deps.allocator = std::make_unique<cricket::BasicPortAllocator>(
285 manager->network_manager(), manager->packet_socket_factory());
286 pc_deps.allocator->set_flags(pc_deps.allocator->flags() |
287 cricket::PORTALLOCATOR_DISABLE_TCP);
288 peer_connection_ =
289 pc_factory_
290 ->CreatePeerConnectionOrError(config.rtc_config, std::move(pc_deps))
291 .MoveValue();
292 if (log_writer_factory_) {
293 peer_connection_->StartRtcEventLog(log_writer_factory_->Create(".rtc.dat"),
294 /*output_period_ms=*/1000);
295 }
296 }
297
endpoint(int index)298 EmulatedEndpoint* PeerScenarioClient::endpoint(int index) {
299 RTC_CHECK_GT(endpoints_.size(), index);
300 return endpoints_.at(index);
301 }
302
CreateAudio(std::string track_id,cricket::AudioOptions options)303 PeerScenarioClient::AudioSendTrack PeerScenarioClient::CreateAudio(
304 std::string track_id,
305 cricket::AudioOptions options) {
306 RTC_DCHECK_RUN_ON(signaling_thread_);
307 AudioSendTrack res;
308 auto source = pc_factory_->CreateAudioSource(options);
309 auto track = pc_factory_->CreateAudioTrack(track_id, source.get());
310 res.track = track;
311 res.sender = peer_connection_->AddTrack(track, {kCommonStreamId}).value();
312 return res;
313 }
314
CreateVideo(std::string track_id,VideoSendTrackConfig config)315 PeerScenarioClient::VideoSendTrack PeerScenarioClient::CreateVideo(
316 std::string track_id,
317 VideoSendTrackConfig config) {
318 RTC_DCHECK_RUN_ON(signaling_thread_);
319 VideoSendTrack res;
320 auto capturer = FrameGeneratorCapturer::Create(clock(), *task_queue_factory_,
321 config.generator);
322 res.capturer = capturer.get();
323 capturer->Init();
324 res.source = rtc::make_ref_counted<FrameGeneratorCapturerVideoTrackSource>(
325 std::move(capturer), config.screencast);
326 auto track = pc_factory_->CreateVideoTrack(track_id, res.source.get());
327 res.track = track.get();
328 res.sender =
329 peer_connection_->AddTrack(track, {kCommonStreamId}).MoveValue().get();
330 return res;
331 }
332
AddVideoReceiveSink(std::string track_id,rtc::VideoSinkInterface<VideoFrame> * video_sink)333 void PeerScenarioClient::AddVideoReceiveSink(
334 std::string track_id,
335 rtc::VideoSinkInterface<VideoFrame>* video_sink) {
336 RTC_DCHECK_RUN_ON(signaling_thread_);
337 track_id_to_video_sinks_[track_id].push_back(video_sink);
338 }
339
CreateAndSetSdp(std::function<void (SessionDescriptionInterface *)> munge_offer,std::function<void (std::string)> offer_handler)340 void PeerScenarioClient::CreateAndSetSdp(
341 std::function<void(SessionDescriptionInterface*)> munge_offer,
342 std::function<void(std::string)> offer_handler) {
343 RTC_DCHECK_RUN_ON(signaling_thread_);
344 peer_connection_->CreateOffer(
345 rtc::make_ref_counted<LambdaCreateSessionDescriptionObserver>(
346 [=](std::unique_ptr<SessionDescriptionInterface> offer) {
347 RTC_DCHECK_RUN_ON(signaling_thread_);
348 if (munge_offer) {
349 munge_offer(offer.get());
350 }
351 std::string sdp_offer;
352 RTC_CHECK(offer->ToString(&sdp_offer));
353 peer_connection_->SetLocalDescription(
354 std::move(offer),
355 rtc::make_ref_counted<LambdaSetLocalDescriptionObserver>(
356 [sdp_offer, offer_handler](RTCError) {
357 offer_handler(sdp_offer);
358 }));
359 })
360 .get(),
361 PeerConnectionInterface::RTCOfferAnswerOptions());
362 }
363
SetSdpOfferAndGetAnswer(std::string remote_offer,std::function<void (std::string)> answer_handler)364 void PeerScenarioClient::SetSdpOfferAndGetAnswer(
365 std::string remote_offer,
366 std::function<void(std::string)> answer_handler) {
367 if (!signaling_thread_->IsCurrent()) {
368 signaling_thread_->PostTask(
369 [=] { SetSdpOfferAndGetAnswer(remote_offer, answer_handler); });
370 return;
371 }
372 RTC_DCHECK_RUN_ON(signaling_thread_);
373 peer_connection_->SetRemoteDescription(
374 CreateSessionDescription(SdpType::kOffer, remote_offer),
375 rtc::make_ref_counted<LambdaSetRemoteDescriptionObserver>([=](RTCError) {
376 RTC_DCHECK_RUN_ON(signaling_thread_);
377 peer_connection_->CreateAnswer(
378 rtc::make_ref_counted<LambdaCreateSessionDescriptionObserver>(
379 [=](std::unique_ptr<SessionDescriptionInterface> answer) {
380 RTC_DCHECK_RUN_ON(signaling_thread_);
381 std::string sdp_answer;
382 answer->ToString(&sdp_answer);
383 RTC_LOG(LS_INFO) << sdp_answer;
384 peer_connection_->SetLocalDescription(
385 std::move(answer),
386 rtc::make_ref_counted<LambdaSetLocalDescriptionObserver>(
387 [answer_handler, sdp_answer](RTCError) {
388 answer_handler(sdp_answer);
389 }));
390 })
391 .get(),
392 PeerConnectionInterface::RTCOfferAnswerOptions());
393 }));
394 }
395
SetSdpAnswer(std::string remote_answer,std::function<void (const SessionDescriptionInterface &)> done_handler)396 void PeerScenarioClient::SetSdpAnswer(
397 std::string remote_answer,
398 std::function<void(const SessionDescriptionInterface&)> done_handler) {
399 if (!signaling_thread_->IsCurrent()) {
400 signaling_thread_->PostTask(
401 [=] { SetSdpAnswer(remote_answer, done_handler); });
402 return;
403 }
404 RTC_DCHECK_RUN_ON(signaling_thread_);
405 peer_connection_->SetRemoteDescription(
406 CreateSessionDescription(SdpType::kAnswer, remote_answer),
407 rtc::make_ref_counted<LambdaSetRemoteDescriptionObserver>(
408 [remote_answer, done_handler](RTCError) {
409 auto answer =
410 CreateSessionDescription(SdpType::kAnswer, remote_answer);
411 done_handler(*answer);
412 }));
413 }
414
AddIceCandidate(std::unique_ptr<IceCandidateInterface> candidate)415 void PeerScenarioClient::AddIceCandidate(
416 std::unique_ptr<IceCandidateInterface> candidate) {
417 RTC_DCHECK_RUN_ON(signaling_thread_);
418 if (peer_connection_->signaling_state() ==
419 PeerConnectionInterface::SignalingState::kStable &&
420 peer_connection_->current_remote_description()) {
421 RTC_CHECK(peer_connection_->AddIceCandidate(candidate.get()));
422 } else {
423 pending_ice_candidates_.push_back(std::move(candidate));
424 }
425 }
426
427 } // namespace test
428 } // namespace webrtc
429