• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/frontend/webrtc/lib/streamer.h"
18 
19 #include <android-base/logging.h>
20 #include <json/json.h>
21 
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34 
35 #include "host/frontend/webrtc/lib/audio_device.h"
36 #include "host/frontend/webrtc/lib/audio_track_source_impl.h"
37 #include "host/frontend/webrtc/lib/camera_streamer.h"
38 #include "host/frontend/webrtc/lib/client_handler.h"
39 #include "host/frontend/webrtc/lib/port_range_socket_factory.h"
40 #include "host/frontend/webrtc/lib/video_track_source_impl.h"
41 #include "host/frontend/webrtc/lib/vp8only_encoder_factory.h"
42 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
43 
44 namespace cuttlefish {
45 namespace webrtc_streaming {
46 namespace {
47 
// Keys of the device-info JSON object sent to the operator on registration.
constexpr const char* kStreamIdField = "stream_id";
constexpr const char* kXResField = "x_res";
constexpr const char* kYResField = "y_res";
constexpr const char* kDpiField = "dpi";
constexpr const char* kIsTouchField = "is_touch";
constexpr const char* kDisplaysField = "displays";
constexpr const char* kAudioStreamsField = "audio_streams";
constexpr const char* kHardwareField = "hardware";

// Keys describing each custom control panel button in the device info.
constexpr const char* kControlPanelButtonCommand = "command";
constexpr const char* kControlPanelButtonTitle = "title";
constexpr const char* kControlPanelButtonIconName = "icon_name";
constexpr const char* kControlPanelButtonShellCommand = "shell_command";
constexpr const char* kControlPanelButtonDeviceStates = "device_states";
constexpr const char* kControlPanelButtonLidSwitchOpen = "lid_switch_open";
constexpr const char* kControlPanelButtonHingeAngleValue = "hinge_angle_value";
constexpr const char* kCustomControlPanelButtonsField =
    "custom_control_panel_buttons";

// Retry budget and initial delay used for the first registration attempts.
constexpr int kRegistrationRetries = 3;
constexpr int kRetryFirstIntervalMs = 1000;
// Retry budget and delay installed after an established connection closes.
constexpr int kReconnectRetries = 100;
constexpr int kReconnectIntervalMs = 1000;
69 
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)70 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
71   auto str = reinterpret_cast<const char*>(data);
72   Json::CharReaderBuilder builder;
73   std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
74   std::string errorMessage;
75   return json_reader->parse(str, str + length, msg_out, &errorMessage);
76 }
77 
CreateAndStartThread(const std::string & name)78 std::unique_ptr<rtc::Thread> CreateAndStartThread(const std::string& name) {
79   auto thread = rtc::Thread::CreateWithSocketServer();
80   if (!thread) {
81     LOG(ERROR) << "Failed to create " << name << " thread";
82     return nullptr;
83   }
84   thread->SetName(name, nullptr);
85   if (!thread->Start()) {
86     LOG(ERROR) << "Failed to start " << name << " thread";
87     return nullptr;
88   }
89   return thread;
90 }
91 
92 struct DisplayDescriptor {
93   int width;
94   int height;
95   int dpi;
96   bool touch_enabled;
97   rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
98 };
99 
100 struct ControlPanelButtonDescriptor {
101   std::string command;
102   std::string title;
103   std::string icon_name;
104   std::optional<std::string> shell_command;
105   std::vector<DeviceState> device_states;
106 };
107 
108 // TODO (jemoreira): move to a place in common with the signaling server
109 struct OperatorServerConfig {
110   std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
111 };
112 
113 // Wraps a scoped_refptr pointer to an audio device module
114 class AudioDeviceModuleWrapper : public AudioSource {
115  public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)116   AudioDeviceModuleWrapper(
117       rtc::scoped_refptr<CfAudioDeviceModule> device_module)
118       : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)119   int GetMoreAudioData(void* data, int bytes_per_sample,
120                        int samples_per_channel, int num_channels,
121                        int sample_rate, bool& muted) override {
122     return device_module_->GetMoreAudioData(data, bytes_per_sample,
123                                             samples_per_channel, num_channels,
124                                             sample_rate, muted);
125   }
126 
device_module()127   rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
128     return device_module_;
129   }
130 
131  private:
132   rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
133 };
134 
135 }  // namespace
136 
137 
138 class Streamer::Impl : public ServerConnectionObserver,
139                        public std::enable_shared_from_this<ServerConnectionObserver> {
140  public:
141   std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);
142 
143   void Register(std::weak_ptr<OperatorObserver> observer);
144 
145   void SendMessageToClient(int client_id, const Json::Value& msg);
146   void DestroyClientHandler(int client_id);
147   void SetupCameraForClient(int client_id);
148 
149   // WsObserver
150   void OnOpen() override;
151   void OnClose() override;
152   void OnError(const std::string& error) override;
153   void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;
154 
155   void HandleConfigMessage(const Json::Value& msg);
156   void HandleClientMessage(const Json::Value& server_message);
157 
158   // All accesses to these variables happen from the signal_thread, so there is
159   // no need for extra synchronization mechanisms (mutex)
160   StreamerConfig config_;
161   OperatorServerConfig operator_config_;
162   std::unique_ptr<ServerConnection> server_connection_;
163   std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
164   rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
165       peer_connection_factory_;
166   std::unique_ptr<rtc::Thread> network_thread_;
167   std::unique_ptr<rtc::Thread> worker_thread_;
168   std::unique_ptr<rtc::Thread> signal_thread_;
169   std::map<std::string, DisplayDescriptor> displays_;
170   std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
171       audio_sources_;
172   std::map<int, std::shared_ptr<ClientHandler>> clients_;
173   std::weak_ptr<OperatorObserver> operator_observer_;
174   std::map<std::string, std::string> hardware_;
175   std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
176   std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
177   std::unique_ptr<CameraStreamer> camera_streamer_;
178   int registration_retries_left_ = kRegistrationRetries;
179   int retry_interval_ms_ = kRetryFirstIntervalMs;
180 };
181 
Streamer(std::unique_ptr<Streamer::Impl> impl)182 Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
183     : impl_(std::move(impl)) {}
184 
185 /* static */
Create(const StreamerConfig & cfg,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)186 std::unique_ptr<Streamer> Streamer::Create(
187     const StreamerConfig& cfg,
188     std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
189 
190   rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
191 
192   std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
193   impl->config_ = cfg;
194   impl->connection_observer_factory_ = connection_observer_factory;
195 
196   impl->network_thread_ = CreateAndStartThread("network-thread");
197   impl->worker_thread_ = CreateAndStartThread("work-thread");
198   impl->signal_thread_ = CreateAndStartThread("signal-thread");
199   if (!impl->network_thread_ || !impl->worker_thread_ ||
200       !impl->signal_thread_) {
201     return nullptr;
202   }
203 
204   impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
205       rtc::scoped_refptr<CfAudioDeviceModule>(
206           new rtc::RefCountedObject<CfAudioDeviceModule>()));
207 
208   impl->peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
209       impl->network_thread_.get(), impl->worker_thread_.get(),
210       impl->signal_thread_.get(), impl->audio_device_module_->device_module(),
211       webrtc::CreateBuiltinAudioEncoderFactory(),
212       webrtc::CreateBuiltinAudioDecoderFactory(),
213       std::make_unique<VP8OnlyEncoderFactory>(
214           webrtc::CreateBuiltinVideoEncoderFactory()),
215       webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
216       nullptr /* audio_processing */);
217 
218   if (!impl->peer_connection_factory_) {
219     LOG(ERROR) << "Failed to create peer connection factory";
220     return nullptr;
221   }
222 
223   webrtc::PeerConnectionFactoryInterface::Options options;
224   // By default the loopback network is ignored, but generating candidates for
225   // it is useful when using TCP port forwarding.
226   options.network_ignore_mask = 0;
227   impl->peer_connection_factory_->SetOptions(options);
228 
229   return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
230 }
231 
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)232 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
233                                                 int width, int height, int dpi,
234                                                 bool touch_enabled) {
235   // Usually called from an application thread
236   return impl_->signal_thread_->Invoke<std::shared_ptr<VideoSink>>(
237       RTC_FROM_HERE,
238       [this, &label, width, height, dpi,
239        touch_enabled]() -> std::shared_ptr<VideoSink> {
240         if (impl_->displays_.count(label)) {
241           LOG(ERROR) << "Display with same label already exists: " << label;
242           return nullptr;
243         }
244         rtc::scoped_refptr<VideoTrackSourceImpl> source(
245             new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
246         impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
247         return std::shared_ptr<VideoSink>(
248             new VideoTrackSourceImplSinkWrapper(source));
249       });
250 }
251 
AddAudioStream(const std::string & label)252 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
253   // Usually called from an application thread
254   return impl_->signal_thread_->Invoke<std::shared_ptr<AudioSink>>(
255       RTC_FROM_HERE, [this, &label]() -> std::shared_ptr<AudioSink> {
256         if (impl_->audio_sources_.count(label)) {
257           LOG(ERROR) << "Audio stream with same label already exists: "
258                      << label;
259           return nullptr;
260         }
261         rtc::scoped_refptr<AudioTrackSourceImpl> source(
262             new rtc::RefCountedObject<AudioTrackSourceImpl>());
263         impl_->audio_sources_[label] = source;
264         return std::shared_ptr<AudioSink>(
265             new AudioTrackSourceImplSinkWrapper(source));
266       });
267 }
268 
GetAudioSource()269 std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
270   return impl_->audio_device_module_;
271 }
272 
AddCamera(unsigned int port,unsigned int cid)273 CameraController* Streamer::AddCamera(unsigned int port, unsigned int cid) {
274   impl_->camera_streamer_ = std::make_unique<CameraStreamer>(port, cid);
275   return impl_->camera_streamer_.get();
276 }
277 
SetHardwareSpec(std::string key,std::string value)278 void Streamer::SetHardwareSpec(std::string key, std::string value) {
279   impl_->hardware_.emplace(key, value);
280 }
281 
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)282 void Streamer::AddCustomControlPanelButton(const std::string& command,
283                                            const std::string& title,
284                                            const std::string& icon_name) {
285   ControlPanelButtonDescriptor button = {
286       .command = command, .title = title, .icon_name = icon_name};
287   impl_->custom_control_panel_buttons_.push_back(button);
288 }
289 
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)290 void Streamer::AddCustomControlPanelButtonWithShellCommand(
291     const std::string& command, const std::string& title,
292     const std::string& icon_name, const std::string& shell_command) {
293   ControlPanelButtonDescriptor button = {
294       .command = command, .title = title, .icon_name = icon_name};
295   button.shell_command = shell_command;
296   impl_->custom_control_panel_buttons_.push_back(button);
297 }
298 
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)299 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
300     const std::string& command, const std::string& title,
301     const std::string& icon_name,
302     const std::vector<DeviceState>& device_states) {
303   ControlPanelButtonDescriptor button = {
304       .command = command, .title = title, .icon_name = icon_name};
305   button.device_states = device_states;
306   impl_->custom_control_panel_buttons_.push_back(button);
307 }
308 
Register(std::weak_ptr<OperatorObserver> observer)309 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
310   // Usually called from an application thread
311   // No need to block the calling thread on this, the observer will be notified
312   // when the connection is established.
313   impl_->signal_thread_->PostTask(RTC_FROM_HERE, [this, observer]() {
314     impl_->Register(observer);
315   });
316 }
317 
Unregister()318 void Streamer::Unregister() {
319   // Usually called from an application thread.
320   impl_->signal_thread_->PostTask(
321       RTC_FROM_HERE, [this]() { impl_->server_connection_.reset(); });
322 }
323 
RecordDisplays(LocalRecorder & recorder)324 void Streamer::RecordDisplays(LocalRecorder& recorder) {
325   for (auto& [key, display] : impl_->displays_) {
326     rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source =
327         display.source;
328     auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
329       source->Release();
330     };
331     std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
332         source.release(), deleter);
333     recorder.AddDisplay(display.width, display.height, source_shared);
334   }
335 }
336 
Register(std::weak_ptr<OperatorObserver> observer)337 void Streamer::Impl::Register(std::weak_ptr<OperatorObserver> observer) {
338   operator_observer_ = observer;
339   // When the connection is established the OnOpen function will be called where
340   // the registration will take place
341   if (!server_connection_) {
342     server_connection_ =
343         ServerConnection::Connect(config_.operator_server, weak_from_this());
344   } else {
345     // in case connection attempt is retried, just call Reconnect().
346     // Recreating server_connection_ object will destroy existing WSConnection
347     // object and task re-scheduling will fail
348     server_connection_->Reconnect();
349   }
350 }
351 
OnOpen()352 void Streamer::Impl::OnOpen() {
353   // Called from the websocket thread.
354   // Connected to operator.
355   signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
356     Json::Value register_obj;
357     register_obj[cuttlefish::webrtc_signaling::kTypeField] =
358         cuttlefish::webrtc_signaling::kRegisterType;
359     register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
360         config_.device_id;
361     CHECK(config_.client_files_port >= 0) << "Invalide device port provided";
362     register_obj[cuttlefish::webrtc_signaling::kDevicePortField] =
363         config_.client_files_port;
364 
365     Json::Value device_info;
366     Json::Value displays(Json::ValueType::arrayValue);
367     // No need to synchronize with other accesses to display_ because all
368     // happens on signal_thread.
369     for (auto& entry : displays_) {
370       Json::Value display;
371       display[kStreamIdField] = entry.first;
372       display[kXResField] = entry.second.width;
373       display[kYResField] = entry.second.height;
374       display[kDpiField] = entry.second.dpi;
375       display[kIsTouchField] = true;
376       displays.append(display);
377     }
378     device_info[kDisplaysField] = displays;
379     Json::Value audio_streams(Json::ValueType::arrayValue);
380     for (auto& entry : audio_sources_) {
381       Json::Value audio;
382       audio[kStreamIdField] = entry.first;
383       audio_streams.append(audio);
384     }
385     device_info[kAudioStreamsField] = audio_streams;
386     Json::Value hardware;
387     for (const auto& [k, v] : hardware_) {
388       hardware[k] = v;
389     }
390     device_info[kHardwareField] = hardware;
391     Json::Value custom_control_panel_buttons(Json::arrayValue);
392     for (const auto& button : custom_control_panel_buttons_) {
393       Json::Value button_entry;
394       button_entry[kControlPanelButtonCommand] = button.command;
395       button_entry[kControlPanelButtonTitle] = button.title;
396       button_entry[kControlPanelButtonIconName] = button.icon_name;
397       if (button.shell_command) {
398         button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
399       } else if (!button.device_states.empty()) {
400         Json::Value device_states(Json::arrayValue);
401         for (const DeviceState& device_state : button.device_states) {
402           Json::Value device_state_entry;
403           if (device_state.lid_switch_open) {
404             device_state_entry[kControlPanelButtonLidSwitchOpen] =
405                 *device_state.lid_switch_open;
406           }
407           if (device_state.hinge_angle_value) {
408             device_state_entry[kControlPanelButtonHingeAngleValue] =
409                 *device_state.hinge_angle_value;
410           }
411           device_states.append(device_state_entry);
412         }
413         button_entry[kControlPanelButtonDeviceStates] = device_states;
414       }
415       custom_control_panel_buttons.append(button_entry);
416     }
417     device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
418     register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
419     server_connection_->Send(register_obj);
420     // Do this last as OnRegistered() is user code and may take some time to
421     // complete (although it shouldn't...)
422     auto observer = operator_observer_.lock();
423     if (observer) {
424       observer->OnRegistered();
425     }
426   });
427 }
428 
OnClose()429 void Streamer::Impl::OnClose() {
430   // Called from websocket thread
431   // The operator shouldn't close the connection with the client, it's up to the
432   // device to decide when to disconnect.
433   LOG(WARNING) << "Connection with server closed unexpectedly";
434   signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
435     auto observer = operator_observer_.lock();
436     if (observer) {
437       observer->OnClose();
438     }
439   });
440   LOG(INFO) << "Trying to re-connect to operator..";
441   registration_retries_left_ = kReconnectRetries;
442   retry_interval_ms_ = kReconnectIntervalMs;
443   signal_thread_->PostDelayedTask(
444       RTC_FROM_HERE, [this]() { Register(operator_observer_); },
445       retry_interval_ms_);
446 }
447 
OnError(const std::string & error)448 void Streamer::Impl::OnError(const std::string& error) {
449   // Called from websocket thread.
450   if (registration_retries_left_) {
451     LOG(WARNING) << "Connection to operator failed (" << error << "), "
452                  << registration_retries_left_ << " retries left"
453                  << " (will retry in " << retry_interval_ms_ / 1000 << "s)";
454     --registration_retries_left_;
455     signal_thread_->PostDelayedTask(
456         RTC_FROM_HERE,
457         [this]() {
458           // Need to reconnect and register again with operator
459           Register(operator_observer_);
460         },
461         retry_interval_ms_);
462     retry_interval_ms_ *= 2;
463   } else {
464     LOG(ERROR) << "Error on connection with the operator: " << error;
465     signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
466       auto observer = operator_observer_.lock();
467       if (observer) {
468         observer->OnError();
469       }
470     });
471   }
472 }
473 
HandleConfigMessage(const Json::Value & server_message)474 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
475   CHECK(signal_thread_->IsCurrent())
476       << __FUNCTION__ << " called from the wrong thread";
477   if (server_message.isMember("ice_servers") &&
478       server_message["ice_servers"].isArray()) {
479     auto servers = server_message["ice_servers"];
480     operator_config_.servers.clear();
481     for (int server_idx = 0; server_idx < servers.size(); server_idx++) {
482       auto server = servers[server_idx];
483       webrtc::PeerConnectionInterface::IceServer ice_server;
484       if (!server.isMember("urls") || !server["urls"].isArray()) {
485         // The urls field is required
486         LOG(WARNING)
487             << "Invalid ICE server specification obtained from server: "
488             << server.toStyledString();
489         continue;
490       }
491       auto urls = server["urls"];
492       for (int url_idx = 0; url_idx < urls.size(); url_idx++) {
493         auto url = urls[url_idx];
494         if (!url.isString()) {
495           LOG(WARNING) << "Non string 'urls' field in ice server: "
496                        << url.toStyledString();
497           continue;
498         }
499         ice_server.urls.push_back(url.asString());
500         if (server.isMember("credential") && server["credential"].isString()) {
501           ice_server.password = server["credential"].asString();
502         }
503         if (server.isMember("username") && server["username"].isString()) {
504           ice_server.username = server["username"].asString();
505         }
506         operator_config_.servers.push_back(ice_server);
507       }
508     }
509   }
510 }
511 
HandleClientMessage(const Json::Value & server_message)512 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
513   CHECK(signal_thread_->IsCurrent())
514       << __FUNCTION__ << " called from the wrong thread";
515   if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
516       !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
517     LOG(ERROR) << "Client message received without valid client id";
518     return;
519   }
520   auto client_id =
521       server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
522   if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
523     LOG(WARNING) << "Received empty client message";
524     return;
525   }
526   auto client_message =
527       server_message[cuttlefish::webrtc_signaling::kPayloadField];
528   if (clients_.count(client_id) == 0) {
529     auto client_handler = CreateClientHandler(client_id);
530     if (!client_handler) {
531       LOG(ERROR) << "Failed to create a new client handler";
532       return;
533     }
534     clients_.emplace(client_id, client_handler);
535   }
536   auto client_handler = clients_[client_id];
537 
538   client_handler->HandleMessage(client_message);
539 }
540 
OnReceive(const uint8_t * msg,size_t length,bool is_binary)541 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
542                                bool is_binary) {
543   // Usually called from websocket thread.
544   Json::Value server_message;
545   // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
546   // parse the data into a JSON object while still on the websocket thread.
547   if (is_binary || !ParseMessage(msg, length, &server_message)) {
548     LOG(ERROR) << "Received invalid JSON from server: '"
549                << (is_binary ? std::string("(binary_data)")
550                              : std::string(msg, msg + length))
551                << "'";
552     return;
553   }
554   // Transition to the signal thread before member variables are accessed.
555   signal_thread_->PostTask(RTC_FROM_HERE, [this, server_message]() {
556     if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
557         !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
558       LOG(ERROR) << "No message_type field from server";
559       // Notify the caller
560       OnError(
561           "Invalid message received from operator: no message type field "
562           "present");
563       return;
564     }
565     auto type =
566         server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
567     if (type == cuttlefish::webrtc_signaling::kConfigType) {
568       HandleConfigMessage(server_message);
569     } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
570       if (!server_message.isMember(
571               cuttlefish::webrtc_signaling::kClientIdField) ||
572           !server_message.isMember(
573               cuttlefish::webrtc_signaling::kClientIdField)) {
574         LOG(ERROR) << "Invalid disconnect message received from server";
575         // Notify the caller
576         OnError("Invalid disconnect message: client_id is required");
577         return;
578       }
579       auto client_id =
580           server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
581       LOG(INFO) << "Client " << client_id << " has disconnected.";
582       DestroyClientHandler(client_id);
583     } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
584       HandleClientMessage(server_message);
585     } else {
586       LOG(ERROR) << "Unknown message type: " << type;
587       // Notify the caller
588       OnError("Invalid message received from operator: unknown message type");
589       return;
590     }
591   });
592 }
593 
CreateClientHandler(int client_id)594 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
595     int client_id) {
596   CHECK(signal_thread_->IsCurrent())
597       << __FUNCTION__ << " called from the wrong thread";
598   auto observer = connection_observer_factory_->CreateObserver();
599 
600   auto client_handler = ClientHandler::Create(
601       client_id, observer,
602       [this, client_id](const Json::Value& msg) {
603         SendMessageToClient(client_id, msg);
604       },
605       [this, client_id](bool isOpen) {
606         if (isOpen) {
607           SetupCameraForClient(client_id);
608         } else {
609           DestroyClientHandler(client_id);
610         }
611       });
612 
613   webrtc::PeerConnectionInterface::RTCConfiguration config;
614   config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
615   config.enable_dtls_srtp = true;
616   config.servers.insert(config.servers.end(), operator_config_.servers.begin(),
617                         operator_config_.servers.end());
618   webrtc::PeerConnectionDependencies dependencies(client_handler.get());
619   // PortRangeSocketFactory's super class' constructor needs to be called on the
620   // network thread or have it as a parameter
621   dependencies.packet_socket_factory.reset(new PortRangeSocketFactory(
622       network_thread_.get(), config_.udp_port_range, config_.tcp_port_range));
623   auto peer_connection = peer_connection_factory_->CreatePeerConnection(
624       config, std::move(dependencies));
625 
626   if (!peer_connection) {
627     LOG(ERROR) << "Failed to create peer connection";
628     return nullptr;
629   }
630 
631   if (!client_handler->SetPeerConnection(std::move(peer_connection))) {
632     return nullptr;
633   }
634 
635   for (auto& entry : displays_) {
636     auto& label = entry.first;
637     auto& video_source = entry.second.source;
638 
639     auto video_track =
640         peer_connection_factory_->CreateVideoTrack(label, video_source.get());
641     client_handler->AddDisplay(video_track, label);
642   }
643 
644   for (auto& entry : audio_sources_) {
645     auto& label = entry.first;
646     auto& audio_stream = entry.second;
647     auto audio_track =
648         peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
649     client_handler->AddAudio(audio_track, label);
650   }
651 
652   return client_handler;
653 }
654 
SendMessageToClient(int client_id,const Json::Value & msg)655 void Streamer::Impl::SendMessageToClient(int client_id,
656                                          const Json::Value& msg) {
657   LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
658   CHECK(signal_thread_->IsCurrent())
659       << __FUNCTION__ << " called from the wrong thread";
660   Json::Value wrapper;
661   wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
662   wrapper[cuttlefish::webrtc_signaling::kTypeField] =
663       cuttlefish::webrtc_signaling::kForwardType;
664   wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
665   // This is safe to call from the webrtc threads because
666   // ServerConnection(s) are thread safe
667   server_connection_->Send(wrapper);
668 }
669 
DestroyClientHandler(int client_id)670 void Streamer::Impl::DestroyClientHandler(int client_id) {
671   // Usually called from signal thread, could be called from websocket thread or
672   // an application thread.
673   signal_thread_->PostTask(RTC_FROM_HERE, [this, client_id]() {
674     // This needs to be 'posted' to the thread instead of 'invoked'
675     // immediately for two reasons:
676     // * The client handler is destroyed by this code, it's generally a
677     // bad idea (though not necessarily wrong) to return to a member
678     // function of a destroyed object.
679     // * The client handler may call this from within a peer connection
680     // observer callback, destroying the client handler there leads to a
681     // deadlock.
682     clients_.erase(client_id);
683   });
684 }
685 
SetupCameraForClient(int client_id)686 void Streamer::Impl::SetupCameraForClient(int client_id) {
687   if (!camera_streamer_) {
688     return;
689   }
690   auto client_handler = clients_[client_id];
691   if (client_handler) {
692     auto camera_track = client_handler->GetCameraStream();
693     if (camera_track) {
694       camera_track->AddOrUpdateSink(camera_streamer_.get(),
695                                     rtc::VideoSinkWants());
696     }
697   }
698 }
699 
700 }  // namespace webrtc_streaming
701 }  // namespace cuttlefish
702