• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/frontend/webrtc/libdevice/streamer.h"
18 
19 #include <android-base/logging.h>
20 #include <json/json.h>
21 
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34 
35 #include "host/frontend/webrtc/libcommon/audio_device.h"
36 #include "host/frontend/webrtc/libcommon/peer_connection_utils.h"
37 #include "host/frontend/webrtc/libcommon/port_range_socket_factory.h"
38 #include "host/frontend/webrtc/libcommon/utils.h"
39 #include "host/frontend/webrtc/libcommon/vp8only_encoder_factory.h"
40 #include "host/frontend/webrtc/libdevice/audio_track_source_impl.h"
41 #include "host/frontend/webrtc/libdevice/camera_streamer.h"
42 #include "host/frontend/webrtc/libdevice/client_handler.h"
43 #include "host/frontend/webrtc/libdevice/video_track_source_impl.h"
44 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
45 
46 namespace cuttlefish {
47 namespace webrtc_streaming {
48 namespace {
49 
// Keys of the device info JSON sent to the operator on registration.
constexpr const char* kStreamIdField = "stream_id";
constexpr const char* kXResField = "x_res";
constexpr const char* kYResField = "y_res";
constexpr const char* kDpiField = "dpi";
constexpr const char* kIsTouchField = "is_touch";
constexpr const char* kDisplaysField = "displays";
constexpr const char* kAudioStreamsField = "audio_streams";
constexpr const char* kHardwareField = "hardware";
// Keys describing each custom control panel button.
constexpr const char* kControlPanelButtonCommand = "command";
constexpr const char* kControlPanelButtonTitle = "title";
constexpr const char* kControlPanelButtonIconName = "icon_name";
constexpr const char* kControlPanelButtonShellCommand = "shell_command";
constexpr const char* kControlPanelButtonDeviceStates = "device_states";
constexpr const char* kControlPanelButtonLidSwitchOpen = "lid_switch_open";
constexpr const char* kControlPanelButtonHingeAngleValue = "hinge_angle_value";
constexpr const char* kCustomControlPanelButtonsField =
    "custom_control_panel_buttons";

// Retry budget and first retry delay used for the initial registration.
constexpr int kRegistrationRetries{3};
constexpr int kRetryFirstIntervalMs{1000};
// Retry budget and delay used after an established connection is closed.
constexpr int kReconnectRetries{100};
constexpr int kReconnectIntervalMs{1000};
71 
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)72 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
73   auto str = reinterpret_cast<const char*>(data);
74   Json::CharReaderBuilder builder;
75   std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
76   std::string errorMessage;
77   return json_reader->parse(str, str + length, msg_out, &errorMessage);
78 }
79 
80 struct DisplayDescriptor {
81   int width;
82   int height;
83   int dpi;
84   bool touch_enabled;
85   rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
86 };
87 
88 struct ControlPanelButtonDescriptor {
89   std::string command;
90   std::string title;
91   std::string icon_name;
92   std::optional<std::string> shell_command;
93   std::vector<DeviceState> device_states;
94 };
95 
96 // TODO (jemoreira): move to a place in common with the signaling server
97 struct OperatorServerConfig {
98   std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
99 };
100 
101 // Wraps a scoped_refptr pointer to an audio device module
102 class AudioDeviceModuleWrapper : public AudioSource {
103  public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)104   AudioDeviceModuleWrapper(
105       rtc::scoped_refptr<CfAudioDeviceModule> device_module)
106       : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)107   int GetMoreAudioData(void* data, int bytes_per_sample,
108                        int samples_per_channel, int num_channels,
109                        int sample_rate, bool& muted) override {
110     return device_module_->GetMoreAudioData(data, bytes_per_sample,
111                                             samples_per_channel, num_channels,
112                                             sample_rate, muted);
113   }
114 
device_module()115   rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
116     return device_module_;
117   }
118 
119  private:
120   rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
121 };
122 
123 }  // namespace
124 
125 
126 class Streamer::Impl : public ServerConnectionObserver,
127                        public PeerConnectionBuilder,
128                        public std::enable_shared_from_this<ServerConnectionObserver> {
129  public:
130   std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);
131 
132   void Register(std::weak_ptr<OperatorObserver> observer);
133 
134   void SendMessageToClient(int client_id, const Json::Value& msg);
135   void DestroyClientHandler(int client_id);
136   void SetupCameraForClient(int client_id);
137 
138   // WsObserver
139   void OnOpen() override;
140   void OnClose() override;
141   void OnError(const std::string& error) override;
142   void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;
143 
144   void HandleConfigMessage(const Json::Value& msg);
145   void HandleClientMessage(const Json::Value& server_message);
146 
147   // PeerConnectionBuilder
148   Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>> Build(
149       webrtc::PeerConnectionObserver& observer,
150       const std::vector<webrtc::PeerConnectionInterface::IceServer>&
151           per_connection_servers) override;
152 
153   // All accesses to these variables happen from the signal_thread, so there is
154   // no need for extra synchronization mechanisms (mutex)
155   StreamerConfig config_;
156   OperatorServerConfig operator_config_;
157   std::unique_ptr<ServerConnection> server_connection_;
158   std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
159   rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
160       peer_connection_factory_;
161   std::unique_ptr<rtc::Thread> network_thread_;
162   std::unique_ptr<rtc::Thread> worker_thread_;
163   std::unique_ptr<rtc::Thread> signal_thread_;
164   std::map<std::string, DisplayDescriptor> displays_;
165   std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
166       audio_sources_;
167   std::map<int, std::shared_ptr<ClientHandler>> clients_;
168   std::weak_ptr<OperatorObserver> operator_observer_;
169   std::map<std::string, std::string> hardware_;
170   std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
171   std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
172   std::unique_ptr<CameraStreamer> camera_streamer_;
173   int registration_retries_left_ = kRegistrationRetries;
174   int retry_interval_ms_ = kRetryFirstIntervalMs;
175   LocalRecorder* recorder_ = nullptr;
176 };
177 
Streamer(std::unique_ptr<Streamer::Impl> impl)178 Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
179     : impl_(std::move(impl)) {}
180 
181 /* static */
Create(const StreamerConfig & cfg,LocalRecorder * recorder,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)182 std::unique_ptr<Streamer> Streamer::Create(
183     const StreamerConfig& cfg, LocalRecorder* recorder,
184     std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
185   rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
186 
187   std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
188   impl->config_ = cfg;
189   impl->recorder_ = recorder;
190   impl->connection_observer_factory_ = connection_observer_factory;
191 
192   auto network_thread_result = CreateAndStartThread("network-thread");
193   if (!network_thread_result.ok()) {
194     LOG(ERROR) << network_thread_result.error().Trace();
195     return nullptr;
196   }
197   impl->network_thread_ = std::move(*network_thread_result);
198 
199   auto worker_thread_result = CreateAndStartThread("worker-thread");
200   if (!worker_thread_result.ok()) {
201     LOG(ERROR) << worker_thread_result.error().Trace();
202     return nullptr;
203   }
204   impl->worker_thread_ = std::move(*worker_thread_result);
205 
206   auto signal_thread_result = CreateAndStartThread("signal-thread");
207   if (!signal_thread_result.ok()) {
208     LOG(ERROR) << signal_thread_result.error().Trace();
209     return nullptr;
210   }
211   impl->signal_thread_ = std::move(*signal_thread_result);
212 
213   impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
214       rtc::scoped_refptr<CfAudioDeviceModule>(
215           new rtc::RefCountedObject<CfAudioDeviceModule>()));
216 
217   auto result = CreatePeerConnectionFactory(
218       impl->network_thread_.get(), impl->worker_thread_.get(),
219       impl->signal_thread_.get(), impl->audio_device_module_->device_module());
220 
221   if (!result.ok()) {
222     LOG(ERROR) << result.error().Trace();
223     return nullptr;
224   }
225   impl->peer_connection_factory_ = *result;
226 
227   return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
228 }
229 
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)230 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
231                                                 int width, int height, int dpi,
232                                                 bool touch_enabled) {
233   // Usually called from an application thread
234   return impl_->signal_thread_->BlockingCall(
235       [this, &label, width, height, dpi,
236        touch_enabled]() -> std::shared_ptr<VideoSink> {
237         if (impl_->displays_.count(label)) {
238           LOG(ERROR) << "Display with same label already exists: " << label;
239           return nullptr;
240         }
241         rtc::scoped_refptr<VideoTrackSourceImpl> source(
242             new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
243         impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
244 
245         auto video_track = impl_->peer_connection_factory_->CreateVideoTrack(
246             label, source.get());
247 
248         for (auto& [_, client] : impl_->clients_) {
249           client->AddDisplay(video_track, label);
250         }
251 
252         if (impl_->recorder_) {
253           rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source2 =
254               source;
255           auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
256             source->Release();
257           };
258           std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
259               source2.release(), deleter);
260           impl_->recorder_->AddDisplay(width, height, source_shared);
261         }
262 
263         return std::shared_ptr<VideoSink>(
264             new VideoTrackSourceImplSinkWrapper(source));
265       });
266 }
267 
RemoveDisplay(const std::string & label)268 bool Streamer::RemoveDisplay(const std::string& label) {
269   // Usually called from an application thread
270   return impl_->signal_thread_->BlockingCall(
271       [this, &label]() -> bool {
272         for (auto& [_, client] : impl_->clients_) {
273           client->RemoveDisplay(label);
274         }
275 
276         impl_->displays_.erase(label);
277         return true;
278       });
279 }
280 
AddAudioStream(const std::string & label)281 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
282   // Usually called from an application thread
283   return impl_->signal_thread_->BlockingCall(
284       [this, &label]() -> std::shared_ptr<AudioSink> {
285         if (impl_->audio_sources_.count(label)) {
286           LOG(ERROR) << "Audio stream with same label already exists: "
287                      << label;
288           return nullptr;
289         }
290         rtc::scoped_refptr<AudioTrackSourceImpl> source(
291             new rtc::RefCountedObject<AudioTrackSourceImpl>());
292         impl_->audio_sources_[label] = source;
293         return std::shared_ptr<AudioSink>(
294             new AudioTrackSourceImplSinkWrapper(source));
295       });
296 }
297 
GetAudioSource()298 std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
299   return impl_->audio_device_module_;
300 }
301 
AddCamera(unsigned int port,unsigned int cid)302 CameraController* Streamer::AddCamera(unsigned int port, unsigned int cid) {
303   impl_->camera_streamer_ = std::make_unique<CameraStreamer>(port, cid);
304   return impl_->camera_streamer_.get();
305 }
306 
SetHardwareSpec(std::string key,std::string value)307 void Streamer::SetHardwareSpec(std::string key, std::string value) {
308   impl_->hardware_.emplace(key, value);
309 }
310 
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)311 void Streamer::AddCustomControlPanelButton(const std::string& command,
312                                            const std::string& title,
313                                            const std::string& icon_name) {
314   ControlPanelButtonDescriptor button = {
315       .command = command, .title = title, .icon_name = icon_name};
316   impl_->custom_control_panel_buttons_.push_back(button);
317 }
318 
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)319 void Streamer::AddCustomControlPanelButtonWithShellCommand(
320     const std::string& command, const std::string& title,
321     const std::string& icon_name, const std::string& shell_command) {
322   ControlPanelButtonDescriptor button = {
323       .command = command, .title = title, .icon_name = icon_name};
324   button.shell_command = shell_command;
325   impl_->custom_control_panel_buttons_.push_back(button);
326 }
327 
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)328 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
329     const std::string& command, const std::string& title,
330     const std::string& icon_name,
331     const std::vector<DeviceState>& device_states) {
332   ControlPanelButtonDescriptor button = {
333       .command = command, .title = title, .icon_name = icon_name};
334   button.device_states = device_states;
335   impl_->custom_control_panel_buttons_.push_back(button);
336 }
337 
Register(std::weak_ptr<OperatorObserver> observer)338 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
339   // Usually called from an application thread
340   // No need to block the calling thread on this, the observer will be notified
341   // when the connection is established.
342   impl_->signal_thread_->PostTask([this, observer]() {
343     impl_->Register(observer);
344   });
345 }
346 
Unregister()347 void Streamer::Unregister() {
348   // Usually called from an application thread.
349   impl_->signal_thread_->PostTask(
350       [this]() { impl_->server_connection_.reset(); });
351 }
352 
Register(std::weak_ptr<OperatorObserver> observer)353 void Streamer::Impl::Register(std::weak_ptr<OperatorObserver> observer) {
354   operator_observer_ = observer;
355   // When the connection is established the OnOpen function will be called where
356   // the registration will take place
357   if (!server_connection_) {
358     server_connection_ =
359         ServerConnection::Connect(config_.operator_server, weak_from_this());
360   } else {
361     // in case connection attempt is retried, just call Reconnect().
362     // Recreating server_connection_ object will destroy existing WSConnection
363     // object and task re-scheduling will fail
364     server_connection_->Reconnect();
365   }
366 }
367 
OnOpen()368 void Streamer::Impl::OnOpen() {
369   // Called from the websocket thread.
370   // Connected to operator.
371   signal_thread_->PostTask([this]() {
372     Json::Value register_obj;
373     register_obj[cuttlefish::webrtc_signaling::kTypeField] =
374         cuttlefish::webrtc_signaling::kRegisterType;
375     register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
376         config_.device_id;
377     CHECK(config_.client_files_port >= 0) << "Invalid device port provided";
378     register_obj[cuttlefish::webrtc_signaling::kDevicePortField] =
379         config_.client_files_port;
380 
381     Json::Value device_info;
382     Json::Value displays(Json::ValueType::arrayValue);
383     // No need to synchronize with other accesses to display_ because all
384     // happens on signal_thread.
385     for (auto& entry : displays_) {
386       Json::Value display;
387       display[kStreamIdField] = entry.first;
388       display[kXResField] = entry.second.width;
389       display[kYResField] = entry.second.height;
390       display[kDpiField] = entry.second.dpi;
391       display[kIsTouchField] = true;
392       displays.append(display);
393     }
394     device_info[kDisplaysField] = displays;
395     Json::Value audio_streams(Json::ValueType::arrayValue);
396     for (auto& entry : audio_sources_) {
397       Json::Value audio;
398       audio[kStreamIdField] = entry.first;
399       audio_streams.append(audio);
400     }
401     device_info[kAudioStreamsField] = audio_streams;
402     Json::Value hardware;
403     for (const auto& [k, v] : hardware_) {
404       hardware[k] = v;
405     }
406     device_info[kHardwareField] = hardware;
407     Json::Value custom_control_panel_buttons(Json::arrayValue);
408     for (const auto& button : custom_control_panel_buttons_) {
409       Json::Value button_entry;
410       button_entry[kControlPanelButtonCommand] = button.command;
411       button_entry[kControlPanelButtonTitle] = button.title;
412       button_entry[kControlPanelButtonIconName] = button.icon_name;
413       if (button.shell_command) {
414         button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
415       } else if (!button.device_states.empty()) {
416         Json::Value device_states(Json::arrayValue);
417         for (const DeviceState& device_state : button.device_states) {
418           Json::Value device_state_entry;
419           if (device_state.lid_switch_open) {
420             device_state_entry[kControlPanelButtonLidSwitchOpen] =
421                 *device_state.lid_switch_open;
422           }
423           if (device_state.hinge_angle_value) {
424             device_state_entry[kControlPanelButtonHingeAngleValue] =
425                 *device_state.hinge_angle_value;
426           }
427           device_states.append(device_state_entry);
428         }
429         button_entry[kControlPanelButtonDeviceStates] = device_states;
430       }
431       custom_control_panel_buttons.append(button_entry);
432     }
433     device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
434     register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
435     server_connection_->Send(register_obj);
436     // Do this last as OnRegistered() is user code and may take some time to
437     // complete (although it shouldn't...)
438     auto observer = operator_observer_.lock();
439     if (observer) {
440       observer->OnRegistered();
441     }
442   });
443 }
444 
OnClose()445 void Streamer::Impl::OnClose() {
446   // Called from websocket thread
447   // The operator shouldn't close the connection with the client, it's up to the
448   // device to decide when to disconnect.
449   LOG(WARNING) << "Connection with server closed unexpectedly";
450   signal_thread_->PostTask([this]() {
451     auto observer = operator_observer_.lock();
452     if (observer) {
453       observer->OnClose();
454     }
455   });
456   LOG(INFO) << "Trying to re-connect to operator..";
457   registration_retries_left_ = kReconnectRetries;
458   retry_interval_ms_ = kReconnectIntervalMs;
459   signal_thread_->PostDelayedTask(
460       [this]() { Register(operator_observer_); },
461       webrtc::TimeDelta::Millis(retry_interval_ms_));
462 }
463 
OnError(const std::string & error)464 void Streamer::Impl::OnError(const std::string& error) {
465   // Called from websocket thread.
466   if (registration_retries_left_) {
467     LOG(WARNING) << "Connection to operator failed (" << error << "), "
468                  << registration_retries_left_ << " retries left"
469                  << " (will retry in " << retry_interval_ms_ / 1000 << "s)";
470     --registration_retries_left_;
471     signal_thread_->PostDelayedTask(
472         [this]() {
473           // Need to reconnect and register again with operator
474           Register(operator_observer_);
475         },
476         webrtc::TimeDelta::Millis(retry_interval_ms_));
477     retry_interval_ms_ *= 2;
478   } else {
479     LOG(ERROR) << "Error on connection with the operator: " << error;
480     signal_thread_->PostTask([this]() {
481       auto observer = operator_observer_.lock();
482       if (observer) {
483         observer->OnError();
484       }
485     });
486   }
487 }
488 
HandleConfigMessage(const Json::Value & server_message)489 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
490   CHECK(signal_thread_->IsCurrent())
491       << __FUNCTION__ << " called from the wrong thread";
492   auto result = ParseIceServersMessage(server_message);
493   if (!result.ok()) {
494     LOG(WARNING) << "Failed to parse ice servers message from server: "
495                  << result.error().Trace();
496   }
497   operator_config_.servers = *result;
498 }
499 
HandleClientMessage(const Json::Value & server_message)500 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
501   CHECK(signal_thread_->IsCurrent())
502       << __FUNCTION__ << " called from the wrong thread";
503   if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
504       !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
505     LOG(ERROR) << "Client message received without valid client id";
506     return;
507   }
508   auto client_id =
509       server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
510   if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
511     LOG(WARNING) << "Received empty client message";
512     return;
513   }
514   auto client_message =
515       server_message[cuttlefish::webrtc_signaling::kPayloadField];
516   if (clients_.count(client_id) == 0) {
517     auto client_handler = CreateClientHandler(client_id);
518     if (!client_handler) {
519       LOG(ERROR) << "Failed to create a new client handler";
520       return;
521     }
522     clients_.emplace(client_id, client_handler);
523   }
524   auto client_handler = clients_[client_id];
525 
526   client_handler->HandleMessage(client_message);
527 }
528 
OnReceive(const uint8_t * msg,size_t length,bool is_binary)529 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
530                                bool is_binary) {
531   // Usually called from websocket thread.
532   Json::Value server_message;
533   // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
534   // parse the data into a JSON object while still on the websocket thread.
535   if (is_binary || !ParseMessage(msg, length, &server_message)) {
536     LOG(ERROR) << "Received invalid JSON from server: '"
537                << (is_binary ? std::string("(binary_data)")
538                              : std::string(msg, msg + length))
539                << "'";
540     return;
541   }
542   // Transition to the signal thread before member variables are accessed.
543   signal_thread_->PostTask([this, server_message]() {
544     if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
545         !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
546       LOG(ERROR) << "No message_type field from server";
547       // Notify the caller
548       OnError(
549           "Invalid message received from operator: no message type field "
550           "present");
551       return;
552     }
553     auto type =
554         server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
555     if (type == cuttlefish::webrtc_signaling::kConfigType) {
556       HandleConfigMessage(server_message);
557     } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
558       if (!server_message.isMember(
559               cuttlefish::webrtc_signaling::kClientIdField) ||
560           !server_message.isMember(
561               cuttlefish::webrtc_signaling::kClientIdField)) {
562         LOG(ERROR) << "Invalid disconnect message received from server";
563         // Notify the caller
564         OnError("Invalid disconnect message: client_id is required");
565         return;
566       }
567       auto client_id =
568           server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
569       LOG(INFO) << "Client " << client_id << " has disconnected.";
570       DestroyClientHandler(client_id);
571     } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
572       HandleClientMessage(server_message);
573     } else {
574       LOG(ERROR) << "Unknown message type: " << type;
575       // Notify the caller
576       OnError("Invalid message received from operator: unknown message type");
577       return;
578     }
579   });
580 }
581 
CreateClientHandler(int client_id)582 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
583     int client_id) {
584   CHECK(signal_thread_->IsCurrent())
585       << __FUNCTION__ << " called from the wrong thread";
586   auto observer = connection_observer_factory_->CreateObserver();
587 
588   auto client_handler = ClientHandler::Create(
589       client_id, observer, *this,
590       [this, client_id](const Json::Value& msg) {
591         SendMessageToClient(client_id, msg);
592       },
593       [this, client_id](bool isOpen) {
594         if (isOpen) {
595           SetupCameraForClient(client_id);
596         } else {
597           DestroyClientHandler(client_id);
598         }
599       });
600 
601   for (auto& entry : displays_) {
602     auto& label = entry.first;
603     auto& video_source = entry.second.source;
604 
605     auto video_track =
606         peer_connection_factory_->CreateVideoTrack(label, video_source.get());
607     client_handler->AddDisplay(video_track, label);
608   }
609 
610   for (auto& entry : audio_sources_) {
611     auto& label = entry.first;
612     auto& audio_stream = entry.second;
613     auto audio_track =
614         peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
615     client_handler->AddAudio(audio_track, label);
616   }
617 
618   return client_handler;
619 }
620 
621 Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>>
Build(webrtc::PeerConnectionObserver & observer,const std::vector<webrtc::PeerConnectionInterface::IceServer> & per_connection_servers)622 Streamer::Impl::Build(
623     webrtc::PeerConnectionObserver& observer,
624     const std::vector<webrtc::PeerConnectionInterface::IceServer>&
625         per_connection_servers) {
626   webrtc::PeerConnectionDependencies dependencies(&observer);
627   auto servers = operator_config_.servers;
628   servers.insert(servers.end(), per_connection_servers.begin(),
629                  per_connection_servers.end());
630   if (config_.udp_port_range != config_.tcp_port_range) {
631     // libwebrtc removed the ability to provide a packet socket factory when
632     // creating a peer connection. They plan to provide that functionality with
633     // the peer connection factory, but that's currently incomplete (the packet
634     // socket factory is ignored by the peer connection factory). The only other
635     // choice to customize port ranges is through the port allocator config, but
636     // this is suboptimal as it only allows to specify a single port range that
637     // will be use for both tcp and udp ports.
638     LOG(WARNING) << "TCP and UDP port ranges differ, TCP connections may not "
639                     "work properly";
640   }
641   return CF_EXPECT(
642       CreatePeerConnection(peer_connection_factory_, std::move(dependencies),
643                            config_.udp_port_range.first,
644                            config_.udp_port_range.second, servers),
645       "Failed to build peer connection");
646 }
647 
SendMessageToClient(int client_id,const Json::Value & msg)648 void Streamer::Impl::SendMessageToClient(int client_id,
649                                          const Json::Value& msg) {
650   LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
651   CHECK(signal_thread_->IsCurrent())
652       << __FUNCTION__ << " called from the wrong thread";
653   Json::Value wrapper;
654   wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
655   wrapper[cuttlefish::webrtc_signaling::kTypeField] =
656       cuttlefish::webrtc_signaling::kForwardType;
657   wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
658   // This is safe to call from the webrtc threads because
659   // ServerConnection(s) are thread safe
660   server_connection_->Send(wrapper);
661 }
662 
DestroyClientHandler(int client_id)663 void Streamer::Impl::DestroyClientHandler(int client_id) {
664   // Usually called from signal thread, could be called from websocket thread or
665   // an application thread.
666   signal_thread_->PostTask([this, client_id]() {
667     // This needs to be 'posted' to the thread instead of 'invoked'
668     // immediately for two reasons:
669     // * The client handler is destroyed by this code, it's generally a
670     // bad idea (though not necessarily wrong) to return to a member
671     // function of a destroyed object.
672     // * The client handler may call this from within a peer connection
673     // observer callback, destroying the client handler there leads to a
674     // deadlock.
675     clients_.erase(client_id);
676   });
677 }
678 
SetupCameraForClient(int client_id)679 void Streamer::Impl::SetupCameraForClient(int client_id) {
680   if (!camera_streamer_) {
681     return;
682   }
683   auto client_handler = clients_[client_id];
684   if (client_handler) {
685     auto camera_track = client_handler->GetCameraStream();
686     if (camera_track) {
687       camera_track->AddOrUpdateSink(camera_streamer_.get(),
688                                     rtc::VideoSinkWants());
689     }
690   }
691 }
692 
693 }  // namespace webrtc_streaming
694 }  // namespace cuttlefish
695