• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
15 
16 #include "host/frontend/webrtc/lib/server_connection.h"
17 
18 #include <android-base/logging.h>
19 #include <libwebsockets.h>
20 
21 #include "common/libs/fs/shared_fd.h"
22 #include "common/libs/fs/shared_select.h"
23 #include "common/libs/utils/files.h"
24 
25 namespace cuttlefish {
26 namespace webrtc_streaming {
27 
28 // ServerConnection over Unix socket
29 class UnixServerConnection : public ServerConnection {
30  public:
31   UnixServerConnection(const std::string& addr,
32                        std::weak_ptr<ServerConnectionObserver> observer);
33   ~UnixServerConnection() override;
34 
35   bool Send(const Json::Value& msg) override;
36 
37  private:
38   void Connect() override;
39   void StopThread();
40   void ReadLoop();
41 
42   const std::string addr_;
43   SharedFD conn_;
44   std::mutex write_mtx_;
45   std::weak_ptr<ServerConnectionObserver> observer_;
46   // The event fd must be declared before the thread to ensure it's initialized
47   // before the thread starts and is safe to be accessed from it.
48   SharedFD thread_notifier_;
49   std::atomic_bool running_ = false;
50   std::thread thread_;
51 };
52 
53 // ServerConnection using websockets
54 class WsConnectionContext;
55 
56 class WsConnection : public std::enable_shared_from_this<WsConnection> {
57  public:
58   struct CreateConnectionSul {
59     lws_sorted_usec_list_t sul = {};
60     std::weak_ptr<WsConnection> weak_this;
61   };
62 
63   WsConnection(int port, const std::string& addr, const std::string& path,
64                ServerConfig::Security secure,
65                const std::vector<std::pair<std::string, std::string>>& headers,
66                std::weak_ptr<ServerConnectionObserver> observer,
67                std::shared_ptr<WsConnectionContext> context);
68 
69   ~WsConnection();
70 
71   void Connect();
72   bool Send(const Json::Value& msg);
73 
74   void ConnectInner();
75 
76   void OnError(const std::string& error);
77   void OnReceive(const uint8_t* data, size_t len, bool is_binary);
78   void OnOpen();
79   void OnClose();
80   void OnWriteable();
81 
82   void AddHttpHeaders(unsigned char** p, unsigned char* end) const;
83 
84  private:
85   struct WsBuffer {
86     WsBuffer() = default;
WsBuffercuttlefish::webrtc_streaming::WsConnection::WsBuffer87     WsBuffer(const uint8_t* data, size_t len, bool binary)
88         : buffer_(LWS_PRE + len), is_binary_(binary) {
89       memcpy(&buffer_[LWS_PRE], data, len);
90     }
91 
datacuttlefish::webrtc_streaming::WsConnection::WsBuffer92     uint8_t* data() { return &buffer_[LWS_PRE]; }
is_binarycuttlefish::webrtc_streaming::WsConnection::WsBuffer93     bool is_binary() const { return is_binary_; }
sizecuttlefish::webrtc_streaming::WsConnection::WsBuffer94     size_t size() const { return buffer_.size() - LWS_PRE; }
95 
96    private:
97     std::vector<uint8_t> buffer_;
98     bool is_binary_;
99   };
100   bool Send(const uint8_t* data, size_t len, bool binary = false);
101 
102   CreateConnectionSul extended_sul_;
103   struct lws* wsi_;
104   const int port_;
105   const std::string addr_;
106   const std::string path_;
107   const ServerConfig::Security security_;
108   const std::vector<std::pair<std::string, std::string>> headers_;
109 
110   std::weak_ptr<ServerConnectionObserver> observer_;
111 
112   // each element contains the data to be sent and whether it's binary or not
113   std::deque<WsBuffer> write_queue_;
114   std::mutex write_queue_mutex_;
115   // The connection object should not outlive the context object. This reference
116   // guarantees it.
117   std::shared_ptr<WsConnectionContext> context_;
118 };
119 
120 class WsConnectionContext
121     : public std::enable_shared_from_this<WsConnectionContext> {
122  public:
123   static std::shared_ptr<WsConnectionContext> Create();
124 
125   WsConnectionContext(struct lws_context* lws_ctx);
126   ~WsConnectionContext();
127 
128   std::unique_ptr<ServerConnection> CreateConnection(
129       int port, const std::string& addr, const std::string& path,
130       ServerConfig::Security secure,
131       std::weak_ptr<ServerConnectionObserver> observer,
132       const std::vector<std::pair<std::string, std::string>>& headers);
133 
134   void RememberConnection(void*, std::weak_ptr<WsConnection>);
135   void ForgetConnection(void*);
136   std::shared_ptr<WsConnection> GetConnection(void*);
137 
lws_context()138   struct lws_context* lws_context() {
139     return lws_context_;
140   }
141 
142  private:
143   void Start();
144 
145   std::map<void*, std::weak_ptr<WsConnection>> weak_by_ptr_;
146   std::mutex map_mutex_;
147   struct lws_context* lws_context_;
148   std::thread message_loop_;
149 };
150 
Connect(const ServerConfig & conf,std::weak_ptr<ServerConnectionObserver> observer)151 std::unique_ptr<ServerConnection> ServerConnection::Connect(
152     const ServerConfig& conf,
153     std::weak_ptr<ServerConnectionObserver> observer) {
154   std::unique_ptr<ServerConnection> ret;
155   // If the provided address points to an existing UNIX socket in the file
156   // system connect to it, otherwise assume it's a network address and connect
157   // using websockets
158   if (FileIsSocket(conf.addr)) {
159     ret.reset(new UnixServerConnection(conf.addr, observer));
160   } else {
161     // This can be a local variable since the ws connection will keep a
162     // reference to it.
163     auto ws_context = WsConnectionContext::Create();
164     CHECK(ws_context) << "Failed to create websocket context";
165     ret = ws_context->CreateConnection(conf.port, conf.addr, conf.path,
166                                        conf.security, observer,
167                                        conf.http_headers);
168   }
169   ret->Connect();
170   return ret;
171 }
172 
Reconnect()173 void ServerConnection::Reconnect() { Connect(); }
174 
175 // UnixServerConnection implementation
176 
UnixServerConnection(const std::string & addr,std::weak_ptr<ServerConnectionObserver> observer)177 UnixServerConnection::UnixServerConnection(
178     const std::string& addr, std::weak_ptr<ServerConnectionObserver> observer)
179     : addr_(addr), observer_(observer) {}
180 
~UnixServerConnection()181 UnixServerConnection::~UnixServerConnection() {
182   StopThread();
183 }
184 
Send(const Json::Value & msg)185 bool UnixServerConnection::Send(const Json::Value& msg) {
186   Json::StreamWriterBuilder factory;
187   auto str = Json::writeString(factory, msg);
188   std::lock_guard<std::mutex> lock(write_mtx_);
189   auto res =
190       conn_->Send(reinterpret_cast<const uint8_t*>(str.c_str()), str.size(), 0);
191   if (res < 0) {
192     LOG(ERROR) << "Failed to send data to signaling server: "
193                << conn_->StrError();
194     // Don't call OnError() here, the receiving thread probably did it already
195     // or is about to do it.
196   }
197   // A SOCK_SEQPACKET unix socket will send the entire message or fail, but it
198   // won't send a partial message.
199   return res == str.size();
200 }
201 
Connect()202 void UnixServerConnection::Connect() {
203   // The thread could be running if this is a Reconnect
204   StopThread();
205 
206   conn_ = SharedFD::SocketLocalClient(addr_, false, SOCK_SEQPACKET);
207   if (!conn_->IsOpen()) {
208     LOG(ERROR) << "Failed to connect to unix socket: " << conn_->StrError();
209     if (auto o = observer_.lock(); o) {
210       o->OnError("Failed to connect to unix socket");
211     }
212     return;
213   }
214   thread_notifier_ = SharedFD::Event();
215   if (!thread_notifier_->IsOpen()) {
216     LOG(ERROR) << "Failed to create eventfd for background thread: "
217                << thread_notifier_->StrError();
218     if (auto o = observer_.lock(); o) {
219       o->OnError("Failed to create eventfd for background thread");
220     }
221     return;
222   }
223   if (auto o = observer_.lock(); o) {
224     o->OnOpen();
225   }
226   // Start the thread
227   running_ = true;
228   thread_ = std::thread([this](){ReadLoop();});
229 }
230 
StopThread()231 void UnixServerConnection::StopThread() {
232   running_ = false;
233   if (!thread_notifier_->IsOpen()) {
234     // The thread won't be running if this isn't open
235     return;
236   }
237   if (thread_notifier_->EventfdWrite(1) < 0) {
238     LOG(ERROR) << "Failed to notify background thread, this thread may block";
239   }
240   if (thread_.joinable()) {
241     thread_.join();
242   }
243 }
244 
ReadLoop()245 void UnixServerConnection::ReadLoop() {
246   if (!thread_notifier_->IsOpen()) {
247     LOG(ERROR) << "The UnixServerConnection's background thread is unable to "
248                   "receive notifications so it can't run";
249     return;
250   }
251   std::vector<uint8_t> buffer(4096, 0);
252   while (running_) {
253     SharedFDSet rset;
254     rset.Set(thread_notifier_);
255     rset.Set(conn_);
256     auto res = Select(&rset, nullptr, nullptr, nullptr);
257     if (res < 0) {
258       LOG(ERROR) << "Failed to select from background thread";
259       break;
260     }
261     if (rset.IsSet(thread_notifier_)) {
262       eventfd_t val;
263       auto res = thread_notifier_->EventfdRead(&val);
264       if (res < 0) {
265         LOG(ERROR) << "Error reading from event fd: "
266                    << thread_notifier_->StrError();
267         break;
268       }
269     }
270     if (rset.IsSet(conn_)) {
271       auto size = conn_->Recv(buffer.data(), 0, MSG_TRUNC | MSG_PEEK);
272       if (size > buffer.size()) {
273         // Enlarge enough to accommodate size bytes and be a multiple of 4096
274         auto new_size = (size + 4095) & ~4095;
275         buffer.resize(new_size);
276       }
277       auto res = conn_->Recv(buffer.data(), buffer.size(), MSG_TRUNC);
278       if (res < 0) {
279         LOG(ERROR) << "Failed to read from server: " << conn_->StrError();
280         if (auto observer = observer_.lock(); observer) {
281           observer->OnError(conn_->StrError());
282         }
283         return;
284       }
285       if (res == 0) {
286         auto observer = observer_.lock();
287         if (observer) {
288           observer->OnClose();
289         }
290         break;
291       }
292       auto observer = observer_.lock();
293       if (observer) {
294         observer->OnReceive(buffer.data(), res, false);
295       }
296     }
297   }
298 }
299 
300 // WsConnection implementation
301 
302 int LwsCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user,
303                 void* in, size_t len);
304 void CreateConnectionCallback(lws_sorted_usec_list_t* sul);
305 
306 namespace {
307 
308 constexpr char kProtocolName[] = "cf-webrtc-device";
309 constexpr int kBufferSize = 65536;
310 
311 const uint32_t backoff_ms[] = {1000, 2000, 3000, 4000, 5000};
312 
313 const lws_retry_bo_t kRetry = {
314     .retry_ms_table = backoff_ms,
315     .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
316     .conceal_count = LWS_ARRAY_SIZE(backoff_ms),
317 
318     .secs_since_valid_ping = 3,    /* force PINGs after secs idle */
319     .secs_since_valid_hangup = 10, /* hangup after secs idle */
320 
321     .jitter_percent = 20,
322 };
323 
324 const struct lws_protocols kProtocols[2] = {
325     {kProtocolName, LwsCallback, 0, kBufferSize, 0, NULL, 0},
326     {NULL, NULL, 0, 0, 0, NULL, 0}};
327 
328 }  // namespace
329 
Create()330 std::shared_ptr<WsConnectionContext> WsConnectionContext::Create() {
331   struct lws_context_creation_info context_info = {};
332   context_info.port = CONTEXT_PORT_NO_LISTEN;
333   context_info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
334   context_info.protocols = kProtocols;
335   struct lws_context* lws_ctx = lws_create_context(&context_info);
336   if (!lws_ctx) {
337     return nullptr;
338   }
339   return std::shared_ptr<WsConnectionContext>(new WsConnectionContext(lws_ctx));
340 }
341 
WsConnectionContext(struct lws_context * lws_ctx)342 WsConnectionContext::WsConnectionContext(struct lws_context* lws_ctx)
343     : lws_context_(lws_ctx) {
344   Start();
345 }
346 
~WsConnectionContext()347 WsConnectionContext::~WsConnectionContext() {
348   lws_context_destroy(lws_context_);
349   if (message_loop_.joinable()) {
350     message_loop_.join();
351   }
352 }
353 
Start()354 void WsConnectionContext::Start() {
355   message_loop_ = std::thread([this]() {
356     for (;;) {
357       if (lws_service(lws_context_, 0) < 0) {
358         break;
359       }
360     }
361   });
362 }
363 
364 // This wrapper is needed because the ServerConnection objects are meant to be
365 // referenced by std::unique_ptr but WsConnection needs to be referenced by
366 // std::shared_ptr because it's also (weakly) referenced by the websocket
367 // thread.
368 class WsConnectionWrapper : public ServerConnection {
369  public:
WsConnectionWrapper(std::shared_ptr<WsConnection> conn)370   WsConnectionWrapper(std::shared_ptr<WsConnection> conn) : conn_(conn) {}
371 
Send(const Json::Value & msg)372   bool Send(const Json::Value& msg) override { return conn_->Send(msg); }
373 
374  private:
Connect()375   void Connect() override { return conn_->Connect(); }
376   std::shared_ptr<WsConnection> conn_;
377 };
378 
CreateConnection(int port,const std::string & addr,const std::string & path,ServerConfig::Security security,std::weak_ptr<ServerConnectionObserver> observer,const std::vector<std::pair<std::string,std::string>> & headers)379 std::unique_ptr<ServerConnection> WsConnectionContext::CreateConnection(
380     int port, const std::string& addr, const std::string& path,
381     ServerConfig::Security security,
382     std::weak_ptr<ServerConnectionObserver> observer,
383     const std::vector<std::pair<std::string, std::string>>& headers) {
384   return std::unique_ptr<ServerConnection>(
385       new WsConnectionWrapper(std::make_shared<WsConnection>(
386           port, addr, path, security, headers, observer, shared_from_this())));
387 }
388 
GetConnection(void * raw)389 std::shared_ptr<WsConnection> WsConnectionContext::GetConnection(void* raw) {
390   std::shared_ptr<WsConnection> connection;
391   {
392     std::lock_guard<std::mutex> lock(map_mutex_);
393     if (weak_by_ptr_.count(raw) == 0) {
394       return nullptr;
395     }
396     connection = weak_by_ptr_[raw].lock();
397     if (!connection) {
398       weak_by_ptr_.erase(raw);
399     }
400   }
401   return connection;
402 }
403 
RememberConnection(void * raw,std::weak_ptr<WsConnection> conn)404 void WsConnectionContext::RememberConnection(void* raw,
405                                              std::weak_ptr<WsConnection> conn) {
406   std::lock_guard<std::mutex> lock(map_mutex_);
407   weak_by_ptr_.emplace(
408       std::pair<void*, std::weak_ptr<WsConnection>>(raw, conn));
409 }
410 
ForgetConnection(void * raw)411 void WsConnectionContext::ForgetConnection(void* raw) {
412   std::lock_guard<std::mutex> lock(map_mutex_);
413   weak_by_ptr_.erase(raw);
414 }
415 
WsConnection(int port,const std::string & addr,const std::string & path,ServerConfig::Security security,const std::vector<std::pair<std::string,std::string>> & headers,std::weak_ptr<ServerConnectionObserver> observer,std::shared_ptr<WsConnectionContext> context)416 WsConnection::WsConnection(
417     int port, const std::string& addr, const std::string& path,
418     ServerConfig::Security security,
419     const std::vector<std::pair<std::string, std::string>>& headers,
420     std::weak_ptr<ServerConnectionObserver> observer,
421     std::shared_ptr<WsConnectionContext> context)
422     : port_(port),
423       addr_(addr),
424       path_(path),
425       security_(security),
426       headers_(headers),
427       observer_(observer),
428       context_(context) {}
429 
~WsConnection()430 WsConnection::~WsConnection() {
431   context_->ForgetConnection(this);
432   // This will cause the callback to be called which will drop the connection
433   // after seeing the context doesn't remember this object
434   lws_callback_on_writable(wsi_);
435 }
436 
Connect()437 void WsConnection::Connect() {
438   memset(&extended_sul_.sul, 0, sizeof(extended_sul_.sul));
439   extended_sul_.weak_this = weak_from_this();
440   lws_sul_schedule(context_->lws_context(), 0, &extended_sul_.sul,
441                    CreateConnectionCallback, 1);
442 }
443 
AddHttpHeaders(unsigned char ** p,unsigned char * end) const444 void WsConnection::AddHttpHeaders(unsigned char** p, unsigned char* end) const {
445   for (const auto& header_entry : headers_) {
446     const auto& name = header_entry.first;
447     const auto& value = header_entry.second;
448     auto res = lws_add_http_header_by_name(
449         wsi_, reinterpret_cast<const unsigned char*>(name.c_str()),
450         reinterpret_cast<const unsigned char*>(value.c_str()), value.size(), p,
451         end);
452     if (res != 0) {
453       LOG(ERROR) << "Unable to add header: " << name;
454     }
455   }
456   if (!headers_.empty()) {
457     // Let LWS know we added some headers.
458     lws_client_http_body_pending(wsi_, 1);
459   }
460 }
461 
OnError(const std::string & error)462 void WsConnection::OnError(const std::string& error) {
463   auto observer = observer_.lock();
464   if (observer) {
465     observer->OnError(error);
466   }
467 }
OnReceive(const uint8_t * data,size_t len,bool is_binary)468 void WsConnection::OnReceive(const uint8_t* data, size_t len, bool is_binary) {
469   auto observer = observer_.lock();
470   if (observer) {
471     observer->OnReceive(data, len, is_binary);
472   }
473 }
OnOpen()474 void WsConnection::OnOpen() {
475   auto observer = observer_.lock();
476   if (observer) {
477     observer->OnOpen();
478   }
479 }
OnClose()480 void WsConnection::OnClose() {
481   auto observer = observer_.lock();
482   if (observer) {
483     observer->OnClose();
484   }
485 }
486 
OnWriteable()487 void WsConnection::OnWriteable() {
488   WsBuffer buffer;
489   {
490     std::lock_guard<std::mutex> lock(write_queue_mutex_);
491     if (write_queue_.size() == 0) {
492       return;
493     }
494     buffer = std::move(write_queue_.front());
495     write_queue_.pop_front();
496   }
497   auto flags = lws_write_ws_flags(
498       buffer.is_binary() ? LWS_WRITE_BINARY : LWS_WRITE_TEXT, true, true);
499   auto res = lws_write(wsi_, buffer.data(), buffer.size(),
500                        (enum lws_write_protocol)flags);
501   if (res != buffer.size()) {
502     LOG(WARNING) << "Unable to send the entire message!";
503   }
504 }
505 
Send(const Json::Value & msg)506 bool WsConnection::Send(const Json::Value& msg) {
507   Json::StreamWriterBuilder factory;
508   auto str = Json::writeString(factory, msg);
509   return Send(reinterpret_cast<const uint8_t*>(str.c_str()), str.size());
510 }
511 
Send(const uint8_t * data,size_t len,bool binary)512 bool WsConnection::Send(const uint8_t* data, size_t len, bool binary) {
513   if (!wsi_) {
514     LOG(WARNING) << "Send called on an uninitialized connection!!";
515     return false;
516   }
517   WsBuffer buffer(data, len, binary);
518   {
519     std::lock_guard<std::mutex> lock(write_queue_mutex_);
520     write_queue_.emplace_back(std::move(buffer));
521   }
522 
523   lws_callback_on_writable(wsi_);
524   return true;
525 }
526 
LwsCallback(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)527 int LwsCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user,
528                 void* in, size_t len) {
529   constexpr int DROP = -1;
530   constexpr int OK = 0;
531 
532   // For some values of `reason`, `user` doesn't point to the value provided
533   // when the connection was created. This function object should be used with
534   // care.
535   auto with_connection =
536       [wsi, user](std::function<void(std::shared_ptr<WsConnection>)> cb) {
537         auto context = reinterpret_cast<WsConnectionContext*>(user);
538         auto connection = context->GetConnection(wsi);
539         if (!connection) {
540           return DROP;
541         }
542         cb(connection);
543         return OK;
544       };
545 
546   switch (reason) {
547     case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
548       return with_connection([in](std::shared_ptr<WsConnection> connection) {
549         connection->OnError(in ? (char*)in : "(null)");
550       });
551 
552     case LWS_CALLBACK_CLIENT_RECEIVE:
553       return with_connection(
554           [in, len, wsi](std::shared_ptr<WsConnection> connection) {
555             connection->OnReceive((const uint8_t*)in, len,
556                                   lws_frame_is_binary(wsi));
557           });
558 
559     case LWS_CALLBACK_CLIENT_ESTABLISHED:
560       return with_connection([](std::shared_ptr<WsConnection> connection) {
561         connection->OnOpen();
562       });
563 
564     case LWS_CALLBACK_CLIENT_CLOSED:
565       return with_connection([](std::shared_ptr<WsConnection> connection) {
566         connection->OnClose();
567       });
568 
569     case LWS_CALLBACK_CLIENT_WRITEABLE:
570       return with_connection([](std::shared_ptr<WsConnection> connection) {
571         connection->OnWriteable();
572       });
573 
574     case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
575       return with_connection(
576           [in, len](std::shared_ptr<WsConnection> connection) {
577             auto p = reinterpret_cast<unsigned char**>(in);
578             auto end = (*p) + len;
579             connection->AddHttpHeaders(p, end);
580           });
581 
582     case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE:
583       // This callback is only called when we add additional HTTP headers, let
584       // LWS know we're done modifying the HTTP request.
585       lws_client_http_body_pending(wsi, 0);
586       return 0;
587 
588     default:
589       LOG(VERBOSE) << "Unhandled value: " << reason;
590       return lws_callback_http_dummy(wsi, reason, user, in, len);
591   }
592 }
593 
CreateConnectionCallback(lws_sorted_usec_list_t * sul)594 void CreateConnectionCallback(lws_sorted_usec_list_t* sul) {
595   std::shared_ptr<WsConnection> connection =
596       reinterpret_cast<WsConnection::CreateConnectionSul*>(sul)
597           ->weak_this.lock();
598   if (!connection) {
599     LOG(WARNING) << "The object was already destroyed by the time of the first "
600                  << "connection attempt. That's unusual.";
601     return;
602   }
603   connection->ConnectInner();
604 }
605 
ConnectInner()606 void WsConnection::ConnectInner() {
607   struct lws_client_connect_info connect_info;
608 
609   memset(&connect_info, 0, sizeof(connect_info));
610 
611   connect_info.context = context_->lws_context();
612   connect_info.port = port_;
613   connect_info.address = addr_.c_str();
614   connect_info.path = path_.c_str();
615   connect_info.host = connect_info.address;
616   connect_info.origin = connect_info.address;
617   switch (security_) {
618     case ServerConfig::Security::kAllowSelfSigned:
619       connect_info.ssl_connection = LCCSCF_ALLOW_SELFSIGNED |
620                                     LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK |
621                                     LCCSCF_USE_SSL;
622       break;
623     case ServerConfig::Security::kStrict:
624       connect_info.ssl_connection = LCCSCF_USE_SSL;
625       break;
626     case ServerConfig::Security::kInsecure:
627       connect_info.ssl_connection = 0;
628       break;
629   }
630   connect_info.protocol = "webrtc-operator";
631   connect_info.local_protocol_name = kProtocolName;
632   connect_info.pwsi = &wsi_;
633   connect_info.retry_and_idle_policy = &kRetry;
634   // There is no guarantee the connection object still exists when the callback
635   // is called. Put the context instead as the user data which is guaranteed to
636   // still exist and holds a weak ptr to the connection.
637   connect_info.userdata = context_.get();
638 
639   if (lws_client_connect_via_info(&connect_info)) {
640     // wsi_ is not initialized until after the call to
641     // lws_client_connect_via_info(). Luckily, this is guaranteed to run before
642     // the protocol callback is called because it runs in the same loop.
643     context_->RememberConnection(wsi_, weak_from_this());
644   } else {
645     LOG(ERROR) << "Connection failed!";
646   }
647 }
648 
649 }  // namespace webrtc_streaming
650 }  // namespace cuttlefish
651