• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "osp/impl/quic/quic_client.h"
6 
7 #include <algorithm>
8 #include <functional>
9 #include <memory>
10 
11 #include "platform/api/task_runner.h"
12 #include "platform/api/time.h"
13 #include "util/osp_logging.h"
14 
15 namespace openscreen {
16 namespace osp {
17 
QuicClient(MessageDemuxer * demuxer,std::unique_ptr<QuicConnectionFactory> connection_factory,ProtocolConnectionServiceObserver * observer,ClockNowFunctionPtr now_function,TaskRunner * task_runner)18 QuicClient::QuicClient(
19     MessageDemuxer* demuxer,
20     std::unique_ptr<QuicConnectionFactory> connection_factory,
21     ProtocolConnectionServiceObserver* observer,
22     ClockNowFunctionPtr now_function,
23     TaskRunner* task_runner)
24     : ProtocolConnectionClient(demuxer, observer),
25       connection_factory_(std::move(connection_factory)),
26       cleanup_alarm_(now_function, task_runner) {}
27 
~QuicClient()28 QuicClient::~QuicClient() {
29   CloseAllConnections();
30 }
31 
Start()32 bool QuicClient::Start() {
33   if (state_ == State::kRunning)
34     return false;
35   state_ = State::kRunning;
36   Cleanup();  // Start periodic clean-ups.
37   observer_->OnRunning();
38   return true;
39 }
40 
Stop()41 bool QuicClient::Stop() {
42   if (state_ == State::kStopped)
43     return false;
44   CloseAllConnections();
45   state_ = State::kStopped;
46   Cleanup();  // Final clean-up.
47   observer_->OnStopped();
48   return true;
49 }
50 
Cleanup()51 void QuicClient::Cleanup() {
52   for (auto& entry : connections_) {
53     entry.second.delegate->DestroyClosedStreams();
54     if (!entry.second.delegate->has_streams())
55       entry.second.connection->Close();
56   }
57 
58   for (uint64_t endpoint_id : delete_connections_) {
59     auto it = connections_.find(endpoint_id);
60     if (it != connections_.end()) {
61       connections_.erase(it);
62     }
63   }
64   delete_connections_.clear();
65 
66   constexpr Clock::duration kQuicCleanupPeriod = std::chrono::milliseconds(500);
67   if (state_ != State::kStopped) {
68     cleanup_alarm_.ScheduleFromNow([this] { Cleanup(); }, kQuicCleanupPeriod);
69   }
70 }
71 
Connect(const IPEndpoint & endpoint,ConnectionRequestCallback * request)72 QuicClient::ConnectRequest QuicClient::Connect(
73     const IPEndpoint& endpoint,
74     ConnectionRequestCallback* request) {
75   if (state_ != State::kRunning)
76     return ConnectRequest(this, 0);
77   auto endpoint_entry = endpoint_map_.find(endpoint);
78   if (endpoint_entry != endpoint_map_.end()) {
79     auto immediate_result = CreateProtocolConnection(endpoint_entry->second);
80     OSP_DCHECK(immediate_result);
81     request->OnConnectionOpened(0, std::move(immediate_result));
82     return ConnectRequest(this, 0);
83   }
84 
85   return CreatePendingConnection(endpoint, request);
86 }
87 
CreateProtocolConnection(uint64_t endpoint_id)88 std::unique_ptr<ProtocolConnection> QuicClient::CreateProtocolConnection(
89     uint64_t endpoint_id) {
90   if (state_ != State::kRunning)
91     return nullptr;
92   auto connection_entry = connections_.find(endpoint_id);
93   if (connection_entry == connections_.end())
94     return nullptr;
95   return QuicProtocolConnection::FromExisting(
96       this, connection_entry->second.connection.get(),
97       connection_entry->second.delegate.get(), endpoint_id);
98 }
99 
OnConnectionDestroyed(QuicProtocolConnection * connection)100 void QuicClient::OnConnectionDestroyed(QuicProtocolConnection* connection) {
101   if (!connection->stream())
102     return;
103 
104   auto connection_entry = connections_.find(connection->endpoint_id());
105   if (connection_entry == connections_.end())
106     return;
107 
108   connection_entry->second.delegate->DropProtocolConnection(connection);
109 }
110 
OnCryptoHandshakeComplete(ServiceConnectionDelegate * delegate,uint64_t connection_id)111 uint64_t QuicClient::OnCryptoHandshakeComplete(
112     ServiceConnectionDelegate* delegate,
113     uint64_t connection_id) {
114   const IPEndpoint& endpoint = delegate->endpoint();
115   auto pending_entry = pending_connections_.find(endpoint);
116   if (pending_entry == pending_connections_.end())
117     return 0;
118 
119   ServiceConnectionData connection_data = std::move(pending_entry->second.data);
120   auto* connection = connection_data.connection.get();
121   uint64_t endpoint_id = next_endpoint_id_++;
122   endpoint_map_[endpoint] = endpoint_id;
123   connections_.emplace(endpoint_id, std::move(connection_data));
124 
125   for (auto& request : pending_entry->second.callbacks) {
126     request_map_.erase(request.first);
127     std::unique_ptr<QuicProtocolConnection> pc =
128         QuicProtocolConnection::FromExisting(this, connection, delegate,
129                                              endpoint_id);
130     request_map_.erase(request.first);
131     request.second->OnConnectionOpened(request.first, std::move(pc));
132   }
133   pending_connections_.erase(pending_entry);
134   return endpoint_id;
135 }
136 
OnIncomingStream(std::unique_ptr<QuicProtocolConnection> connection)137 void QuicClient::OnIncomingStream(
138     std::unique_ptr<QuicProtocolConnection> connection) {
139   // TODO(jophba): Change to just use OnIncomingConnection when the observer
140   // is properly set up.
141   connection->CloseWriteEnd();
142   connection.reset();
143 }
144 
OnConnectionClosed(uint64_t endpoint_id,uint64_t connection_id)145 void QuicClient::OnConnectionClosed(uint64_t endpoint_id,
146                                     uint64_t connection_id) {
147   // TODO(btolsch): Is this how handshake failure is communicated to the
148   // delegate?
149   auto connection_entry = connections_.find(endpoint_id);
150   if (connection_entry == connections_.end())
151     return;
152   delete_connections_.push_back(endpoint_id);
153 
154   // TODO(crbug.com/openscreen/42): If we reset request IDs when a connection is
155   // closed, we might end up re-using request IDs when a new connection is
156   // created to the same endpoint.
157   endpoint_request_ids_.ResetRequestId(endpoint_id);
158 }
159 
OnDataReceived(uint64_t endpoint_id,uint64_t connection_id,const uint8_t * data,size_t data_size)160 void QuicClient::OnDataReceived(uint64_t endpoint_id,
161                                 uint64_t connection_id,
162                                 const uint8_t* data,
163                                 size_t data_size) {
164   demuxer_->OnStreamData(endpoint_id, connection_id, data, data_size);
165 }
166 
PendingConnectionData(ServiceConnectionData && data)167 QuicClient::PendingConnectionData::PendingConnectionData(
168     ServiceConnectionData&& data)
169     : data(std::move(data)) {}
170 QuicClient::PendingConnectionData::PendingConnectionData(
171     PendingConnectionData&&) noexcept = default;
172 QuicClient::PendingConnectionData::~PendingConnectionData() = default;
173 QuicClient::PendingConnectionData& QuicClient::PendingConnectionData::operator=(
174     PendingConnectionData&&) noexcept = default;
175 
CreatePendingConnection(const IPEndpoint & endpoint,ConnectionRequestCallback * request)176 QuicClient::ConnectRequest QuicClient::CreatePendingConnection(
177     const IPEndpoint& endpoint,
178     ConnectionRequestCallback* request) {
179   auto pending_entry = pending_connections_.find(endpoint);
180   if (pending_entry == pending_connections_.end()) {
181     uint64_t request_id = StartConnectionRequest(endpoint, request);
182     return ConnectRequest(this, request_id);
183   } else {
184     uint64_t request_id = next_request_id_++;
185     pending_entry->second.callbacks.emplace_back(request_id, request);
186     return ConnectRequest(this, request_id);
187   }
188 }
189 
StartConnectionRequest(const IPEndpoint & endpoint,ConnectionRequestCallback * request)190 uint64_t QuicClient::StartConnectionRequest(
191     const IPEndpoint& endpoint,
192     ConnectionRequestCallback* request) {
193   auto delegate = std::make_unique<ServiceConnectionDelegate>(this, endpoint);
194   std::unique_ptr<QuicConnection> connection =
195       connection_factory_->Connect(endpoint, delegate.get());
196   if (!connection) {
197     // TODO(btolsch): Need interface/handling for Connect() failures. Or, should
198     // request->OnConnectionFailed() be called?
199     OSP_DCHECK(false)
200         << __func__
201         << ": Factory connect failed, but requestor will never know.";
202     return 0;
203   }
204   auto pending_result = pending_connections_.emplace(
205       endpoint, PendingConnectionData(ServiceConnectionData(
206                     std::move(connection), std::move(delegate))));
207   uint64_t request_id = next_request_id_++;
208   pending_result.first->second.callbacks.emplace_back(request_id, request);
209   return request_id;
210 }
211 
CloseAllConnections()212 void QuicClient::CloseAllConnections() {
213   for (auto& conn : pending_connections_)
214     conn.second.data.connection->Close();
215 
216   pending_connections_.clear();
217   for (auto& conn : connections_)
218     conn.second.connection->Close();
219 
220   connections_.clear();
221   endpoint_map_.clear();
222   next_endpoint_id_ = 0;
223   endpoint_request_ids_.Reset();
224   for (auto& request : request_map_) {
225     request.second.second->OnConnectionFailed(request.first);
226   }
227   request_map_.clear();
228 }
229 
CancelConnectRequest(uint64_t request_id)230 void QuicClient::CancelConnectRequest(uint64_t request_id) {
231   auto request_entry = request_map_.find(request_id);
232   if (request_entry == request_map_.end())
233     return;
234 
235   auto pending_entry = pending_connections_.find(request_entry->second.first);
236   if (pending_entry != pending_connections_.end()) {
237     auto& callbacks = pending_entry->second.callbacks;
238     callbacks.erase(
239         std::remove_if(
240             callbacks.begin(), callbacks.end(),
241             [request_id](const std::pair<uint64_t, ConnectionRequestCallback*>&
242                              callback) {
243               return request_id == callback.first;
244             }),
245         callbacks.end());
246     if (callbacks.empty())
247       pending_connections_.erase(pending_entry);
248   }
249   request_map_.erase(request_entry);
250 }
251 
252 }  // namespace osp
253 }  // namespace openscreen
254