1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/notification/xmpp_channel.h"
6
7 #include <string>
8
9 #include <base/bind.h>
10 #include <base/strings/string_number_conversions.h>
11 #include <weave/provider/network.h>
12 #include <weave/provider/task_runner.h>
13
14 #include "src/backoff_entry.h"
15 #include "src/data_encoding.h"
16 #include "src/notification/notification_delegate.h"
17 #include "src/notification/notification_parser.h"
18 #include "src/notification/xml_node.h"
19 #include "src/privet/openssl_utils.h"
20 #include "src/string_utils.h"
21 #include "src/utils.h"
22
23 namespace weave {
24
25 namespace {
26
BuildXmppStartStreamCommand()27 std::string BuildXmppStartStreamCommand() {
28 return "<stream:stream to='clouddevices.gserviceaccount.com' "
29 "xmlns:stream='http://etherx.jabber.org/streams' "
30 "xml:lang='*' version='1.0' xmlns='jabber:client'>";
31 }
32
BuildXmppAuthenticateCommand(const std::string & account,const std::string & token)33 std::string BuildXmppAuthenticateCommand(const std::string& account,
34 const std::string& token) {
35 std::vector<uint8_t> credentials;
36 credentials.push_back(0);
37 credentials.insert(credentials.end(), account.begin(), account.end());
38 credentials.push_back(0);
39 credentials.insert(credentials.end(), token.begin(), token.end());
40 std::string msg =
41 "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
42 "mechanism='X-OAUTH2' auth:service='oauth2' "
43 "auth:allow-non-google-login='true' "
44 "auth:client-uses-full-bind-result='true' "
45 "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
46 Base64Encode(credentials) + "</auth>";
47 return msg;
48 }
49
50 // Backoff policy.
51 // Note: In order to ensure a minimum of 20 seconds between server errors,
52 // we have a 30s +- 10s (33%) jitter initial backoff.
53 const BackoffEntry::Policy kDefaultBackoffPolicy = {
54 // Number of initial errors (in sequence) to ignore before applying
55 // exponential back-off rules.
56 0,
57
58 // Initial delay for exponential back-off in ms.
59 30 * 1000, // 30 seconds.
60
61 // Factor by which the waiting time will be multiplied.
62 2,
63
64 // Fuzzing percentage. ex: 10% will spread requests randomly
65 // between 90%-100% of the calculated time.
66 0.33, // 33%.
67
68 // Maximum amount of time we are willing to delay our request in ms.
69 10 * 60 * 1000, // 10 minutes.
70
71 // Time to keep an entry from being discarded even when it
72 // has no significant state, -1 to never discard.
73 -1,
74
75 // Don't use initial delay unless the last request was an error.
76 false,
77 };
78
// Keep-alive ping cadence used while the connection is considered healthy.
constexpr int kRegularPingIntervalSeconds = 60;
constexpr int kRegularPingTimeoutSeconds = 30;

// Faster ping cadence used as a diagnostic after connectivity has changed.
constexpr int kAgressivePingIntervalSeconds = 5;
constexpr int kAgressivePingTimeoutSeconds = 10;

// While still connecting after a network change, a reconnect that is already
// due in less than this many seconds is simply awaited instead of scheduling
// a fast ping.
constexpr int kConnectingTimeoutAfterNetChangeSeconds = 30;
88
89 } // namespace
90
XmppChannel(const std::string & account,const std::string & access_token,const std::string & xmpp_endpoint,provider::TaskRunner * task_runner,provider::Network * network)91 XmppChannel::XmppChannel(const std::string& account,
92 const std::string& access_token,
93 const std::string& xmpp_endpoint,
94 provider::TaskRunner* task_runner,
95 provider::Network* network)
96 : account_{account},
97 access_token_{access_token},
98 xmpp_endpoint_{xmpp_endpoint},
99 network_{network},
100 backoff_entry_{&kDefaultBackoffPolicy},
101 task_runner_{task_runner},
102 iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
103 read_socket_data_.resize(4096);
104 if (network) {
105 network->AddConnectionChangedCallback(base::Bind(
106 &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
107 }
108 }
109
OnMessageRead(size_t size,ErrorPtr error)110 void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
111 read_pending_ = false;
112 if (error)
113 return Restart();
114 std::string msg(read_socket_data_.data(), size);
115 VLOG(2) << "Received XMPP packet: '" << msg << "'";
116
117 if (!size)
118 return Restart();
119
120 stream_parser_.ParseData(msg);
121 WaitForMessage();
122 }
123
OnStreamStart(const std::string & node_name,std::map<std::string,std::string> attributes)124 void XmppChannel::OnStreamStart(const std::string& node_name,
125 std::map<std::string, std::string> attributes) {
126 VLOG(2) << "XMPP stream start: " << node_name;
127 }
128
OnStreamEnd(const std::string & node_name)129 void XmppChannel::OnStreamEnd(const std::string& node_name) {
130 VLOG(2) << "XMPP stream ended: " << node_name;
131 Stop();
132 if (IsConnected()) {
133 // If we had a fully-established connection, restart it now.
134 // However, if the connection has never been established yet (e.g.
135 // authorization failed), do not restart right now. Wait till we get
136 // new credentials.
137 task_runner_->PostDelayedTask(
138 FROM_HERE,
139 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
140 } else if (delegate_) {
141 delegate_->OnPermanentFailure();
142 }
143 }
144
OnStanza(std::unique_ptr<XmlNode> stanza)145 void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
146 // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
147 // from expat XML parser and some stanza could cause the XMPP stream to be
148 // reset and the parser to be re-initialized. We don't want to destroy the
149 // parser while it is performing a callback invocation.
150 task_runner_->PostDelayedTask(
151 FROM_HERE,
152 base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
153 base::Passed(std::move(stanza))),
154 {});
155 }
156
HandleStanza(std::unique_ptr<XmlNode> stanza)157 void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
158 VLOG(2) << "XMPP stanza received: " << stanza->ToString();
159
160 switch (state_) {
161 case XmppState::kConnected:
162 if (stanza->name() == "stream:features") {
163 auto children = stanza->FindChildren("mechanisms/mechanism", false);
164 for (const auto& child : children) {
165 if (child->text() == "X-OAUTH2") {
166 state_ = XmppState::kAuthenticationStarted;
167 SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
168 return;
169 }
170 }
171 }
172 break;
173 case XmppState::kAuthenticationStarted:
174 if (stanza->name() == "success") {
175 state_ = XmppState::kStreamRestartedPostAuthentication;
176 RestartXmppStream();
177 return;
178 } else if (stanza->name() == "failure") {
179 if (stanza->FindFirstChild("not-authorized", false)) {
180 state_ = XmppState::kAuthenticationFailed;
181 return;
182 }
183 }
184 break;
185 case XmppState::kStreamRestartedPostAuthentication:
186 if (stanza->name() == "stream:features" &&
187 stanza->FindFirstChild("bind", false)) {
188 state_ = XmppState::kBindSent;
189 iq_stanza_handler_->SendRequest(
190 "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
191 base::Bind(&XmppChannel::OnBindCompleted,
192 task_ptr_factory_.GetWeakPtr()),
193 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
194 return;
195 }
196 break;
197 default:
198 if (stanza->name() == "message") {
199 HandleMessageStanza(std::move(stanza));
200 return;
201 } else if (stanza->name() == "iq") {
202 if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
203 LOG(ERROR) << "Failed to handle IQ stanza";
204 CloseStream();
205 }
206 return;
207 }
208 LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
209 return;
210 }
211 // Something bad happened. Close the stream and start over.
212 LOG(ERROR) << "Error condition occurred handling stanza: "
213 << stanza->ToString() << " in state: " << static_cast<int>(state_);
214 CloseStream();
215 }
216
CloseStream()217 void XmppChannel::CloseStream() {
218 SendMessage("</stream:stream>");
219 }
220
OnBindCompleted(std::unique_ptr<XmlNode> reply)221 void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
222 if (reply->GetAttributeOrEmpty("type") != "result") {
223 CloseStream();
224 return;
225 }
226 const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
227 if (!jid_node) {
228 LOG(ERROR) << "XMPP Bind response is missing JID";
229 CloseStream();
230 return;
231 }
232
233 jid_ = jid_node->text();
234 state_ = XmppState::kSessionStarted;
235 iq_stanza_handler_->SendRequest(
236 "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
237 base::Bind(&XmppChannel::OnSessionEstablished,
238 task_ptr_factory_.GetWeakPtr()),
239 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
240 }
241
OnSessionEstablished(std::unique_ptr<XmlNode> reply)242 void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
243 if (reply->GetAttributeOrEmpty("type") != "result") {
244 CloseStream();
245 return;
246 }
247 state_ = XmppState::kSubscribeStarted;
248 std::string body =
249 "<subscribe xmlns='google:push'>"
250 "<item channel='cloud_devices' from=''/></subscribe>";
251 iq_stanza_handler_->SendRequest(
252 "set", "", account_, body,
253 base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
254 base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
255 }
256
OnSubscribed(std::unique_ptr<XmlNode> reply)257 void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
258 if (reply->GetAttributeOrEmpty("type") != "result") {
259 CloseStream();
260 return;
261 }
262 state_ = XmppState::kSubscribed;
263 if (delegate_)
264 delegate_->OnConnected(GetName());
265 }
266
HandleMessageStanza(std::unique_ptr<XmlNode> stanza)267 void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
268 const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
269 if (!node) {
270 LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
271 return;
272 }
273 std::string data = node->text();
274 std::string json_data;
275 if (!Base64Decode(data, &json_data)) {
276 LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
277 return;
278 }
279
280 VLOG(2) << "XMPP push notification data: " << json_data;
281 auto json_dict = LoadJsonDict(json_data, nullptr);
282 if (json_dict && delegate_)
283 ParseNotificationJson(*json_dict, delegate_, GetName());
284 }
285
CreateSslSocket()286 void XmppChannel::CreateSslSocket() {
287 CHECK(!stream_);
288 state_ = XmppState::kConnecting;
289 LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_;
290
291 std::pair<std::string, std::string> host_port =
292 SplitAtFirst(xmpp_endpoint_, ":", true);
293 CHECK(!host_port.first.empty());
294 CHECK(!host_port.second.empty());
295 uint32_t port = 0;
296 CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_;
297
298 network_->OpenSslSocket(host_port.first, port,
299 base::Bind(&XmppChannel::OnSslSocketReady,
300 task_ptr_factory_.GetWeakPtr()));
301 }
302
OnSslSocketReady(std::unique_ptr<Stream> stream,ErrorPtr error)303 void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
304 ErrorPtr error) {
305 if (error) {
306 LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
307 backoff_entry_.InformOfRequest(false);
308
309 LOG(INFO) << "Delaying connection to XMPP server for "
310 << backoff_entry_.GetTimeUntilRelease();
311 return task_runner_->PostDelayedTask(
312 FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
313 task_ptr_factory_.GetWeakPtr()),
314 backoff_entry_.GetTimeUntilRelease());
315 }
316 CHECK(XmppState::kConnecting == state_);
317 backoff_entry_.InformOfRequest(true);
318 stream_ = std::move(stream);
319 state_ = XmppState::kConnected;
320 RestartXmppStream();
321 ScheduleRegularPing();
322 }
323
SendMessage(const std::string & message)324 void XmppChannel::SendMessage(const std::string& message) {
325 CHECK(stream_) << "No XMPP socket stream available";
326 if (write_pending_) {
327 queued_write_data_ += message;
328 return;
329 }
330 write_socket_data_ = queued_write_data_ + message;
331 queued_write_data_.clear();
332 VLOG(2) << "Sending XMPP message: " << message;
333
334 write_pending_ = true;
335 stream_->Write(
336 write_socket_data_.data(), write_socket_data_.size(),
337 base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
338 }
339
OnMessageSent(ErrorPtr error)340 void XmppChannel::OnMessageSent(ErrorPtr error) {
341 write_pending_ = false;
342 if (error)
343 return Restart();
344 if (queued_write_data_.empty()) {
345 WaitForMessage();
346 } else {
347 SendMessage(std::string{});
348 }
349 }
350
WaitForMessage()351 void XmppChannel::WaitForMessage() {
352 if (read_pending_ || !stream_)
353 return;
354
355 read_pending_ = true;
356 stream_->Read(
357 read_socket_data_.data(), read_socket_data_.size(),
358 base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
359 }
360
GetName() const361 std::string XmppChannel::GetName() const {
362 return "xmpp";
363 }
364
IsConnected() const365 bool XmppChannel::IsConnected() const {
366 return state_ == XmppState::kSubscribed;
367 }
368
AddChannelParameters(base::DictionaryValue * channel_json)369 void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
370 // No extra parameters needed for XMPP.
371 }
372
Restart()373 void XmppChannel::Restart() {
374 LOG(INFO) << "Restarting XMPP";
375 Stop();
376 Start(delegate_);
377 }
378
Start(NotificationDelegate * delegate)379 void XmppChannel::Start(NotificationDelegate* delegate) {
380 CHECK(state_ == XmppState::kNotStarted);
381 delegate_ = delegate;
382
383 CreateSslSocket();
384 }
385
Stop()386 void XmppChannel::Stop() {
387 if (IsConnected() && delegate_)
388 delegate_->OnDisconnected();
389
390 task_ptr_factory_.InvalidateWeakPtrs();
391 ping_ptr_factory_.InvalidateWeakPtrs();
392
393 stream_.reset();
394 state_ = XmppState::kNotStarted;
395 }
396
RestartXmppStream()397 void XmppChannel::RestartXmppStream() {
398 stream_parser_.Reset();
399 stream_->CancelPendingOperations();
400 read_pending_ = false;
401 write_pending_ = false;
402 SendMessage(BuildXmppStartStreamCommand());
403 }
404
SchedulePing(base::TimeDelta interval,base::TimeDelta timeout)405 void XmppChannel::SchedulePing(base::TimeDelta interval,
406 base::TimeDelta timeout) {
407 VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
408 ping_ptr_factory_.InvalidateWeakPtrs();
409 task_runner_->PostDelayedTask(
410 FROM_HERE, base::Bind(&XmppChannel::PingServer,
411 ping_ptr_factory_.GetWeakPtr(), timeout),
412 interval);
413 }
414
ScheduleRegularPing()415 void XmppChannel::ScheduleRegularPing() {
416 SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
417 base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
418 }
419
ScheduleFastPing()420 void XmppChannel::ScheduleFastPing() {
421 SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
422 base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
423 }
424
PingServer(base::TimeDelta timeout)425 void XmppChannel::PingServer(base::TimeDelta timeout) {
426 VLOG(1) << "Sending XMPP ping";
427 if (!IsConnected()) {
428 LOG(WARNING) << "XMPP channel is not connected";
429 Restart();
430 return;
431 }
432
433 // Send an XMPP Ping request as defined in XEP-0199 extension:
434 // http://xmpp.org/extensions/xep-0199.html
435 iq_stanza_handler_->SendRequestWithCustomTimeout(
436 "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
437 base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
438 base::Time::Now()),
439 base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
440 base::Time::Now()));
441 }
442
OnPingResponse(base::Time sent_time,std::unique_ptr<XmlNode> reply)443 void XmppChannel::OnPingResponse(base::Time sent_time,
444 std::unique_ptr<XmlNode> reply) {
445 VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
446 // Ping response received from server. Everything seems to be in order.
447 // Reschedule with default intervals.
448 ScheduleRegularPing();
449 }
450
OnPingTimeout(base::Time sent_time)451 void XmppChannel::OnPingTimeout(base::Time sent_time) {
452 LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
453 << (base::Time::Now() - sent_time);
454 Restart();
455 }
456
OnConnectivityChanged()457 void XmppChannel::OnConnectivityChanged() {
458 if (state_ == XmppState::kNotStarted)
459 return;
460
461 if (state_ == XmppState::kConnecting &&
462 backoff_entry_.GetTimeUntilRelease() <
463 base::TimeDelta::FromSeconds(
464 kConnectingTimeoutAfterNetChangeSeconds)) {
465 VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
466 return;
467 }
468
469 ScheduleFastPing();
470 }
471
472 } // namespace weave
473