• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/notification/xmpp_channel.h"
6 
7 #include <string>
8 
9 #include <base/bind.h>
10 #include <base/strings/string_number_conversions.h>
11 #include <weave/provider/network.h>
12 #include <weave/provider/task_runner.h>
13 
14 #include "src/backoff_entry.h"
15 #include "src/data_encoding.h"
16 #include "src/notification/notification_delegate.h"
17 #include "src/notification/notification_parser.h"
18 #include "src/notification/xml_node.h"
19 #include "src/privet/openssl_utils.h"
20 #include "src/string_utils.h"
21 #include "src/utils.h"
22 
23 namespace weave {
24 
25 namespace {
26 
// Returns the opening <stream:stream> element that is sent to begin (or
// re-begin) an XMPP stream with the clouddevices service account domain.
std::string BuildXmppStartStreamCommand() {
  static const char kStreamHeader[] =
      "<stream:stream to='clouddevices.gserviceaccount.com' "
      "xmlns:stream='http://etherx.jabber.org/streams' "
      "xml:lang='*' version='1.0' xmlns='jabber:client'>";
  return std::string{kStreamHeader};
}
32 
BuildXmppAuthenticateCommand(const std::string & account,const std::string & token)33 std::string BuildXmppAuthenticateCommand(const std::string& account,
34                                          const std::string& token) {
35   std::vector<uint8_t> credentials;
36   credentials.push_back(0);
37   credentials.insert(credentials.end(), account.begin(), account.end());
38   credentials.push_back(0);
39   credentials.insert(credentials.end(), token.begin(), token.end());
40   std::string msg =
41       "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' "
42       "mechanism='X-OAUTH2' auth:service='oauth2' "
43       "auth:allow-non-google-login='true' "
44       "auth:client-uses-full-bind-result='true' "
45       "xmlns:auth='http://www.google.com/talk/protocol/auth'>" +
46       Base64Encode(credentials) + "</auth>";
47   return msg;
48 }
49 
50 // Backoff policy.
51 // Note: In order to ensure a minimum of 20 seconds between server errors,
52 // we have a 30s +- 10s (33%) jitter initial backoff.
53 const BackoffEntry::Policy kDefaultBackoffPolicy = {
54     // Number of initial errors (in sequence) to ignore before applying
55     // exponential back-off rules.
56     0,
57 
58     // Initial delay for exponential back-off in ms.
59     30 * 1000,  // 30 seconds.
60 
61     // Factor by which the waiting time will be multiplied.
62     2,
63 
64     // Fuzzing percentage. ex: 10% will spread requests randomly
65     // between 90%-100% of the calculated time.
66     0.33,  // 33%.
67 
68     // Maximum amount of time we are willing to delay our request in ms.
69     10 * 60 * 1000,  // 10 minutes.
70 
71     // Time to keep an entry from being discarded even when it
72     // has no significant state, -1 to never discard.
73     -1,
74 
75     // Don't use initial delay unless the last request was an error.
76     false,
77 };
78 
// Keep-alive ping cadence for an established connection.
constexpr int kRegularPingIntervalSeconds = 60;
constexpr int kRegularPingTimeoutSeconds = 30;

// Diagnostic ping cadence used after a connectivity change.
constexpr int kAgressivePingIntervalSeconds = 5;
constexpr int kAgressivePingTimeoutSeconds = 10;

// While connecting, a connectivity change is ignored when the next reconnect
// attempt is due sooner than this many seconds.
constexpr int kConnectingTimeoutAfterNetChangeSeconds = 30;
88 
89 }  // namespace
90 
XmppChannel(const std::string & account,const std::string & access_token,const std::string & xmpp_endpoint,provider::TaskRunner * task_runner,provider::Network * network)91 XmppChannel::XmppChannel(const std::string& account,
92                          const std::string& access_token,
93                          const std::string& xmpp_endpoint,
94                          provider::TaskRunner* task_runner,
95                          provider::Network* network)
96     : account_{account},
97       access_token_{access_token},
98       xmpp_endpoint_{xmpp_endpoint},
99       network_{network},
100       backoff_entry_{&kDefaultBackoffPolicy},
101       task_runner_{task_runner},
102       iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} {
103   read_socket_data_.resize(4096);
104   if (network) {
105     network->AddConnectionChangedCallback(base::Bind(
106         &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr()));
107   }
108 }
109 
OnMessageRead(size_t size,ErrorPtr error)110 void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) {
111   read_pending_ = false;
112   if (error)
113     return Restart();
114   std::string msg(read_socket_data_.data(), size);
115   VLOG(2) << "Received XMPP packet: '" << msg << "'";
116 
117   if (!size)
118     return Restart();
119 
120   stream_parser_.ParseData(msg);
121   WaitForMessage();
122 }
123 
OnStreamStart(const std::string & node_name,std::map<std::string,std::string> attributes)124 void XmppChannel::OnStreamStart(const std::string& node_name,
125                                 std::map<std::string, std::string> attributes) {
126   VLOG(2) << "XMPP stream start: " << node_name;
127 }
128 
OnStreamEnd(const std::string & node_name)129 void XmppChannel::OnStreamEnd(const std::string& node_name) {
130   VLOG(2) << "XMPP stream ended: " << node_name;
131   Stop();
132   if (IsConnected()) {
133     // If we had a fully-established connection, restart it now.
134     // However, if the connection has never been established yet (e.g.
135     // authorization failed), do not restart right now. Wait till we get
136     // new credentials.
137     task_runner_->PostDelayedTask(
138         FROM_HERE,
139         base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {});
140   } else if (delegate_) {
141     delegate_->OnPermanentFailure();
142   }
143 }
144 
OnStanza(std::unique_ptr<XmlNode> stanza)145 void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) {
146   // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback
147   // from expat XML parser and some stanza could cause the XMPP stream to be
148   // reset and the parser to be re-initialized. We don't want to destroy the
149   // parser while it is performing a callback invocation.
150   task_runner_->PostDelayedTask(
151       FROM_HERE,
152       base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(),
153                  base::Passed(std::move(stanza))),
154       {});
155 }
156 
HandleStanza(std::unique_ptr<XmlNode> stanza)157 void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) {
158   VLOG(2) << "XMPP stanza received: " << stanza->ToString();
159 
160   switch (state_) {
161     case XmppState::kConnected:
162       if (stanza->name() == "stream:features") {
163         auto children = stanza->FindChildren("mechanisms/mechanism", false);
164         for (const auto& child : children) {
165           if (child->text() == "X-OAUTH2") {
166             state_ = XmppState::kAuthenticationStarted;
167             SendMessage(BuildXmppAuthenticateCommand(account_, access_token_));
168             return;
169           }
170         }
171       }
172       break;
173     case XmppState::kAuthenticationStarted:
174       if (stanza->name() == "success") {
175         state_ = XmppState::kStreamRestartedPostAuthentication;
176         RestartXmppStream();
177         return;
178       } else if (stanza->name() == "failure") {
179         if (stanza->FindFirstChild("not-authorized", false)) {
180           state_ = XmppState::kAuthenticationFailed;
181           return;
182         }
183       }
184       break;
185     case XmppState::kStreamRestartedPostAuthentication:
186       if (stanza->name() == "stream:features" &&
187           stanza->FindFirstChild("bind", false)) {
188         state_ = XmppState::kBindSent;
189         iq_stanza_handler_->SendRequest(
190             "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>",
191             base::Bind(&XmppChannel::OnBindCompleted,
192                        task_ptr_factory_.GetWeakPtr()),
193             base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
194         return;
195       }
196       break;
197     default:
198       if (stanza->name() == "message") {
199         HandleMessageStanza(std::move(stanza));
200         return;
201       } else if (stanza->name() == "iq") {
202         if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) {
203           LOG(ERROR) << "Failed to handle IQ stanza";
204           CloseStream();
205         }
206         return;
207       }
208       LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
209       return;
210   }
211   // Something bad happened. Close the stream and start over.
212   LOG(ERROR) << "Error condition occurred handling stanza: "
213              << stanza->ToString() << " in state: " << static_cast<int>(state_);
214   CloseStream();
215 }
216 
CloseStream()217 void XmppChannel::CloseStream() {
218   SendMessage("</stream:stream>");
219 }
220 
OnBindCompleted(std::unique_ptr<XmlNode> reply)221 void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) {
222   if (reply->GetAttributeOrEmpty("type") != "result") {
223     CloseStream();
224     return;
225   }
226   const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false);
227   if (!jid_node) {
228     LOG(ERROR) << "XMPP Bind response is missing JID";
229     CloseStream();
230     return;
231   }
232 
233   jid_ = jid_node->text();
234   state_ = XmppState::kSessionStarted;
235   iq_stanza_handler_->SendRequest(
236       "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
237       base::Bind(&XmppChannel::OnSessionEstablished,
238                  task_ptr_factory_.GetWeakPtr()),
239       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
240 }
241 
OnSessionEstablished(std::unique_ptr<XmlNode> reply)242 void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) {
243   if (reply->GetAttributeOrEmpty("type") != "result") {
244     CloseStream();
245     return;
246   }
247   state_ = XmppState::kSubscribeStarted;
248   std::string body =
249       "<subscribe xmlns='google:push'>"
250       "<item channel='cloud_devices' from=''/></subscribe>";
251   iq_stanza_handler_->SendRequest(
252       "set", "", account_, body,
253       base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()),
254       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()));
255 }
256 
OnSubscribed(std::unique_ptr<XmlNode> reply)257 void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) {
258   if (reply->GetAttributeOrEmpty("type") != "result") {
259     CloseStream();
260     return;
261   }
262   state_ = XmppState::kSubscribed;
263   if (delegate_)
264     delegate_->OnConnected(GetName());
265 }
266 
HandleMessageStanza(std::unique_ptr<XmlNode> stanza)267 void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
268   const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
269   if (!node) {
270     LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
271     return;
272   }
273   std::string data = node->text();
274   std::string json_data;
275   if (!Base64Decode(data, &json_data)) {
276     LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
277     return;
278   }
279 
280   VLOG(2) << "XMPP push notification data: " << json_data;
281   auto json_dict = LoadJsonDict(json_data, nullptr);
282   if (json_dict && delegate_)
283     ParseNotificationJson(*json_dict, delegate_, GetName());
284 }
285 
CreateSslSocket()286 void XmppChannel::CreateSslSocket() {
287   CHECK(!stream_);
288   state_ = XmppState::kConnecting;
289   LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_;
290 
291   std::pair<std::string, std::string> host_port =
292       SplitAtFirst(xmpp_endpoint_, ":", true);
293   CHECK(!host_port.first.empty());
294   CHECK(!host_port.second.empty());
295   uint32_t port = 0;
296   CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_;
297 
298   network_->OpenSslSocket(host_port.first, port,
299                           base::Bind(&XmppChannel::OnSslSocketReady,
300                                      task_ptr_factory_.GetWeakPtr()));
301 }
302 
OnSslSocketReady(std::unique_ptr<Stream> stream,ErrorPtr error)303 void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream,
304                                    ErrorPtr error) {
305   if (error) {
306     LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection";
307     backoff_entry_.InformOfRequest(false);
308 
309     LOG(INFO) << "Delaying connection to XMPP server for "
310               << backoff_entry_.GetTimeUntilRelease();
311     return task_runner_->PostDelayedTask(
312         FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket,
313                               task_ptr_factory_.GetWeakPtr()),
314         backoff_entry_.GetTimeUntilRelease());
315   }
316   CHECK(XmppState::kConnecting == state_);
317   backoff_entry_.InformOfRequest(true);
318   stream_ = std::move(stream);
319   state_ = XmppState::kConnected;
320   RestartXmppStream();
321   ScheduleRegularPing();
322 }
323 
SendMessage(const std::string & message)324 void XmppChannel::SendMessage(const std::string& message) {
325   CHECK(stream_) << "No XMPP socket stream available";
326   if (write_pending_) {
327     queued_write_data_ += message;
328     return;
329   }
330   write_socket_data_ = queued_write_data_ + message;
331   queued_write_data_.clear();
332   VLOG(2) << "Sending XMPP message: " << message;
333 
334   write_pending_ = true;
335   stream_->Write(
336       write_socket_data_.data(), write_socket_data_.size(),
337       base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr()));
338 }
339 
OnMessageSent(ErrorPtr error)340 void XmppChannel::OnMessageSent(ErrorPtr error) {
341   write_pending_ = false;
342   if (error)
343     return Restart();
344   if (queued_write_data_.empty()) {
345     WaitForMessage();
346   } else {
347     SendMessage(std::string{});
348   }
349 }
350 
WaitForMessage()351 void XmppChannel::WaitForMessage() {
352   if (read_pending_ || !stream_)
353     return;
354 
355   read_pending_ = true;
356   stream_->Read(
357       read_socket_data_.data(), read_socket_data_.size(),
358       base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr()));
359 }
360 
GetName() const361 std::string XmppChannel::GetName() const {
362   return "xmpp";
363 }
364 
IsConnected() const365 bool XmppChannel::IsConnected() const {
366   return state_ == XmppState::kSubscribed;
367 }
368 
AddChannelParameters(base::DictionaryValue * channel_json)369 void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) {
370   // No extra parameters needed for XMPP.
371 }
372 
Restart()373 void XmppChannel::Restart() {
374   LOG(INFO) << "Restarting XMPP";
375   Stop();
376   Start(delegate_);
377 }
378 
Start(NotificationDelegate * delegate)379 void XmppChannel::Start(NotificationDelegate* delegate) {
380   CHECK(state_ == XmppState::kNotStarted);
381   delegate_ = delegate;
382 
383   CreateSslSocket();
384 }
385 
Stop()386 void XmppChannel::Stop() {
387   if (IsConnected() && delegate_)
388     delegate_->OnDisconnected();
389 
390   task_ptr_factory_.InvalidateWeakPtrs();
391   ping_ptr_factory_.InvalidateWeakPtrs();
392 
393   stream_.reset();
394   state_ = XmppState::kNotStarted;
395 }
396 
RestartXmppStream()397 void XmppChannel::RestartXmppStream() {
398   stream_parser_.Reset();
399   stream_->CancelPendingOperations();
400   read_pending_ = false;
401   write_pending_ = false;
402   SendMessage(BuildXmppStartStreamCommand());
403 }
404 
SchedulePing(base::TimeDelta interval,base::TimeDelta timeout)405 void XmppChannel::SchedulePing(base::TimeDelta interval,
406                                base::TimeDelta timeout) {
407   VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout;
408   ping_ptr_factory_.InvalidateWeakPtrs();
409   task_runner_->PostDelayedTask(
410       FROM_HERE, base::Bind(&XmppChannel::PingServer,
411                             ping_ptr_factory_.GetWeakPtr(), timeout),
412       interval);
413 }
414 
ScheduleRegularPing()415 void XmppChannel::ScheduleRegularPing() {
416   SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds),
417                base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds));
418 }
419 
ScheduleFastPing()420 void XmppChannel::ScheduleFastPing() {
421   SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds),
422                base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds));
423 }
424 
PingServer(base::TimeDelta timeout)425 void XmppChannel::PingServer(base::TimeDelta timeout) {
426   VLOG(1) << "Sending XMPP ping";
427   if (!IsConnected()) {
428     LOG(WARNING) << "XMPP channel is not connected";
429     Restart();
430     return;
431   }
432 
433   // Send an XMPP Ping request as defined in XEP-0199 extension:
434   // http://xmpp.org/extensions/xep-0199.html
435   iq_stanza_handler_->SendRequestWithCustomTimeout(
436       "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout,
437       base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(),
438                  base::Time::Now()),
439       base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(),
440                  base::Time::Now()));
441 }
442 
OnPingResponse(base::Time sent_time,std::unique_ptr<XmlNode> reply)443 void XmppChannel::OnPingResponse(base::Time sent_time,
444                                  std::unique_ptr<XmlNode> reply) {
445   VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time);
446   // Ping response received from server. Everything seems to be in order.
447   // Reschedule with default intervals.
448   ScheduleRegularPing();
449 }
450 
OnPingTimeout(base::Time sent_time)451 void XmppChannel::OnPingTimeout(base::Time sent_time) {
452   LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after "
453                << (base::Time::Now() - sent_time);
454   Restart();
455 }
456 
OnConnectivityChanged()457 void XmppChannel::OnConnectivityChanged() {
458   if (state_ == XmppState::kNotStarted)
459     return;
460 
461   if (state_ == XmppState::kConnecting &&
462       backoff_entry_.GetTimeUntilRelease() <
463           base::TimeDelta::FromSeconds(
464               kConnectingTimeoutAfterNetChangeSeconds)) {
465     VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease();
466     return;
467   }
468 
469   ScheduleFastPing();
470 }
471 
472 }  // namespace weave
473