1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "osp/impl/quic/quic_client.h"
6
7 #include <algorithm>
8 #include <functional>
9 #include <memory>
10
11 #include "platform/api/task_runner.h"
12 #include "platform/api/time.h"
13 #include "util/osp_logging.h"
14
15 namespace openscreen {
16 namespace osp {
17
QuicClient(MessageDemuxer * demuxer,std::unique_ptr<QuicConnectionFactory> connection_factory,ProtocolConnectionServiceObserver * observer,ClockNowFunctionPtr now_function,TaskRunner * task_runner)18 QuicClient::QuicClient(
19 MessageDemuxer* demuxer,
20 std::unique_ptr<QuicConnectionFactory> connection_factory,
21 ProtocolConnectionServiceObserver* observer,
22 ClockNowFunctionPtr now_function,
23 TaskRunner* task_runner)
24 : ProtocolConnectionClient(demuxer, observer),
25 connection_factory_(std::move(connection_factory)),
26 cleanup_alarm_(now_function, task_runner) {}
27
~QuicClient()28 QuicClient::~QuicClient() {
29 CloseAllConnections();
30 }
31
Start()32 bool QuicClient::Start() {
33 if (state_ == State::kRunning)
34 return false;
35 state_ = State::kRunning;
36 Cleanup(); // Start periodic clean-ups.
37 observer_->OnRunning();
38 return true;
39 }
40
Stop()41 bool QuicClient::Stop() {
42 if (state_ == State::kStopped)
43 return false;
44 CloseAllConnections();
45 state_ = State::kStopped;
46 Cleanup(); // Final clean-up.
47 observer_->OnStopped();
48 return true;
49 }
50
Cleanup()51 void QuicClient::Cleanup() {
52 for (auto& entry : connections_) {
53 entry.second.delegate->DestroyClosedStreams();
54 if (!entry.second.delegate->has_streams())
55 entry.second.connection->Close();
56 }
57
58 for (uint64_t endpoint_id : delete_connections_) {
59 auto it = connections_.find(endpoint_id);
60 if (it != connections_.end()) {
61 connections_.erase(it);
62 }
63 }
64 delete_connections_.clear();
65
66 constexpr Clock::duration kQuicCleanupPeriod = std::chrono::milliseconds(500);
67 if (state_ != State::kStopped) {
68 cleanup_alarm_.ScheduleFromNow([this] { Cleanup(); }, kQuicCleanupPeriod);
69 }
70 }
71
Connect(const IPEndpoint & endpoint,ConnectionRequestCallback * request)72 QuicClient::ConnectRequest QuicClient::Connect(
73 const IPEndpoint& endpoint,
74 ConnectionRequestCallback* request) {
75 if (state_ != State::kRunning)
76 return ConnectRequest(this, 0);
77 auto endpoint_entry = endpoint_map_.find(endpoint);
78 if (endpoint_entry != endpoint_map_.end()) {
79 auto immediate_result = CreateProtocolConnection(endpoint_entry->second);
80 OSP_DCHECK(immediate_result);
81 request->OnConnectionOpened(0, std::move(immediate_result));
82 return ConnectRequest(this, 0);
83 }
84
85 return CreatePendingConnection(endpoint, request);
86 }
87
CreateProtocolConnection(uint64_t endpoint_id)88 std::unique_ptr<ProtocolConnection> QuicClient::CreateProtocolConnection(
89 uint64_t endpoint_id) {
90 if (state_ != State::kRunning)
91 return nullptr;
92 auto connection_entry = connections_.find(endpoint_id);
93 if (connection_entry == connections_.end())
94 return nullptr;
95 return QuicProtocolConnection::FromExisting(
96 this, connection_entry->second.connection.get(),
97 connection_entry->second.delegate.get(), endpoint_id);
98 }
99
OnConnectionDestroyed(QuicProtocolConnection * connection)100 void QuicClient::OnConnectionDestroyed(QuicProtocolConnection* connection) {
101 if (!connection->stream())
102 return;
103
104 auto connection_entry = connections_.find(connection->endpoint_id());
105 if (connection_entry == connections_.end())
106 return;
107
108 connection_entry->second.delegate->DropProtocolConnection(connection);
109 }
110
OnCryptoHandshakeComplete(ServiceConnectionDelegate * delegate,uint64_t connection_id)111 uint64_t QuicClient::OnCryptoHandshakeComplete(
112 ServiceConnectionDelegate* delegate,
113 uint64_t connection_id) {
114 const IPEndpoint& endpoint = delegate->endpoint();
115 auto pending_entry = pending_connections_.find(endpoint);
116 if (pending_entry == pending_connections_.end())
117 return 0;
118
119 ServiceConnectionData connection_data = std::move(pending_entry->second.data);
120 auto* connection = connection_data.connection.get();
121 uint64_t endpoint_id = next_endpoint_id_++;
122 endpoint_map_[endpoint] = endpoint_id;
123 connections_.emplace(endpoint_id, std::move(connection_data));
124
125 for (auto& request : pending_entry->second.callbacks) {
126 request_map_.erase(request.first);
127 std::unique_ptr<QuicProtocolConnection> pc =
128 QuicProtocolConnection::FromExisting(this, connection, delegate,
129 endpoint_id);
130 request_map_.erase(request.first);
131 request.second->OnConnectionOpened(request.first, std::move(pc));
132 }
133 pending_connections_.erase(pending_entry);
134 return endpoint_id;
135 }
136
OnIncomingStream(std::unique_ptr<QuicProtocolConnection> connection)137 void QuicClient::OnIncomingStream(
138 std::unique_ptr<QuicProtocolConnection> connection) {
139 // TODO(jophba): Change to just use OnIncomingConnection when the observer
140 // is properly set up.
141 connection->CloseWriteEnd();
142 connection.reset();
143 }
144
OnConnectionClosed(uint64_t endpoint_id,uint64_t connection_id)145 void QuicClient::OnConnectionClosed(uint64_t endpoint_id,
146 uint64_t connection_id) {
147 // TODO(btolsch): Is this how handshake failure is communicated to the
148 // delegate?
149 auto connection_entry = connections_.find(endpoint_id);
150 if (connection_entry == connections_.end())
151 return;
152 delete_connections_.push_back(endpoint_id);
153
154 // TODO(crbug.com/openscreen/42): If we reset request IDs when a connection is
155 // closed, we might end up re-using request IDs when a new connection is
156 // created to the same endpoint.
157 endpoint_request_ids_.ResetRequestId(endpoint_id);
158 }
159
OnDataReceived(uint64_t endpoint_id,uint64_t connection_id,const uint8_t * data,size_t data_size)160 void QuicClient::OnDataReceived(uint64_t endpoint_id,
161 uint64_t connection_id,
162 const uint8_t* data,
163 size_t data_size) {
164 demuxer_->OnStreamData(endpoint_id, connection_id, data, data_size);
165 }
166
PendingConnectionData(ServiceConnectionData && data)167 QuicClient::PendingConnectionData::PendingConnectionData(
168 ServiceConnectionData&& data)
169 : data(std::move(data)) {}
170 QuicClient::PendingConnectionData::PendingConnectionData(
171 PendingConnectionData&&) noexcept = default;
172 QuicClient::PendingConnectionData::~PendingConnectionData() = default;
173 QuicClient::PendingConnectionData& QuicClient::PendingConnectionData::operator=(
174 PendingConnectionData&&) noexcept = default;
175
CreatePendingConnection(const IPEndpoint & endpoint,ConnectionRequestCallback * request)176 QuicClient::ConnectRequest QuicClient::CreatePendingConnection(
177 const IPEndpoint& endpoint,
178 ConnectionRequestCallback* request) {
179 auto pending_entry = pending_connections_.find(endpoint);
180 if (pending_entry == pending_connections_.end()) {
181 uint64_t request_id = StartConnectionRequest(endpoint, request);
182 return ConnectRequest(this, request_id);
183 } else {
184 uint64_t request_id = next_request_id_++;
185 pending_entry->second.callbacks.emplace_back(request_id, request);
186 return ConnectRequest(this, request_id);
187 }
188 }
189
StartConnectionRequest(const IPEndpoint & endpoint,ConnectionRequestCallback * request)190 uint64_t QuicClient::StartConnectionRequest(
191 const IPEndpoint& endpoint,
192 ConnectionRequestCallback* request) {
193 auto delegate = std::make_unique<ServiceConnectionDelegate>(this, endpoint);
194 std::unique_ptr<QuicConnection> connection =
195 connection_factory_->Connect(endpoint, delegate.get());
196 if (!connection) {
197 // TODO(btolsch): Need interface/handling for Connect() failures. Or, should
198 // request->OnConnectionFailed() be called?
199 OSP_DCHECK(false)
200 << __func__
201 << ": Factory connect failed, but requestor will never know.";
202 return 0;
203 }
204 auto pending_result = pending_connections_.emplace(
205 endpoint, PendingConnectionData(ServiceConnectionData(
206 std::move(connection), std::move(delegate))));
207 uint64_t request_id = next_request_id_++;
208 pending_result.first->second.callbacks.emplace_back(request_id, request);
209 return request_id;
210 }
211
CloseAllConnections()212 void QuicClient::CloseAllConnections() {
213 for (auto& conn : pending_connections_)
214 conn.second.data.connection->Close();
215
216 pending_connections_.clear();
217 for (auto& conn : connections_)
218 conn.second.connection->Close();
219
220 connections_.clear();
221 endpoint_map_.clear();
222 next_endpoint_id_ = 0;
223 endpoint_request_ids_.Reset();
224 for (auto& request : request_map_) {
225 request.second.second->OnConnectionFailed(request.first);
226 }
227 request_map_.clear();
228 }
229
CancelConnectRequest(uint64_t request_id)230 void QuicClient::CancelConnectRequest(uint64_t request_id) {
231 auto request_entry = request_map_.find(request_id);
232 if (request_entry == request_map_.end())
233 return;
234
235 auto pending_entry = pending_connections_.find(request_entry->second.first);
236 if (pending_entry != pending_connections_.end()) {
237 auto& callbacks = pending_entry->second.callbacks;
238 callbacks.erase(
239 std::remove_if(
240 callbacks.begin(), callbacks.end(),
241 [request_id](const std::pair<uint64_t, ConnectionRequestCallback*>&
242 callback) {
243 return request_id == callback.first;
244 }),
245 callbacks.end());
246 if (callbacks.empty())
247 pending_connections_.erase(pending_entry);
248 }
249 request_map_.erase(request_entry);
250 }
251
252 } // namespace osp
253 } // namespace openscreen
254