• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/frontend/webrtc/lib/streamer.h"
18 
19 #include <android-base/logging.h>
20 #include <json/json.h>
21 
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34 
35 #include "host/frontend/webrtc/lib/audio_device.h"
36 #include "host/frontend/webrtc/lib/audio_track_source_impl.h"
37 #include "host/frontend/webrtc/lib/client_handler.h"
38 #include "host/frontend/webrtc/lib/port_range_socket_factory.h"
39 #include "host/frontend/webrtc/lib/video_track_source_impl.h"
40 #include "host/frontend/webrtc/lib/vp8only_encoder_factory.h"
41 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
42 
43 namespace cuttlefish {
44 namespace webrtc_streaming {
45 namespace {
46 
47 constexpr auto kStreamIdField = "stream_id";
48 constexpr auto kXResField = "x_res";
49 constexpr auto kYResField = "y_res";
50 constexpr auto kDpiField = "dpi";
51 constexpr auto kIsTouchField = "is_touch";
52 constexpr auto kDisplaysField = "displays";
53 constexpr auto kAudioStreamsField = "audio_streams";
54 constexpr auto kHardwareField = "hardware";
55 constexpr auto kControlPanelButtonCommand = "command";
56 constexpr auto kControlPanelButtonTitle = "title";
57 constexpr auto kControlPanelButtonIconName = "icon_name";
58 constexpr auto kControlPanelButtonShellCommand = "shell_command";
59 constexpr auto kControlPanelButtonDeviceStates = "device_states";
60 constexpr auto kControlPanelButtonLidSwitchOpen = "lid_switch_open";
61 constexpr auto kControlPanelButtonHingeAngleValue = "hinge_angle_value";
62 constexpr auto kCustomControlPanelButtonsField = "custom_control_panel_buttons";
63 
SendJson(WsConnection * ws_conn,const Json::Value & data)64 void SendJson(WsConnection* ws_conn, const Json::Value& data) {
65   Json::StreamWriterBuilder factory;
66   auto data_str = Json::writeString(factory, data);
67   ws_conn->Send(reinterpret_cast<const uint8_t*>(data_str.c_str()),
68                 data_str.size());
69 }
70 
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)71 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
72   auto str = reinterpret_cast<const char*>(data);
73   Json::CharReaderBuilder builder;
74   std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
75   std::string errorMessage;
76   return json_reader->parse(str, str + length, msg_out, &errorMessage);
77 }
78 
CreateAndStartThread(const std::string & name)79 std::unique_ptr<rtc::Thread> CreateAndStartThread(const std::string& name) {
80   auto thread = rtc::Thread::CreateWithSocketServer();
81   if (!thread) {
82     LOG(ERROR) << "Failed to create " << name << " thread";
83     return nullptr;
84   }
85   thread->SetName(name, nullptr);
86   if (!thread->Start()) {
87     LOG(ERROR) << "Failed to start " << name << " thread";
88     return nullptr;
89   }
90   return thread;
91 }
92 
93 struct DisplayDescriptor {
94   int width;
95   int height;
96   int dpi;
97   bool touch_enabled;
98   rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
99 };
100 
101 struct ControlPanelButtonDescriptor {
102   std::string command;
103   std::string title;
104   std::string icon_name;
105   std::optional<std::string> shell_command;
106   std::vector<DeviceState> device_states;
107 };
108 
109 // TODO (jemoreira): move to a place in common with the signaling server
110 struct OperatorServerConfig {
111   std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
112 };
113 
114 // Wraps a scoped_refptr pointer to an audio device module
115 class AudioDeviceModuleWrapper : public AudioSource {
116  public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)117   AudioDeviceModuleWrapper(
118       rtc::scoped_refptr<CfAudioDeviceModule> device_module)
119       : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)120   int GetMoreAudioData(void* data, int bytes_per_sample,
121                        int samples_per_channel, int num_channels,
122                        int sample_rate, bool& muted) override {
123     return device_module_->GetMoreAudioData(data, bytes_per_sample,
124                                             samples_per_channel, num_channels,
125                                             sample_rate, muted);
126   }
127 
device_module()128   rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
129     return device_module_;
130   }
131 
132  private:
133   rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
134 };
135 
136 }  // namespace
137 
138 class Streamer::Impl : public WsConnectionObserver {
139  public:
140   std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);
141 
142   void SendMessageToClient(int client_id, const Json::Value& msg);
143   void DestroyClientHandler(int client_id);
144 
145   // WsObserver
146   void OnOpen() override;
147   void OnClose() override;
148   void OnError(const std::string& error) override;
149   void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;
150 
151   void HandleConfigMessage(const Json::Value& msg);
152   void HandleClientMessage(const Json::Value& server_message);
153 
154   // All accesses to these variables happen from the signal_thread, so there is
155   // no need for extra synchronization mechanisms (mutex)
156   StreamerConfig config_;
157   OperatorServerConfig operator_config_;
158   std::shared_ptr<WsConnection> server_connection_;
159   std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
160   rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
161       peer_connection_factory_;
162   std::unique_ptr<rtc::Thread> network_thread_;
163   std::unique_ptr<rtc::Thread> worker_thread_;
164   std::unique_ptr<rtc::Thread> signal_thread_;
165   std::map<std::string, DisplayDescriptor> displays_;
166   std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
167       audio_sources_;
168   std::map<int, std::shared_ptr<ClientHandler>> clients_;
169   std::weak_ptr<OperatorObserver> operator_observer_;
170   std::map<std::string, std::string> hardware_;
171   std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
172   std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
173 };
174 
Streamer(std::unique_ptr<Streamer::Impl> impl)175 Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
176     : impl_(std::move(impl)) {}
177 
178 /* static */
Create(const StreamerConfig & cfg,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)179 std::unique_ptr<Streamer> Streamer::Create(
180     const StreamerConfig& cfg,
181     std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
182 
183   rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
184 
185   std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
186   impl->config_ = cfg;
187   impl->connection_observer_factory_ = connection_observer_factory;
188 
189   impl->network_thread_ = CreateAndStartThread("network-thread");
190   impl->worker_thread_ = CreateAndStartThread("work-thread");
191   impl->signal_thread_ = CreateAndStartThread("signal-thread");
192   if (!impl->network_thread_ || !impl->worker_thread_ ||
193       !impl->signal_thread_) {
194     return nullptr;
195   }
196 
197   impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
198       rtc::scoped_refptr<CfAudioDeviceModule>(
199           new rtc::RefCountedObject<CfAudioDeviceModule>()));
200 
201   impl->peer_connection_factory_ = webrtc::CreatePeerConnectionFactory(
202       impl->network_thread_.get(), impl->worker_thread_.get(),
203       impl->signal_thread_.get(), impl->audio_device_module_->device_module(),
204       webrtc::CreateBuiltinAudioEncoderFactory(),
205       webrtc::CreateBuiltinAudioDecoderFactory(),
206       std::make_unique<VP8OnlyEncoderFactory>(
207           webrtc::CreateBuiltinVideoEncoderFactory()),
208       webrtc::CreateBuiltinVideoDecoderFactory(), nullptr /* audio_mixer */,
209       nullptr /* audio_processing */);
210 
211   if (!impl->peer_connection_factory_) {
212     LOG(ERROR) << "Failed to create peer connection factory";
213     return nullptr;
214   }
215 
216   webrtc::PeerConnectionFactoryInterface::Options options;
217   // By default the loopback network is ignored, but generating candidates for
218   // it is useful when using TCP port forwarding.
219   options.network_ignore_mask = 0;
220   impl->peer_connection_factory_->SetOptions(options);
221 
222   return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
223 }
224 
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)225 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
226                                                 int width, int height, int dpi,
227                                                 bool touch_enabled) {
228   // Usually called from an application thread
229   return impl_->signal_thread_->Invoke<std::shared_ptr<VideoSink>>(
230       RTC_FROM_HERE,
231       [this, &label, width, height, dpi,
232        touch_enabled]() -> std::shared_ptr<VideoSink> {
233         if (impl_->displays_.count(label)) {
234           LOG(ERROR) << "Display with same label already exists: " << label;
235           return nullptr;
236         }
237         rtc::scoped_refptr<VideoTrackSourceImpl> source(
238             new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
239         impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
240         return std::shared_ptr<VideoSink>(
241             new VideoTrackSourceImplSinkWrapper(source));
242       });
243 }
244 
AddAudioStream(const std::string & label)245 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
246   // Usually called from an application thread
247   return impl_->signal_thread_->Invoke<std::shared_ptr<AudioSink>>(
248       RTC_FROM_HERE, [this, &label]() -> std::shared_ptr<AudioSink> {
249         if (impl_->audio_sources_.count(label)) {
250           LOG(ERROR) << "Audio stream with same label already exists: "
251                      << label;
252           return nullptr;
253         }
254         rtc::scoped_refptr<AudioTrackSourceImpl> source(
255             new rtc::RefCountedObject<AudioTrackSourceImpl>());
256         impl_->audio_sources_[label] = source;
257         return std::shared_ptr<AudioSink>(
258             new AudioTrackSourceImplSinkWrapper(source));
259       });
260 }
261 
GetAudioSource()262 std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
263   return impl_->audio_device_module_;
264 }
265 
SetHardwareSpec(std::string key,std::string value)266 void Streamer::SetHardwareSpec(std::string key, std::string value) {
267   impl_->hardware_.emplace(key, value);
268 }
269 
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)270 void Streamer::AddCustomControlPanelButton(const std::string& command,
271                                            const std::string& title,
272                                            const std::string& icon_name) {
273   ControlPanelButtonDescriptor button = {
274       .command = command, .title = title, .icon_name = icon_name};
275   impl_->custom_control_panel_buttons_.push_back(button);
276 }
277 
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)278 void Streamer::AddCustomControlPanelButtonWithShellCommand(
279     const std::string& command, const std::string& title,
280     const std::string& icon_name, const std::string& shell_command) {
281   ControlPanelButtonDescriptor button = {
282       .command = command, .title = title, .icon_name = icon_name};
283   button.shell_command = shell_command;
284   impl_->custom_control_panel_buttons_.push_back(button);
285 }
286 
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)287 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
288     const std::string& command, const std::string& title,
289     const std::string& icon_name,
290     const std::vector<DeviceState>& device_states) {
291   ControlPanelButtonDescriptor button = {
292       .command = command, .title = title, .icon_name = icon_name};
293   button.device_states = device_states;
294   impl_->custom_control_panel_buttons_.push_back(button);
295 }
296 
Register(std::weak_ptr<OperatorObserver> observer)297 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
298   // Usually called from an application thread
299   // No need to block the calling thread on this, the observer will be notified
300   // when the connection is established.
301   impl_->signal_thread_->PostTask(RTC_FROM_HERE, [this, observer]() {
302     impl_->operator_observer_ = observer;
303     // This can be a local variable since the connection object will keep a
304     // reference to it.
305     auto ws_context = WsConnectionContext::Create();
306     CHECK(ws_context) << "Failed to create websocket context";
307     impl_->server_connection_ = ws_context->CreateConnection(
308         impl_->config_.operator_server.port,
309         impl_->config_.operator_server.addr,
310         impl_->config_.operator_server.path,
311         impl_->config_.operator_server.security, impl_,
312         impl_->config_.operator_server.http_headers);
313 
314     CHECK(impl_->server_connection_)
315         << "Unable to create websocket connection object";
316 
317     impl_->server_connection_->Connect();
318   });
319 }
320 
Unregister()321 void Streamer::Unregister() {
322   // Usually called from an application thread.
323   impl_->signal_thread_->PostTask(
324       RTC_FROM_HERE, [this]() { impl_->server_connection_.reset(); });
325 }
326 
RecordDisplays(LocalRecorder & recorder)327 void Streamer::RecordDisplays(LocalRecorder& recorder) {
328   for (auto& [key, display] : impl_->displays_) {
329     rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source =
330         display.source;
331     auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
332       source->Release();
333     };
334     std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
335         source.release(), deleter);
336     recorder.AddDisplay(display.width, display.height, source_shared);
337   }
338 }
339 
OnOpen()340 void Streamer::Impl::OnOpen() {
341   // Called from the websocket thread.
342   // Connected to operator.
343   signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
344     Json::Value register_obj;
345     register_obj[cuttlefish::webrtc_signaling::kTypeField] =
346         cuttlefish::webrtc_signaling::kRegisterType;
347     register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
348         config_.device_id;
349 
350     Json::Value device_info;
351     Json::Value displays(Json::ValueType::arrayValue);
352     // No need to synchronize with other accesses to display_ because all
353     // happens on signal_thread.
354     for (auto& entry : displays_) {
355       Json::Value display;
356       display[kStreamIdField] = entry.first;
357       display[kXResField] = entry.second.width;
358       display[kYResField] = entry.second.height;
359       display[kDpiField] = entry.second.dpi;
360       display[kIsTouchField] = true;
361       displays.append(display);
362     }
363     device_info[kDisplaysField] = displays;
364     Json::Value audio_streams(Json::ValueType::arrayValue);
365     for (auto& entry : audio_sources_) {
366       Json::Value audio;
367       audio[kStreamIdField] = entry.first;
368       audio_streams.append(audio);
369     }
370     device_info[kAudioStreamsField] = audio_streams;
371     Json::Value hardware;
372     for (const auto& [k, v] : hardware_) {
373       hardware[k] = v;
374     }
375     device_info[kHardwareField] = hardware;
376     Json::Value custom_control_panel_buttons(Json::arrayValue);
377     for (const auto& button : custom_control_panel_buttons_) {
378       Json::Value button_entry;
379       button_entry[kControlPanelButtonCommand] = button.command;
380       button_entry[kControlPanelButtonTitle] = button.title;
381       button_entry[kControlPanelButtonIconName] = button.icon_name;
382       if (button.shell_command) {
383         button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
384       } else if (!button.device_states.empty()) {
385         Json::Value device_states(Json::arrayValue);
386         for (const DeviceState& device_state : button.device_states) {
387           Json::Value device_state_entry;
388           if (device_state.lid_switch_open) {
389             device_state_entry[kControlPanelButtonLidSwitchOpen] =
390                 *device_state.lid_switch_open;
391           }
392           if (device_state.hinge_angle_value) {
393             device_state_entry[kControlPanelButtonHingeAngleValue] =
394                 *device_state.hinge_angle_value;
395           }
396           device_states.append(device_state_entry);
397         }
398         button_entry[kControlPanelButtonDeviceStates] = device_states;
399       }
400       custom_control_panel_buttons.append(button_entry);
401     }
402     device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
403     register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
404     SendJson(server_connection_.get(), register_obj);
405     // Do this last as OnRegistered() is user code and may take some time to
406     // complete (although it shouldn't...)
407     auto observer = operator_observer_.lock();
408     if (observer) {
409       observer->OnRegistered();
410     }
411   });
412 }
413 
OnClose()414 void Streamer::Impl::OnClose() {
415   // Called from websocket thread
416   // The operator shouldn't close the connection with the client, it's up to the
417   // device to decide when to disconnect.
418   LOG(WARNING) << "Websocket closed unexpectedly";
419   signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
420     auto observer = operator_observer_.lock();
421     if (observer) {
422       observer->OnClose();
423     }
424   });
425 }
426 
OnError(const std::string & error)427 void Streamer::Impl::OnError(const std::string& error) {
428   // Called from websocket thread.
429   LOG(ERROR) << "Error on connection with the operator: " << error;
430   signal_thread_->PostTask(RTC_FROM_HERE, [this]() {
431     auto observer = operator_observer_.lock();
432     if (observer) {
433       observer->OnError();
434     }
435   });
436 }
437 
HandleConfigMessage(const Json::Value & server_message)438 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
439   CHECK(signal_thread_->IsCurrent())
440       << __FUNCTION__ << " called from the wrong thread";
441   if (server_message.isMember("ice_servers") &&
442       server_message["ice_servers"].isArray()) {
443     auto servers = server_message["ice_servers"];
444     operator_config_.servers.clear();
445     for (int server_idx = 0; server_idx < servers.size(); server_idx++) {
446       auto server = servers[server_idx];
447       webrtc::PeerConnectionInterface::IceServer ice_server;
448       if (!server.isMember("urls") || !server["urls"].isArray()) {
449         // The urls field is required
450         LOG(WARNING)
451             << "Invalid ICE server specification obtained from server: "
452             << server.toStyledString();
453         continue;
454       }
455       auto urls = server["urls"];
456       for (int url_idx = 0; url_idx < urls.size(); url_idx++) {
457         auto url = urls[url_idx];
458         if (!url.isString()) {
459           LOG(WARNING) << "Non string 'urls' field in ice server: "
460                        << url.toStyledString();
461           continue;
462         }
463         ice_server.urls.push_back(url.asString());
464         if (server.isMember("credential") && server["credential"].isString()) {
465           ice_server.password = server["credential"].asString();
466         }
467         if (server.isMember("username") && server["username"].isString()) {
468           ice_server.username = server["username"].asString();
469         }
470         operator_config_.servers.push_back(ice_server);
471       }
472     }
473   }
474 }
475 
HandleClientMessage(const Json::Value & server_message)476 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
477   CHECK(signal_thread_->IsCurrent())
478       << __FUNCTION__ << " called from the wrong thread";
479   if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
480       !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
481     LOG(ERROR) << "Client message received without valid client id";
482     return;
483   }
484   auto client_id =
485       server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
486   if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
487     LOG(WARNING) << "Received empty client message";
488     return;
489   }
490   auto client_message =
491       server_message[cuttlefish::webrtc_signaling::kPayloadField];
492   if (clients_.count(client_id) == 0) {
493     auto client_handler = CreateClientHandler(client_id);
494     if (!client_handler) {
495       LOG(ERROR) << "Failed to create a new client handler";
496       return;
497     }
498     clients_.emplace(client_id, client_handler);
499   }
500   auto client_handler = clients_[client_id];
501 
502   client_handler->HandleMessage(client_message);
503 }
504 
OnReceive(const uint8_t * msg,size_t length,bool is_binary)505 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
506                                bool is_binary) {
507   // Usually called from websocket thread.
508   Json::Value server_message;
509   // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
510   // parse the data into a JSON object while still on the websocket thread.
511   if (is_binary || !ParseMessage(msg, length, &server_message)) {
512     LOG(ERROR) << "Received invalid JSON from server: '"
513                << (is_binary ? std::string("(binary_data)")
514                              : std::string(msg, msg + length))
515                << "'";
516     return;
517   }
518   // Transition to the signal thread before member variables are accessed.
519   signal_thread_->PostTask(RTC_FROM_HERE, [this, server_message]() {
520     if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
521         !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
522       LOG(ERROR) << "No message_type field from server";
523       // Notify the caller
524       OnError(
525           "Invalid message received from operator: no message type field "
526           "present");
527       return;
528     }
529     auto type =
530         server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
531     if (type == cuttlefish::webrtc_signaling::kConfigType) {
532       HandleConfigMessage(server_message);
533     } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
534       if (!server_message.isMember(
535               cuttlefish::webrtc_signaling::kClientIdField) ||
536           !server_message.isMember(
537               cuttlefish::webrtc_signaling::kClientIdField)) {
538         LOG(ERROR) << "Invalid disconnect message received from server";
539         // Notify the caller
540         OnError("Invalid disconnect message: client_id is required");
541         return;
542       }
543       auto client_id =
544           server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
545       LOG(INFO) << "Client " << client_id << " has disconnected.";
546       DestroyClientHandler(client_id);
547     } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
548       HandleClientMessage(server_message);
549     } else {
550       LOG(ERROR) << "Unknown message type: " << type;
551       // Notify the caller
552       OnError("Invalid message received from operator: unknown message type");
553       return;
554     }
555   });
556 }
557 
CreateClientHandler(int client_id)558 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
559     int client_id) {
560   CHECK(signal_thread_->IsCurrent())
561       << __FUNCTION__ << " called from the wrong thread";
562   auto observer = connection_observer_factory_->CreateObserver();
563 
564   auto client_handler = ClientHandler::Create(
565       client_id, observer,
566       [this, client_id](const Json::Value& msg) {
567         SendMessageToClient(client_id, msg);
568       },
569       [this, client_id] { DestroyClientHandler(client_id); });
570 
571   webrtc::PeerConnectionInterface::RTCConfiguration config;
572   config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
573   config.enable_dtls_srtp = true;
574   config.servers.insert(config.servers.end(), operator_config_.servers.begin(),
575                         operator_config_.servers.end());
576   webrtc::PeerConnectionDependencies dependencies(client_handler.get());
577   // PortRangeSocketFactory's super class' constructor needs to be called on the
578   // network thread or have it as a parameter
579   dependencies.packet_socket_factory.reset(new PortRangeSocketFactory(
580       network_thread_.get(), config_.udp_port_range, config_.tcp_port_range));
581   auto peer_connection = peer_connection_factory_->CreatePeerConnection(
582       config, std::move(dependencies));
583 
584   if (!peer_connection) {
585     LOG(ERROR) << "Failed to create peer connection";
586     return nullptr;
587   }
588 
589   if (!client_handler->SetPeerConnection(std::move(peer_connection))) {
590     return nullptr;
591   }
592 
593   for (auto& entry : displays_) {
594     auto& label = entry.first;
595     auto& video_source = entry.second.source;
596 
597     auto video_track =
598         peer_connection_factory_->CreateVideoTrack(label, video_source.get());
599     client_handler->AddDisplay(video_track, label);
600   }
601 
602   for (auto& entry : audio_sources_) {
603     auto& label = entry.first;
604     auto& audio_stream = entry.second;
605     auto audio_track =
606         peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
607     client_handler->AddAudio(audio_track, label);
608   }
609 
610   return client_handler;
611 }
612 
SendMessageToClient(int client_id,const Json::Value & msg)613 void Streamer::Impl::SendMessageToClient(int client_id,
614                                          const Json::Value& msg) {
615   LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
616   CHECK(signal_thread_->IsCurrent())
617       << __FUNCTION__ << " called from the wrong thread";
618   Json::Value wrapper;
619   wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
620   wrapper[cuttlefish::webrtc_signaling::kTypeField] =
621       cuttlefish::webrtc_signaling::kForwardType;
622   wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
623   // This is safe to call from the webrtc threads because
624   // WsConnection is thread safe
625   SendJson(server_connection_.get(), wrapper);
626 }
627 
DestroyClientHandler(int client_id)628 void Streamer::Impl::DestroyClientHandler(int client_id) {
629   // Usually called from signal thread, could be called from websocket thread or
630   // an application thread.
631   signal_thread_->PostTask(RTC_FROM_HERE, [this, client_id]() {
632     // This needs to be 'posted' to the thread instead of 'invoked'
633     // immediately for two reasons:
634     // * The client handler is destroyed by this code, it's generally a
635     // bad idea (though not necessarily wrong) to return to a member
636     // function of a destroyed object.
637     // * The client handler may call this from within a peer connection
638     // observer callback, destroying the client handler there leads to a
639     // deadlock.
640     clients_.erase(client_id);
641   });
642 }
643 
644 }  // namespace webrtc_streaming
645 }  // namespace cuttlefish
646