1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "host/frontend/webrtc/lib/streamer.h"
18
19 #include <android-base/logging.h>
20 #include <json/json.h>
21
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34
35 #include "host/frontend/webrtc/lib/audio_device.h"
36 #include "host/frontend/webrtc/lib/audio_track_source_impl.h"
37 #include "host/frontend/webrtc/lib/client_handler.h"
38 #include "host/frontend/webrtc/lib/port_range_socket_factory.h"
39 #include "host/frontend/webrtc/lib/video_track_source_impl.h"
40 #include "host/frontend/webrtc/lib/vp8only_encoder_factory.h"
41 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
42
43 namespace cuttlefish {
44 namespace webrtc_streaming {
45 namespace {
46
47 constexpr auto kStreamIdField = "stream_id";
48 constexpr auto kXResField = "x_res";
49 constexpr auto kYResField = "y_res";
50 constexpr auto kDpiField = "dpi";
51 constexpr auto kIsTouchField = "is_touch";
52 constexpr auto kDisplaysField = "displays";
53 constexpr auto kAudioStreamsField = "audio_streams";
54 constexpr auto kHardwareField = "hardware";
55 constexpr auto kControlPanelButtonCommand = "command";
56 constexpr auto kControlPanelButtonTitle = "title";
57 constexpr auto kControlPanelButtonIconName = "icon_name";
58 constexpr auto kControlPanelButtonShellCommand = "shell_command";
59 constexpr auto kControlPanelButtonDeviceStates = "device_states";
60 constexpr auto kControlPanelButtonLidSwitchOpen = "lid_switch_open";
61 constexpr auto kControlPanelButtonHingeAngleValue = "hinge_angle_value";
62 constexpr auto kCustomControlPanelButtonsField = "custom_control_panel_buttons";
63
SendJson(WsConnection * ws_conn,const Json::Value & data)64 void SendJson(WsConnection* ws_conn, const Json::Value& data) {
65 Json::StreamWriterBuilder factory;
66 auto data_str = Json::writeString(factory, data);
67 ws_conn->Send(reinterpret_cast<const uint8_t*>(data_str.c_str()),
68 data_str.size());
69 }
70
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)71 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
72 auto str = reinterpret_cast<const char*>(data);
73 Json::CharReaderBuilder builder;
74 std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
75 std::string errorMessage;
76 return json_reader->parse(str, str + length, msg_out, &errorMessage);
77 }
78
CreateAndStartThread(const std::string & name)79 std::unique_ptr<rtc::Thread> CreateAndStartThread(const std::string& name) {
80 auto thread = rtc::Thread::CreateWithSocketServer();
81 if (!thread) {
82 LOG(ERROR) << "Failed to create " << name << " thread";
83 return nullptr;
84 }
85 thread->SetName(name, nullptr);
86 if (!thread->Start()) {
87 LOG(ERROR) << "Failed to start " << name << " thread";
88 return nullptr;
89 }
90 return thread;
91 }
92
93 struct DisplayDescriptor {
94 int width;
95 int height;
96 int dpi;
97 bool touch_enabled;
98 rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
99 };
100
101 struct ControlPanelButtonDescriptor {
102 std::string command;
103 std::string title;
104 std::string icon_name;
105 std::optional<std::string> shell_command;
106 std::vector<DeviceState> device_states;
107 };
108
109 // TODO (jemoreira): move to a place in common with the signaling server
110 struct OperatorServerConfig {
111 std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
112 };
113
114 // Wraps a scoped_refptr pointer to an audio device module
115 class AudioDeviceModuleWrapper : public AudioSource {
116 public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)117 AudioDeviceModuleWrapper(
118 rtc::scoped_refptr<CfAudioDeviceModule> device_module)
119 : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)120 int GetMoreAudioData(void* data, int bytes_per_sample,
121 int samples_per_channel, int num_channels,
122 int sample_rate, bool& muted) override {
123 return device_module_->GetMoreAudioData(data, bytes_per_sample,
124 samples_per_channel, num_channels,
125 sample_rate, muted);
126 }
127
device_module()128 rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
129 return device_module_;
130 }
131
132 private:
133 rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
134 };
135
136 } // namespace
137
138 class Streamer::Impl : public WsConnectionObserver {
139 public:
140 std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);
141
142 void SendMessageToClient(int client_id, const Json::Value& msg);
143 void DestroyClientHandler(int client_id);
144
145 // WsObserver
146 void OnOpen() override;
147 void OnClose() override;
148 void OnError(const std::string& error) override;
149 void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;
150
151 void HandleConfigMessage(const Json::Value& msg);
152 void HandleClientMessage(const Json::Value& server_message);
153
154 // All accesses to these variables happen from the signal_thread, so there is
155 // no need for extra synchronization mechanisms (mutex)
156 StreamerConfig config_;
157 OperatorServerConfig operator_config_;
158 std::shared_ptr<WsConnection> server_connection_;
159 std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
160 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
161 peer_connection_factory_;
162 std::unique_ptr<rtc::Thread> network_thread_;
163 std::unique_ptr<rtc::Thread> worker_thread_;
164 std::unique_ptr<rtc::Thread> signal_thread_;
165 std::map<std::string, DisplayDescriptor> displays_;
166 std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
167 audio_sources_;
168 std::map<int, std::shared_ptr<ClientHandler>> clients_;
169 std::weak_ptr<OperatorObserver> operator_observer_;
170 std::map<std::string, std::string> hardware_;
171 std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
172 std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
173 };
174
Streamer(std::unique_ptr<Streamer::Impl> impl)175 Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
176 : impl_(std::move(impl)) {}
177
178 /* static */
Create(const StreamerConfig & cfg,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)179 std::unique_ptr<Streamer> Streamer::Create(
180 const StreamerConfig& cfg,
181 std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
182
183 rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
184
185 std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
186 impl->config_ = cfg;
187 impl->connection_observer_factory_ = connection_observer_factory;
188
189 impl->network_thread_ = CreateAndStartThread("network-thread");
190 impl->worker_thread_ = CreateAndStartThread("work-thread");
191 impl->signal_thread_ = CreateAndStartThread("signal-thread");
192 if (!impl->network_thread_ || !impl->worker_thread_ ||
193 !impl->signal_thread_) {
194 return nullptr;
195 }
196
197 impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
198 rtc::scoped_refptr<CfAudioDeviceModule>(
199 new rtc::RefCountedObject<CfAudioDeviceModule>()));
200
201 impl->peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
202 impl->network_thread_.get(), impl->worker_thread_.get(),
203 impl->signal_thread_.get(), impl->audio_device_module_->device_module(),
204 webrtc::CreateBuiltinAudioEncoderFactory(),
205 webrtc::CreateBuiltinAudioDecoderFactory(),
206 std::make_unique<VP8OnlyEncoderFactory>(
207 webrtc::CreateBuiltinVideoEncoderFactory()),
208 webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
209 nullptr /* audio_processing */);
210
211 if (!impl->peer_connection_factory_) {
212 LOG(ERROR) << "Failed to create peer connection factory";
213 return nullptr;
214 }
215
216 webrtc::PeerConnectionFactoryInterface::Options options;
217 // By default the loopback network is ignored, but generating candidates for
218 // it is useful when using TCP port forwarding.
219 options.network_ignore_mask = 0;
220 impl->peer_connection_factory_->SetOptions(options);
221
222 return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
223 }
224
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)225 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
226 int width, int height, int dpi,
227 bool touch_enabled) {
228 // Usually called from an application thread
229 return impl_->signal_thread_->Invoke<std::shared_ptr<VideoSink>>(
230 RTC_FROM_HERE,
231 [this, &label, width, height, dpi,
232 touch_enabled]() -> std::shared_ptr<VideoSink> {
233 if (impl_->displays_.count(label)) {
234 LOG(ERROR) << "Display with same label already exists: " << label;
235 return nullptr;
236 }
237 rtc::scoped_refptr<VideoTrackSourceImpl> source(
238 new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
239 impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
240 return std::shared_ptr<VideoSink>(
241 new VideoTrackSourceImplSinkWrapper(source));
242 });
243 }
244
AddAudioStream(const std::string & label)245 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
246 // Usually called from an application thread
247 return impl_->signal_thread_->Invoke<std::shared_ptr<AudioSink>>(
248 RTC_FROM_HERE, [this, &label]() -> std::shared_ptr<AudioSink> {
249 if (impl_->audio_sources_.count(label)) {
250 LOG(ERROR) << "Audio stream with same label already exists: "
251 << label;
252 return nullptr;
253 }
254 rtc::scoped_refptr<AudioTrackSourceImpl> source(
255 new rtc::RefCountedObject<AudioTrackSourceImpl>());
256 impl_->audio_sources_[label] = source;
257 return std::shared_ptr<AudioSink>(
258 new AudioTrackSourceImplSinkWrapper(source));
259 });
260 }
261
GetAudioSource()262 std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
263 return impl_->audio_device_module_;
264 }
265
SetHardwareSpec(std::string key,std::string value)266 void Streamer::SetHardwareSpec(std::string key, std::string value) {
267 impl_->hardware_.emplace(key, value);
268 }
269
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)270 void Streamer::AddCustomControlPanelButton(const std::string& command,
271 const std::string& title,
272 const std::string& icon_name) {
273 ControlPanelButtonDescriptor button = {
274 .command = command, .title = title, .icon_name = icon_name};
275 impl_->custom_control_panel_buttons_.push_back(button);
276 }
277
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)278 void Streamer::AddCustomControlPanelButtonWithShellCommand(
279 const std::string& command, const std::string& title,
280 const std::string& icon_name, const std::string& shell_command) {
281 ControlPanelButtonDescriptor button = {
282 .command = command, .title = title, .icon_name = icon_name};
283 button.shell_command = shell_command;
284 impl_->custom_control_panel_buttons_.push_back(button);
285 }
286
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)287 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
288 const std::string& command, const std::string& title,
289 const std::string& icon_name,
290 const std::vector<DeviceState>& device_states) {
291 ControlPanelButtonDescriptor button = {
292 .command = command, .title = title, .icon_name = icon_name};
293 button.device_states = device_states;
294 impl_->custom_control_panel_buttons_.push_back(button);
295 }
296
Register(std::weak_ptr<OperatorObserver> observer)297 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
298 // Usually called from an application thread
299 // No need to block the calling thread on this, the observer will be notified
300 // when the connection is established.
301 impl_->signal_thread_->PostTask(RTC_FROM_HERE, [this, observer]() {
302 impl_->operator_observer_ = observer;
303 // This can be a local variable since the connection object will keep a
304 // reference to it.
305 auto ws_context = WsConnectionContext::Create();
306 CHECK(ws_context) << "Failed to create websocket context";
307 impl_->server_connection_ = ws_context->CreateConnection(
308 impl_->config_.operator_server.port,
309 impl_->config_.operator_server.addr,
310 impl_->config_.operator_server.path,
311 impl_->config_.operator_server.security, impl_,
312 impl_->config_.operator_server.http_headers);
313
314 CHECK(impl_->server_connection_)
315 << "Unable to create websocket connection object";
316
317 impl_->server_connection_->Connect();
318 });
319 }
320
Unregister()321 void Streamer::Unregister() {
322 // Usually called from an application thread.
323 impl_->signal_thread_->PostTask(
324 RTC_FROM_HERE, [this]() { impl_->server_connection_.reset(); });
325 }
326
RecordDisplays(LocalRecorder & recorder)327 void Streamer::RecordDisplays(LocalRecorder& recorder) {
328 for (auto& [key, display] : impl_->displays_) {
329 rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source =
330 display.source;
331 auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
332 source->Release();
333 };
334 std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
335 source.release(), deleter);
336 recorder.AddDisplay(display.width, display.height, source_shared);
337 }
338 }
339
OnOpen()340 void Streamer::Impl::OnOpen() {
341 // Called from the websocket thread.
342 // Connected to operator.
343 signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
344 Json::Value register_obj;
345 register_obj[cuttlefish::webrtc_signaling::kTypeField] =
346 cuttlefish::webrtc_signaling::kRegisterType;
347 register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
348 config_.device_id;
349
350 Json::Value device_info;
351 Json::Value displays(Json::ValueType::arrayValue);
352 // No need to synchronize with other accesses to display_ because all
353 // happens on signal_thread.
354 for (auto& entry : displays_) {
355 Json::Value display;
356 display[kStreamIdField] = entry.first;
357 display[kXResField] = entry.second.width;
358 display[kYResField] = entry.second.height;
359 display[kDpiField] = entry.second.dpi;
360 display[kIsTouchField] = true;
361 displays.append(display);
362 }
363 device_info[kDisplaysField] = displays;
364 Json::Value audio_streams(Json::ValueType::arrayValue);
365 for (auto& entry : audio_sources_) {
366 Json::Value audio;
367 audio[kStreamIdField] = entry.first;
368 audio_streams.append(audio);
369 }
370 device_info[kAudioStreamsField] = audio_streams;
371 Json::Value hardware;
372 for (const auto& [k, v] : hardware_) {
373 hardware[k] = v;
374 }
375 device_info[kHardwareField] = hardware;
376 Json::Value custom_control_panel_buttons(Json::arrayValue);
377 for (const auto& button : custom_control_panel_buttons_) {
378 Json::Value button_entry;
379 button_entry[kControlPanelButtonCommand] = button.command;
380 button_entry[kControlPanelButtonTitle] = button.title;
381 button_entry[kControlPanelButtonIconName] = button.icon_name;
382 if (button.shell_command) {
383 button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
384 } else if (!button.device_states.empty()) {
385 Json::Value device_states(Json::arrayValue);
386 for (const DeviceState& device_state : button.device_states) {
387 Json::Value device_state_entry;
388 if (device_state.lid_switch_open) {
389 device_state_entry[kControlPanelButtonLidSwitchOpen] =
390 *device_state.lid_switch_open;
391 }
392 if (device_state.hinge_angle_value) {
393 device_state_entry[kControlPanelButtonHingeAngleValue] =
394 *device_state.hinge_angle_value;
395 }
396 device_states.append(device_state_entry);
397 }
398 button_entry[kControlPanelButtonDeviceStates] = device_states;
399 }
400 custom_control_panel_buttons.append(button_entry);
401 }
402 device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
403 register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
404 SendJson(server_connection_.get(), register_obj);
405 // Do this last as OnRegistered() is user code and may take some time to
406 // complete (although it shouldn't...)
407 auto observer = operator_observer_.lock();
408 if (observer) {
409 observer->OnRegistered();
410 }
411 });
412 }
413
OnClose()414 void Streamer::Impl::OnClose() {
415 // Called from websocket thread
416 // The operator shouldn't close the connection with the client, it's up to the
417 // device to decide when to disconnect.
418 LOG(WARNING) << "Websocket closed unexpectedly";
419 signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
420 auto observer = operator_observer_.lock();
421 if (observer) {
422 observer->OnClose();
423 }
424 });
425 }
426
OnError(const std::string & error)427 void Streamer::Impl::OnError(const std::string& error) {
428 // Called from websocket thread.
429 LOG(ERROR) << "Error on connection with the operator: " << error;
430 signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
431 auto observer = operator_observer_.lock();
432 if (observer) {
433 observer->OnError();
434 }
435 });
436 }
437
HandleConfigMessage(const Json::Value & server_message)438 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
439 CHECK(signal_thread_->IsCurrent())
440 << __FUNCTION__ << " called from the wrong thread";
441 if (server_message.isMember("ice_servers") &&
442 server_message["ice_servers"].isArray()) {
443 auto servers = server_message["ice_servers"];
444 operator_config_.servers.clear();
445 for (int server_idx = 0; server_idx < servers.size(); server_idx++) {
446 auto server = servers[server_idx];
447 webrtc::PeerConnectionInterface::IceServer ice_server;
448 if (!server.isMember("urls") || !server["urls"].isArray()) {
449 // The urls field is required
450 LOG(WARNING)
451 << "Invalid ICE server specification obtained from server: "
452 << server.toStyledString();
453 continue;
454 }
455 auto urls = server["urls"];
456 for (int url_idx = 0; url_idx < urls.size(); url_idx++) {
457 auto url = urls[url_idx];
458 if (!url.isString()) {
459 LOG(WARNING) << "Non string 'urls' field in ice server: "
460 << url.toStyledString();
461 continue;
462 }
463 ice_server.urls.push_back(url.asString());
464 if (server.isMember("credential") && server["credential"].isString()) {
465 ice_server.password = server["credential"].asString();
466 }
467 if (server.isMember("username") && server["username"].isString()) {
468 ice_server.username = server["username"].asString();
469 }
470 operator_config_.servers.push_back(ice_server);
471 }
472 }
473 }
474 }
475
HandleClientMessage(const Json::Value & server_message)476 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
477 CHECK(signal_thread_->IsCurrent())
478 << __FUNCTION__ << " called from the wrong thread";
479 if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
480 !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
481 LOG(ERROR) << "Client message received without valid client id";
482 return;
483 }
484 auto client_id =
485 server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
486 if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
487 LOG(WARNING) << "Received empty client message";
488 return;
489 }
490 auto client_message =
491 server_message[cuttlefish::webrtc_signaling::kPayloadField];
492 if (clients_.count(client_id) == 0) {
493 auto client_handler = CreateClientHandler(client_id);
494 if (!client_handler) {
495 LOG(ERROR) << "Failed to create a new client handler";
496 return;
497 }
498 clients_.emplace(client_id, client_handler);
499 }
500 auto client_handler = clients_[client_id];
501
502 client_handler->HandleMessage(client_message);
503 }
504
OnReceive(const uint8_t * msg,size_t length,bool is_binary)505 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
506 bool is_binary) {
507 // Usually called from websocket thread.
508 Json::Value server_message;
509 // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
510 // parse the data into a JSON object while still on the websocket thread.
511 if (is_binary || !ParseMessage(msg, length, &server_message)) {
512 LOG(ERROR) << "Received invalid JSON from server: '"
513 << (is_binary ? std::string("(binary_data)")
514 : std::string(msg, msg + length))
515 << "'";
516 return;
517 }
518 // Transition to the signal thread before member variables are accessed.
519 signal_thread_->PostTask(RTC_FROM_HERE, [this, server_message]() {
520 if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
521 !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
522 LOG(ERROR) << "No message_type field from server";
523 // Notify the caller
524 OnError(
525 "Invalid message received from operator: no message type field "
526 "present");
527 return;
528 }
529 auto type =
530 server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
531 if (type == cuttlefish::webrtc_signaling::kConfigType) {
532 HandleConfigMessage(server_message);
533 } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
534 if (!server_message.isMember(
535 cuttlefish::webrtc_signaling::kClientIdField) ||
536 !server_message.isMember(
537 cuttlefish::webrtc_signaling::kClientIdField)) {
538 LOG(ERROR) << "Invalid disconnect message received from server";
539 // Notify the caller
540 OnError("Invalid disconnect message: client_id is required");
541 return;
542 }
543 auto client_id =
544 server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
545 LOG(INFO) << "Client " << client_id << " has disconnected.";
546 DestroyClientHandler(client_id);
547 } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
548 HandleClientMessage(server_message);
549 } else {
550 LOG(ERROR) << "Unknown message type: " << type;
551 // Notify the caller
552 OnError("Invalid message received from operator: unknown message type");
553 return;
554 }
555 });
556 }
557
CreateClientHandler(int client_id)558 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
559 int client_id) {
560 CHECK(signal_thread_->IsCurrent())
561 << __FUNCTION__ << " called from the wrong thread";
562 auto observer = connection_observer_factory_->CreateObserver();
563
564 auto client_handler = ClientHandler::Create(
565 client_id, observer,
566 [this, client_id](const Json::Value& msg) {
567 SendMessageToClient(client_id, msg);
568 },
569 [this, client_id] { DestroyClientHandler(client_id); });
570
571 webrtc::PeerConnectionInterface::RTCConfiguration config;
572 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
573 config.enable_dtls_srtp = true;
574 config.servers.insert(config.servers.end(), operator_config_.servers.begin(),
575 operator_config_.servers.end());
576 webrtc::PeerConnectionDependencies dependencies(client_handler.get());
577 // PortRangeSocketFactory's super class' constructor needs to be called on the
578 // network thread or have it as a parameter
579 dependencies.packet_socket_factory.reset(new PortRangeSocketFactory(
580 network_thread_.get(), config_.udp_port_range, config_.tcp_port_range));
581 auto peer_connection = peer_connection_factory_->CreatePeerConnection(
582 config, std::move(dependencies));
583
584 if (!peer_connection) {
585 LOG(ERROR) << "Failed to create peer connection";
586 return nullptr;
587 }
588
589 if (!client_handler->SetPeerConnection(std::move(peer_connection))) {
590 return nullptr;
591 }
592
593 for (auto& entry : displays_) {
594 auto& label = entry.first;
595 auto& video_source = entry.second.source;
596
597 auto video_track =
598 peer_connection_factory_->CreateVideoTrack(label, video_source.get());
599 client_handler->AddDisplay(video_track, label);
600 }
601
602 for (auto& entry : audio_sources_) {
603 auto& label = entry.first;
604 auto& audio_stream = entry.second;
605 auto audio_track =
606 peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
607 client_handler->AddAudio(audio_track, label);
608 }
609
610 return client_handler;
611 }
612
SendMessageToClient(int client_id,const Json::Value & msg)613 void Streamer::Impl::SendMessageToClient(int client_id,
614 const Json::Value& msg) {
615 LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
616 CHECK(signal_thread_->IsCurrent())
617 << __FUNCTION__ << " called from the wrong thread";
618 Json::Value wrapper;
619 wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
620 wrapper[cuttlefish::webrtc_signaling::kTypeField] =
621 cuttlefish::webrtc_signaling::kForwardType;
622 wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
623 // This is safe to call from the webrtc threads because
624 // WsConnection is thread safe
625 SendJson(server_connection_.get(), wrapper);
626 }
627
DestroyClientHandler(int client_id)628 void Streamer::Impl::DestroyClientHandler(int client_id) {
629 // Usually called from signal thread, could be called from websocket thread or
630 // an application thread.
631 signal_thread_->PostTask(RTC_FROM_HERE, [this, client_id]() {
632 // This needs to be 'posted' to the thread instead of 'invoked'
633 // immediately for two reasons:
634 // * The client handler is destroyed by this code, it's generally a
635 // bad idea (though not necessarily wrong) to return to a member
636 // function of a destroyed object.
637 // * The client handler may call this from within a peer connection
638 // observer callback, destroying the client handler there leads to a
639 // deadlock.
640 clients_.erase(client_id);
641 });
642 }
643
644 } // namespace webrtc_streaming
645 } // namespace cuttlefish
646