• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "google_apis/gcm/engine/mcs_client.h"
6 
7 #include <set>
8 
9 #include "base/basictypes.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/metrics/histogram.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/time/clock.h"
14 #include "base/time/time.h"
15 #include "google_apis/gcm/base/mcs_util.h"
16 #include "google_apis/gcm/base/socket_stream.h"
17 #include "google_apis/gcm/engine/connection_factory.h"
18 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
19 
20 using namespace google::protobuf::io;
21 
22 namespace gcm {
23 
24 namespace {
25 
26 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
27 
28 // The category of messages intended for the GCM client itself from MCS.
29 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
30 
31 // The from field for messages originating in the GCM client.
32 const char kGCMFromField[] = "gcm@android.com";
33 
34 // MCS status message types.
35 // TODO(zea): handle these at the GCMClient layer.
36 const char kIdleNotification[] = "IdleNotification";
37 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
38 // const char kPowerNotification[] = "PowerNotification";
39 // const char kDataActiveNotification[] = "DataActiveNotification";
40 
41 // The number of unacked messages to allow before sending a stream ack.
42 // Applies to both incoming and outgoing messages.
43 // TODO(zea): make this server configurable.
44 const int kUnackedMessageBeforeStreamAck = 10;
45 
46 // The global maximum number of pending messages to have in the send queue.
47 const size_t kMaxSendQueueSize = 10 * 1024;
48 
49 // The maximum message size that can be sent to the server.
50 const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
51 
52 // Helper for converting a proto persistent id list to a vector of strings.
BuildPersistentIdListFromProto(const google::protobuf::string & bytes,std::vector<std::string> * id_list)53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
54                                     std::vector<std::string>* id_list) {
55   mcs_proto::SelectiveAck selective_ack;
56   if (!selective_ack.ParseFromString(bytes))
57     return false;
58   std::vector<std::string> new_list;
59   for (int i = 0; i < selective_ack.id_size(); ++i) {
60     DCHECK(!selective_ack.id(i).empty());
61     new_list.push_back(selective_ack.id(i));
62   }
63   id_list->swap(new_list);
64   return true;
65 }
66 
67 }  // namespace
68 
69 class CollapseKey {
70  public:
71   explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
72   ~CollapseKey();
73 
74   // Comparison operator for use in maps.
75   bool operator<(const CollapseKey& right) const;
76 
77   // Whether the message had a valid collapse key.
78   bool IsValid() const;
79 
token() const80   std::string token() const { return token_; }
app_id() const81   std::string app_id() const { return app_id_; }
device_user_id() const82   int64 device_user_id() const { return device_user_id_; }
83 
84  private:
85   const std::string token_;
86   const std::string app_id_;
87   const int64 device_user_id_;
88 };
89 
CollapseKey(const mcs_proto::DataMessageStanza & message)90 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
91     : token_(message.token()),
92       app_id_(message.category()),
93       device_user_id_(message.device_user_id()) {}
94 
~CollapseKey()95 CollapseKey::~CollapseKey() {}
96 
IsValid() const97 bool CollapseKey::IsValid() const {
98   // Device user id is optional, but the application id and token are not.
99   return !token_.empty() && !app_id_.empty();
100 }
101 
operator <(const CollapseKey & right) const102 bool CollapseKey::operator<(const CollapseKey& right) const {
103   if (device_user_id_ != right.device_user_id())
104     return device_user_id_ < right.device_user_id();
105   if (app_id_ != right.app_id())
106     return app_id_ < right.app_id();
107   return token_ < right.token();
108 }
109 
110 struct ReliablePacketInfo {
111   ReliablePacketInfo();
112   ~ReliablePacketInfo();
113 
114   // The stream id with which the message was sent.
115   uint32 stream_id;
116 
117   // If reliable delivery was requested, the persistent id of the message.
118   std::string persistent_id;
119 
120   // The type of message itself (for easier lookup).
121   uint8 tag;
122 
123   // The protobuf of the message itself.
124   MCSProto protobuf;
125 };
126 
ReliablePacketInfo()127 ReliablePacketInfo::ReliablePacketInfo()
128   : stream_id(0), tag(0) {
129 }
~ReliablePacketInfo()130 ReliablePacketInfo::~ReliablePacketInfo() {}
131 
GetSendQueueSize() const132 int MCSClient::GetSendQueueSize() const {
133   return to_send_.size();
134 }
135 
GetResendQueueSize() const136 int MCSClient::GetResendQueueSize() const {
137   return to_resend_.size();
138 }
139 
GetStateString() const140 std::string MCSClient::GetStateString() const {
141   switch(state_) {
142     case UNINITIALIZED:
143       return "UNINITIALIZED";
144     case LOADED:
145       return "LOADED";
146     case CONNECTING:
147       return "CONNECTING";
148     case CONNECTED:
149       return "CONNECTED";
150     default:
151       NOTREACHED();
152       return std::string();
153   }
154 }
155 
MCSClient(const std::string & version_string,base::Clock * clock,ConnectionFactory * connection_factory,GCMStore * gcm_store,GCMStatsRecorder * recorder)156 MCSClient::MCSClient(const std::string& version_string,
157                      base::Clock* clock,
158                      ConnectionFactory* connection_factory,
159                      GCMStore* gcm_store,
160                      GCMStatsRecorder* recorder)
161     : version_string_(version_string),
162       clock_(clock),
163       state_(UNINITIALIZED),
164       android_id_(0),
165       security_token_(0),
166       connection_factory_(connection_factory),
167       connection_handler_(NULL),
168       last_device_to_server_stream_id_received_(0),
169       last_server_to_device_stream_id_received_(0),
170       stream_id_out_(0),
171       stream_id_in_(0),
172       gcm_store_(gcm_store),
173       recorder_(recorder),
174       weak_ptr_factory_(this) {
175 }
176 
~MCSClient()177 MCSClient::~MCSClient() {
178 }
179 
Initialize(const ErrorCallback & error_callback,const OnMessageReceivedCallback & message_received_callback,const OnMessageSentCallback & message_sent_callback,scoped_ptr<GCMStore::LoadResult> load_result)180 void MCSClient::Initialize(
181     const ErrorCallback& error_callback,
182     const OnMessageReceivedCallback& message_received_callback,
183     const OnMessageSentCallback& message_sent_callback,
184     scoped_ptr<GCMStore::LoadResult> load_result) {
185   DCHECK_EQ(state_, UNINITIALIZED);
186 
187   state_ = LOADED;
188   mcs_error_callback_ = error_callback;
189   message_received_callback_ = message_received_callback;
190   message_sent_callback_ = message_sent_callback;
191 
192   connection_factory_->Initialize(
193       base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
194                  weak_ptr_factory_.GetWeakPtr()),
195       base::Bind(&MCSClient::HandlePacketFromWire,
196                  weak_ptr_factory_.GetWeakPtr()),
197       base::Bind(&MCSClient::MaybeSendMessage,
198                  weak_ptr_factory_.GetWeakPtr()));
199   connection_handler_ = connection_factory_->GetConnectionHandler();
200 
201   stream_id_out_ = 1;  // Login request is hardcoded to id 1.
202 
203   android_id_ = load_result->device_android_id;
204   security_token_ = load_result->device_security_token;
205 
206   if (android_id_ == 0) {
207     DVLOG(1) << "No device credentials found, assuming new client.";
208     // No need to try and load RMQ data in that case.
209     return;
210   }
211 
212   // |android_id_| is non-zero, so should |security_token_|.
213   DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
214                                  << " is non-zero.";
215 
216   DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
217            << " incoming acks pending and "
218            << load_result->outgoing_messages.size()
219            << " outgoing messages pending.";
220 
221   restored_unackeds_server_ids_ = load_result->incoming_messages;
222 
223   // First go through and order the outgoing messages by recency.
224   std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
225   std::vector<PersistentId> expired_ttl_ids;
226   for (GCMStore::OutgoingMessageMap::iterator iter =
227            load_result->outgoing_messages.begin();
228        iter != load_result->outgoing_messages.end(); ++iter) {
229     uint64 timestamp = 0;
230     if (!base::StringToUint64(iter->first, &timestamp)) {
231       LOG(ERROR) << "Invalid restored message.";
232       // TODO(fgorski): Error: data unreadable
233       mcs_error_callback_.Run();
234       return;
235     }
236 
237     // Check if the TTL has expired for this message.
238     if (HasTTLExpired(*iter->second, clock_)) {
239       expired_ttl_ids.push_back(iter->first);
240       NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
241       continue;
242     }
243 
244     ordered_messages[timestamp] = iter->second.release();
245   }
246 
247   if (!expired_ttl_ids.empty()) {
248     gcm_store_->RemoveOutgoingMessages(
249         expired_ttl_ids,
250         base::Bind(&MCSClient::OnGCMUpdateFinished,
251                    weak_ptr_factory_.GetWeakPtr()));
252   }
253 
254   // Now go through and add the outgoing messages to the send queue in their
255   // appropriate order (oldest at front, most recent at back).
256   for (std::map<uint64, google::protobuf::MessageLite*>::iterator
257            iter = ordered_messages.begin();
258        iter != ordered_messages.end(); ++iter) {
259     ReliablePacketInfo* packet_info = new ReliablePacketInfo();
260     packet_info->protobuf.reset(iter->second);
261     packet_info->tag = GetMCSProtoTag(*iter->second);
262     packet_info->persistent_id = base::Uint64ToString(iter->first);
263     to_send_.push_back(make_linked_ptr(packet_info));
264 
265     if (packet_info->tag == kDataMessageStanzaTag) {
266       mcs_proto::DataMessageStanza* data_message =
267           reinterpret_cast<mcs_proto::DataMessageStanza*>(
268               packet_info->protobuf.get());
269       CollapseKey collapse_key(*data_message);
270       if (collapse_key.IsValid())
271         collapse_key_map_[collapse_key] = packet_info;
272     }
273   }
274 }
275 
Login(uint64 android_id,uint64 security_token)276 void MCSClient::Login(uint64 android_id, uint64 security_token) {
277   DCHECK_EQ(state_, LOADED);
278   DCHECK(android_id_ == 0 || android_id_ == android_id);
279   DCHECK(security_token_ == 0 || security_token_ == security_token);
280 
281   if (android_id != android_id_ && security_token != security_token_) {
282     DCHECK(android_id);
283     DCHECK(security_token);
284     android_id_ = android_id;
285     security_token_ = security_token;
286   }
287 
288   DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
289 
290   state_ = CONNECTING;
291   connection_factory_->Connect();
292 }
293 
SendMessage(const MCSMessage & message)294 void MCSClient::SendMessage(const MCSMessage& message) {
295   int ttl = GetTTL(message.GetProtobuf());
296   DCHECK_GE(ttl, 0);
297   if (to_send_.size() > kMaxSendQueueSize) {
298     NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
299     return;
300   }
301   if (message.size() > kMaxMessageBytes) {
302     NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
303     return;
304   }
305 
306   scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
307   packet_info->tag = message.tag();
308   packet_info->protobuf = message.CloneProtobuf();
309 
310   if (ttl > 0) {
311     DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
312 
313     // First check if this message should replace a pending message with the
314     // same collapse key.
315     mcs_proto::DataMessageStanza* data_message =
316         reinterpret_cast<mcs_proto::DataMessageStanza*>(
317             packet_info->protobuf.get());
318     CollapseKey collapse_key(*data_message);
319     if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
320       ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
321       DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
322                << original_packet->persistent_id;
323       original_packet->protobuf = packet_info->protobuf.Pass();
324       SetPersistentId(original_packet->persistent_id,
325                       original_packet->protobuf.get());
326       gcm_store_->OverwriteOutgoingMessage(
327           original_packet->persistent_id,
328           message,
329           base::Bind(&MCSClient::OnGCMUpdateFinished,
330                      weak_ptr_factory_.GetWeakPtr()));
331 
332       // The message is already queued, return.
333       return;
334     } else {
335       PersistentId persistent_id = GetNextPersistentId();
336       DVLOG(1) << "Setting persistent id to " << persistent_id;
337       packet_info->persistent_id = persistent_id;
338       SetPersistentId(persistent_id, packet_info->protobuf.get());
339       if (!gcm_store_->AddOutgoingMessage(
340                persistent_id,
341                MCSMessage(message.tag(), *(packet_info->protobuf)),
342                base::Bind(&MCSClient::OnGCMUpdateFinished,
343                           weak_ptr_factory_.GetWeakPtr()))) {
344         NotifyMessageSendStatus(message.GetProtobuf(),
345                                 APP_QUEUE_SIZE_LIMIT_REACHED);
346         return;
347       }
348     }
349 
350     if (collapse_key.IsValid())
351       collapse_key_map_[collapse_key] = packet_info.get();
352   } else if (!connection_factory_->IsEndpointReachable()) {
353     DVLOG(1) << "No active connection, dropping message.";
354     NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
355     return;
356   }
357 
358   to_send_.push_back(make_linked_ptr(packet_info.release()));
359 
360   // Notify that the messages has been succsfully queued for sending.
361   // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
362   NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
363 
364   MaybeSendMessage();
365 }
366 
ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest * request)367 void MCSClient::ResetStateAndBuildLoginRequest(
368     mcs_proto::LoginRequest* request) {
369   DCHECK(android_id_);
370   DCHECK(security_token_);
371   stream_id_in_ = 0;
372   stream_id_out_ = 1;
373   last_device_to_server_stream_id_received_ = 0;
374   last_server_to_device_stream_id_received_ = 0;
375 
376   heartbeat_manager_.Stop();
377 
378   // Add any pending acknowledgments to the list of ids.
379   for (StreamIdToPersistentIdMap::const_iterator iter =
380            unacked_server_ids_.begin();
381        iter != unacked_server_ids_.end(); ++iter) {
382     restored_unackeds_server_ids_.push_back(iter->second);
383   }
384   unacked_server_ids_.clear();
385 
386   // Any acknowledged server ids which have not been confirmed by the server
387   // are treated like unacknowledged ids.
388   for (std::map<StreamId, PersistentIdList>::const_iterator iter =
389            acked_server_ids_.begin();
390        iter != acked_server_ids_.end(); ++iter) {
391     restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
392                                          iter->second.begin(),
393                                          iter->second.end());
394   }
395   acked_server_ids_.clear();
396 
397   // Then build the request, consuming all pending acknowledgments.
398   request->Swap(BuildLoginRequest(android_id_,
399                                   security_token_,
400                                   version_string_).get());
401   for (PersistentIdList::const_iterator iter =
402            restored_unackeds_server_ids_.begin();
403        iter != restored_unackeds_server_ids_.end(); ++iter) {
404     request->add_received_persistent_id(*iter);
405   }
406   acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
407   restored_unackeds_server_ids_.clear();
408 
409   // Push all unacknowledged messages to front of send queue. No need to save
410   // to RMQ, as all messages that reach this point should already have been
411   // saved as necessary.
412   while (!to_resend_.empty()) {
413     to_send_.push_front(to_resend_.back());
414     to_resend_.pop_back();
415   }
416 
417   // Drop all TTL == 0 or expired TTL messages from the queue.
418   std::deque<MCSPacketInternal> new_to_send;
419   std::vector<PersistentId> expired_ttl_ids;
420   while (!to_send_.empty()) {
421     MCSPacketInternal packet = PopMessageForSend();
422     if (GetTTL(*packet->protobuf) > 0 &&
423         !HasTTLExpired(*packet->protobuf, clock_)) {
424       new_to_send.push_back(packet);
425     } else {
426       // If the TTL was 0 there is no persistent id, so no need to remove the
427       // message from the persistent store.
428       if (!packet->persistent_id.empty())
429         expired_ttl_ids.push_back(packet->persistent_id);
430       NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
431     }
432   }
433 
434   if (!expired_ttl_ids.empty()) {
435     DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
436              << " messages expired.";
437     gcm_store_->RemoveOutgoingMessages(
438         expired_ttl_ids,
439         base::Bind(&MCSClient::OnGCMUpdateFinished,
440                    weak_ptr_factory_.GetWeakPtr()));
441   }
442 
443   to_send_.swap(new_to_send);
444 
445   DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
446            << " incoming acks pending, and " << to_send_.size()
447            << " pending outgoing messages.";
448 
449   state_ = CONNECTING;
450 }
451 
SendHeartbeat()452 void MCSClient::SendHeartbeat() {
453   SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
454 }
455 
OnGCMUpdateFinished(bool success)456 void MCSClient::OnGCMUpdateFinished(bool success) {
457   LOG_IF(ERROR, !success) << "GCM Update failed!";
458   UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
459   // TODO(zea): Rebuild the store from scratch in case of persistence failure?
460 }
461 
MaybeSendMessage()462 void MCSClient::MaybeSendMessage() {
463   if (to_send_.empty())
464     return;
465 
466   // If the connection has been reset, do nothing. On reconnection
467   // MaybeSendMessage will be automatically invoked again.
468   // TODO(zea): consider doing TTL expiration at connection reset time, rather
469   // than reconnect time.
470   if (!connection_factory_->IsEndpointReachable())
471     return;
472 
473   MCSPacketInternal packet = PopMessageForSend();
474   if (HasTTLExpired(*packet->protobuf, clock_)) {
475     DCHECK(!packet->persistent_id.empty());
476     DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
477     NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
478     gcm_store_->RemoveOutgoingMessage(
479         packet->persistent_id,
480         base::Bind(&MCSClient::OnGCMUpdateFinished,
481                    weak_ptr_factory_.GetWeakPtr()));
482     base::MessageLoop::current()->PostTask(
483             FROM_HERE,
484             base::Bind(&MCSClient::MaybeSendMessage,
485                        weak_ptr_factory_.GetWeakPtr()));
486     return;
487   }
488   DVLOG(1) << "Pending output message found, sending.";
489   if (!packet->persistent_id.empty())
490     to_resend_.push_back(packet);
491   SendPacketToWire(packet.get());
492 }
493 
SendPacketToWire(ReliablePacketInfo * packet_info)494 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
495   packet_info->stream_id = ++stream_id_out_;
496   DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
497 
498   // Set the queued time as necessary.
499   if (packet_info->tag == kDataMessageStanzaTag) {
500     mcs_proto::DataMessageStanza* data_message =
501         reinterpret_cast<mcs_proto::DataMessageStanza*>(
502             packet_info->protobuf.get());
503     uint64 sent = data_message->sent();
504     DCHECK_GT(sent, 0U);
505     int queued = (clock_->Now().ToInternalValue() /
506         base::Time::kMicrosecondsPerSecond) - sent;
507     DVLOG(1) << "Message was queued for " << queued << " seconds.";
508     data_message->set_queued(queued);
509     recorder_->RecordDataSentToWire(
510         data_message->category(),
511         data_message->to(),
512         data_message->id(),
513         queued);
514   }
515 
516   // Set the proper last received stream id to acknowledge received server
517   // packets.
518   DVLOG(1) << "Setting last stream id received to "
519            << stream_id_in_;
520   SetLastStreamIdReceived(stream_id_in_,
521                           packet_info->protobuf.get());
522   if (stream_id_in_ != last_server_to_device_stream_id_received_) {
523     last_server_to_device_stream_id_received_ = stream_id_in_;
524     // Mark all acknowledged server messages as such. Note: they're not dropped,
525     // as it may be that they'll need to be re-acked if this message doesn't
526     // make it.
527     PersistentIdList persistent_id_list;
528     for (StreamIdToPersistentIdMap::const_iterator iter =
529              unacked_server_ids_.begin();
530          iter != unacked_server_ids_.end(); ++iter) {
531       DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
532       persistent_id_list.push_back(iter->second);
533     }
534     unacked_server_ids_.clear();
535     acked_server_ids_[stream_id_out_] = persistent_id_list;
536   }
537 
538   connection_handler_->SendMessage(*packet_info->protobuf);
539 }
540 
HandleMCSDataMesssage(scoped_ptr<google::protobuf::MessageLite> protobuf)541 void MCSClient::HandleMCSDataMesssage(
542     scoped_ptr<google::protobuf::MessageLite> protobuf) {
543   mcs_proto::DataMessageStanza* data_message =
544       reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
545   // TODO(zea): implement a proper status manager rather than hardcoding these
546   // values.
547   scoped_ptr<mcs_proto::DataMessageStanza> response(
548       new mcs_proto::DataMessageStanza());
549   response->set_from(kGCMFromField);
550   response->set_sent(clock_->Now().ToInternalValue() /
551                          base::Time::kMicrosecondsPerSecond);
552   response->set_ttl(0);
553   bool send = false;
554   for (int i = 0; i < data_message->app_data_size(); ++i) {
555     const mcs_proto::AppData& app_data = data_message->app_data(i);
556     if (app_data.key() == kIdleNotification) {
557       // Tell the MCS server the client is not idle.
558       send = true;
559       mcs_proto::AppData data;
560       data.set_key(kIdleNotification);
561       data.set_value("false");
562       response->add_app_data()->CopyFrom(data);
563       response->set_category(kMCSCategory);
564     }
565   }
566 
567   if (send) {
568     SendMessage(
569         MCSMessage(kDataMessageStanzaTag,
570                    response.PassAs<const google::protobuf::MessageLite>()));
571   }
572 }
573 
HandlePacketFromWire(scoped_ptr<google::protobuf::MessageLite> protobuf)574 void MCSClient::HandlePacketFromWire(
575     scoped_ptr<google::protobuf::MessageLite> protobuf) {
576   if (!protobuf.get())
577     return;
578   uint8 tag = GetMCSProtoTag(*protobuf);
579   PersistentId persistent_id = GetPersistentId(*protobuf);
580   StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
581 
582   if (last_stream_id_received != 0) {
583     last_device_to_server_stream_id_received_ = last_stream_id_received;
584 
585     // Process device to server messages that have now been acknowledged by the
586     // server. Because messages are stored in order, just pop off all that have
587     // a stream id lower than server's last received stream id.
588     HandleStreamAck(last_stream_id_received);
589 
590     // Process server_to_device_messages that the server now knows were
591     // acknowledged. Again, they're in order, so just keep going until the
592     // stream id is reached.
593     StreamIdList acked_stream_ids_to_remove;
594     for (std::map<StreamId, PersistentIdList>::iterator iter =
595              acked_server_ids_.begin();
596          iter != acked_server_ids_.end() &&
597              iter->first <= last_stream_id_received; ++iter) {
598       acked_stream_ids_to_remove.push_back(iter->first);
599     }
600     for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
601          iter != acked_stream_ids_to_remove.end(); ++iter) {
602       acked_server_ids_.erase(*iter);
603     }
604   }
605 
606   ++stream_id_in_;
607   if (!persistent_id.empty()) {
608     unacked_server_ids_[stream_id_in_] = persistent_id;
609     gcm_store_->AddIncomingMessage(persistent_id,
610                                    base::Bind(&MCSClient::OnGCMUpdateFinished,
611                                               weak_ptr_factory_.GetWeakPtr()));
612   }
613 
614   DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
615            << " with persistent id "
616            << (persistent_id.empty() ? "NULL" : persistent_id)
617            << ", stream id " << stream_id_in_ << " and last stream id received "
618            << last_stream_id_received;
619 
620   if (unacked_server_ids_.size() > 0 &&
621       unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
622     SendMessage(MCSMessage(kIqStanzaTag,
623                            BuildStreamAck().
624                                PassAs<const google::protobuf::MessageLite>()));
625   }
626 
627   // The connection is alive, treat this message as a heartbeat ack.
628   heartbeat_manager_.OnHeartbeatAcked();
629 
630   switch (tag) {
631     case kLoginResponseTag: {
632       DCHECK_EQ(CONNECTING, state_);
633       mcs_proto::LoginResponse* login_response =
634           reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
635       DVLOG(1) << "Received login response:";
636       DVLOG(1) << "  Id: " << login_response->id();
637       DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
638       if (login_response->has_error() && login_response->error().code() != 0) {
639         state_ = UNINITIALIZED;
640         DVLOG(1) << "  Error code: " << login_response->error().code();
641         DVLOG(1) << "  Error message: " << login_response->error().message();
642         LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
643         connection_factory_->SignalConnectionReset(
644             ConnectionFactory::LOGIN_FAILURE);
645         mcs_error_callback_.Run();
646         return;
647       }
648 
649       if (login_response->has_heartbeat_config()) {
650         heartbeat_manager_.UpdateHeartbeatConfig(
651             login_response->heartbeat_config());
652       }
653 
654       state_ = CONNECTED;
655       stream_id_in_ = 1;  // To account for the login response.
656       DCHECK_EQ(1U, stream_id_out_);
657 
658       // Pass the login response on up.
659       base::MessageLoop::current()->PostTask(
660           FROM_HERE,
661           base::Bind(message_received_callback_,
662                      MCSMessage(tag,
663                                 protobuf.PassAs<
664                                     const google::protobuf::MessageLite>())));
665 
666       // If there are pending messages, attempt to send one.
667       if (!to_send_.empty()) {
668         base::MessageLoop::current()->PostTask(
669             FROM_HERE,
670             base::Bind(&MCSClient::MaybeSendMessage,
671                        weak_ptr_factory_.GetWeakPtr()));
672       }
673 
674       heartbeat_manager_.Start(
675           base::Bind(&MCSClient::SendHeartbeat,
676                      weak_ptr_factory_.GetWeakPtr()),
677           base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
678                      weak_ptr_factory_.GetWeakPtr()));
679       return;
680     }
681     case kHeartbeatPingTag:
682       DCHECK_GE(stream_id_in_, 1U);
683       DVLOG(1) << "Received heartbeat ping, sending ack.";
684       SendMessage(
685           MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
686       return;
687     case kHeartbeatAckTag:
688       DCHECK_GE(stream_id_in_, 1U);
689       DVLOG(1) << "Received heartbeat ack.";
690       // Do nothing else, all messages act as heartbeat acks.
691       return;
692     case kCloseTag:
693       LOG(ERROR) << "Received close command, resetting connection.";
694       state_ = LOADED;
695       connection_factory_->SignalConnectionReset(
696           ConnectionFactory::CLOSE_COMMAND);
697       return;
698     case kIqStanzaTag: {
699       DCHECK_GE(stream_id_in_, 1U);
700       mcs_proto::IqStanza* iq_stanza =
701           reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
702       const mcs_proto::Extension& iq_extension = iq_stanza->extension();
703       switch (iq_extension.id()) {
704         case kSelectiveAck: {
705           PersistentIdList acked_ids;
706           if (BuildPersistentIdListFromProto(iq_extension.data(),
707                                              &acked_ids)) {
708             HandleSelectiveAck(acked_ids);
709           }
710           return;
711         }
712         case kStreamAck:
713           // Do nothing. The last received stream id is always processed if it's
714           // present.
715           return;
716         default:
717           LOG(WARNING) << "Received invalid iq stanza extension "
718                        << iq_extension.id();
719           return;
720       }
721     }
722     case kDataMessageStanzaTag: {
723       DCHECK_GE(stream_id_in_, 1U);
724       mcs_proto::DataMessageStanza* data_message =
725           reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
726       if (data_message->category() == kMCSCategory) {
727         HandleMCSDataMesssage(protobuf.Pass());
728         return;
729       }
730 
731       DCHECK(protobuf.get());
732       base::MessageLoop::current()->PostTask(
733           FROM_HERE,
734           base::Bind(message_received_callback_,
735                      MCSMessage(tag,
736                                 protobuf.PassAs<
737                                     const google::protobuf::MessageLite>())));
738       return;
739     }
740     default:
741       LOG(ERROR) << "Received unexpected message of type "
742                  << static_cast<int>(tag);
743       return;
744   }
745 }
746 
HandleStreamAck(StreamId last_stream_id_received)747 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
748   PersistentIdList acked_outgoing_persistent_ids;
749   StreamIdList acked_outgoing_stream_ids;
750   while (!to_resend_.empty() &&
751          to_resend_.front()->stream_id <= last_stream_id_received) {
752     const MCSPacketInternal& outgoing_packet = to_resend_.front();
753     acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
754     acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
755     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
756     to_resend_.pop_front();
757   }
758 
759   DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
760            << " outgoing messages, " << to_resend_.size()
761            << " remaining unacked";
762   gcm_store_->RemoveOutgoingMessages(
763       acked_outgoing_persistent_ids,
764       base::Bind(&MCSClient::OnGCMUpdateFinished,
765                  weak_ptr_factory_.GetWeakPtr()));
766 
767   HandleServerConfirmedReceipt(last_stream_id_received);
768 }
769 
HandleSelectiveAck(const PersistentIdList & id_list)770 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
771   std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
772 
773   StreamId last_stream_id_received = 0;
774 
775   // First check the to_resend_ queue. Acknowledgments are always contiguous,
776   // so if there's a pending message that hasn't been acked, all newer messages
777   // must also be unacked.
778   while(!to_resend_.empty() && !remaining_ids.empty()) {
779     const MCSPacketInternal& outgoing_packet = to_resend_.front();
780     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
781       break;  // Newer message must be unacked too.
782     remaining_ids.erase(outgoing_packet->persistent_id);
783     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
784 
785     // No need to re-acknowledge any server messages this message already
786     // acknowledged.
787     StreamId device_stream_id = outgoing_packet->stream_id;
788     if (device_stream_id > last_stream_id_received)
789       last_stream_id_received = device_stream_id;
790     to_resend_.pop_front();
791   }
792 
793   // If the acknowledged ids aren't all there, they might be in the to_send_
794   // queue (typically when a SelectiveAck confirms messages as part of a login
795   // response).
796   while (!to_send_.empty() && !remaining_ids.empty()) {
797     const MCSPacketInternal& outgoing_packet = to_send_.front();
798     if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
799       break;  // Newer messages must be unacked too.
800     remaining_ids.erase(outgoing_packet->persistent_id);
801     NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
802 
803     // No need to re-acknowledge any server messages this message already
804     // acknowledged.
805     StreamId device_stream_id = outgoing_packet->stream_id;
806     if (device_stream_id > last_stream_id_received)
807       last_stream_id_received = device_stream_id;
808     PopMessageForSend();
809   }
810 
811   // Only handle the largest stream id value. All other stream ids are
812   // implicitly handled.
813   if (last_stream_id_received > 0)
814     HandleServerConfirmedReceipt(last_stream_id_received);
815 
816   // At this point, all remaining acked ids are redundant.
817   PersistentIdList acked_ids;
818   if (remaining_ids.size() > 0) {
819     for (size_t i = 0; i < id_list.size(); ++i) {
820       if (remaining_ids.count(id_list[i]) > 0)
821         continue;
822       acked_ids.push_back(id_list[i]);
823     }
824   } else {
825     acked_ids = id_list;
826   }
827 
828   DVLOG(1) << "Server acked " << acked_ids.size()
829            << " messages, " << to_resend_.size() << " remaining unacked.";
830   gcm_store_->RemoveOutgoingMessages(
831       acked_ids,
832       base::Bind(&MCSClient::OnGCMUpdateFinished,
833                  weak_ptr_factory_.GetWeakPtr()));
834 
835   // Resend any remaining outgoing messages, as they were not received by the
836   // server.
837   DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
838   while (!to_resend_.empty()) {
839     to_send_.push_front(to_resend_.back());
840     to_resend_.pop_back();
841   }
842   base::MessageLoop::current()->PostTask(
843       FROM_HERE,
844       base::Bind(&MCSClient::MaybeSendMessage,
845                  weak_ptr_factory_.GetWeakPtr()));
846 }
847 
HandleServerConfirmedReceipt(StreamId device_stream_id)848 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
849   PersistentIdList acked_incoming_ids;
850   for (std::map<StreamId, PersistentIdList>::iterator iter =
851            acked_server_ids_.begin();
852        iter != acked_server_ids_.end() &&
853            iter->first <= device_stream_id;) {
854     acked_incoming_ids.insert(acked_incoming_ids.end(),
855                               iter->second.begin(),
856                               iter->second.end());
857     acked_server_ids_.erase(iter++);
858   }
859 
860   DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
861            << " acknowledged server messages.";
862   gcm_store_->RemoveIncomingMessages(
863       acked_incoming_ids,
864       base::Bind(&MCSClient::OnGCMUpdateFinished,
865                  weak_ptr_factory_.GetWeakPtr()));
866 }
867 
GetNextPersistentId()868 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
869   return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
870 }
871 
OnConnectionResetByHeartbeat()872 void MCSClient::OnConnectionResetByHeartbeat() {
873   connection_factory_->SignalConnectionReset(
874       ConnectionFactory::HEARTBEAT_FAILURE);
875 }
876 
NotifyMessageSendStatus(const google::protobuf::MessageLite & protobuf,MessageSendStatus status)877 void MCSClient::NotifyMessageSendStatus(
878     const google::protobuf::MessageLite& protobuf,
879     MessageSendStatus status) {
880   if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
881     return;
882 
883   const mcs_proto::DataMessageStanza* data_message_stanza =
884       reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
885   recorder_->RecordNotifySendStatus(
886       data_message_stanza->category(),
887       data_message_stanza->to(),
888       data_message_stanza->id(),
889       status,
890       protobuf.ByteSize(),
891       data_message_stanza->ttl());
892   message_sent_callback_.Run(
893       data_message_stanza->device_user_id(),
894       data_message_stanza->category(),
895       data_message_stanza->id(),
896       status);
897 }
898 
PopMessageForSend()899 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
900   MCSPacketInternal packet = to_send_.front();
901   to_send_.pop_front();
902 
903   if (packet->tag == kDataMessageStanzaTag) {
904     mcs_proto::DataMessageStanza* data_message =
905         reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
906     CollapseKey collapse_key(*data_message);
907     if (collapse_key.IsValid())
908       collapse_key_map_.erase(collapse_key);
909   }
910 
911   return packet;
912 }
913 
914 } // namespace gcm
915