1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "google_apis/gcm/engine/mcs_client.h"
6
7 #include <set>
8
9 #include "base/basictypes.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/metrics/histogram.h"
12 #include "base/strings/string_number_conversions.h"
13 #include "base/time/clock.h"
14 #include "base/time/time.h"
15 #include "google_apis/gcm/base/mcs_util.h"
16 #include "google_apis/gcm/base/socket_stream.h"
17 #include "google_apis/gcm/engine/connection_factory.h"
18 #include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
19
20 using namespace google::protobuf::io;
21
22 namespace gcm {
23
24 namespace {
25
26 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
27
28 // The category of messages intended for the GCM client itself from MCS.
29 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
30
31 // The from field for messages originating in the GCM client.
32 const char kGCMFromField[] = "gcm@android.com";
33
34 // MCS status message types.
35 // TODO(zea): handle these at the GCMClient layer.
36 const char kIdleNotification[] = "IdleNotification";
37 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
38 // const char kPowerNotification[] = "PowerNotification";
39 // const char kDataActiveNotification[] = "DataActiveNotification";
40
41 // The number of unacked messages to allow before sending a stream ack.
42 // Applies to both incoming and outgoing messages.
43 // TODO(zea): make this server configurable.
44 const int kUnackedMessageBeforeStreamAck = 10;
45
46 // The global maximum number of pending messages to have in the send queue.
47 const size_t kMaxSendQueueSize = 10 * 1024;
48
49 // The maximum message size that can be sent to the server.
50 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
51
52 // Helper for converting a proto persistent id list to a vector of strings.
BuildPersistentIdListFromProto(const google::protobuf::string & bytes,std::vector<std::string> * id_list)53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
54 std::vector<std::string>* id_list) {
55 mcs_proto::SelectiveAck selective_ack;
56 if (!selective_ack.ParseFromString(bytes))
57 return false;
58 std::vector<std::string> new_list;
59 for (int i = 0; i < selective_ack.id_size(); ++i) {
60 DCHECK(!selective_ack.id(i).empty());
61 new_list.push_back(selective_ack.id(i));
62 }
63 id_list->swap(new_list);
64 return true;
65 }
66
67 } // namespace
68
69 class CollapseKey {
70 public:
71 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
72 ~CollapseKey();
73
74 // Comparison operator for use in maps.
75 bool operator<(const CollapseKey& right) const;
76
77 // Whether the message had a valid collapse key.
78 bool IsValid() const;
79
token() const80 std::string token() const { return token_; }
app_id() const81 std::string app_id() const { return app_id_; }
device_user_id() const82 int64 device_user_id() const { return device_user_id_; }
83
84 private:
85 const std::string token_;
86 const std::string app_id_;
87 const int64 device_user_id_;
88 };
89
CollapseKey(const mcs_proto::DataMessageStanza & message)90 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
91 : token_(message.token()),
92 app_id_(message.category()),
93 device_user_id_(message.device_user_id()) {}
94
~CollapseKey()95 CollapseKey::~CollapseKey() {}
96
IsValid() const97 bool CollapseKey::IsValid() const {
98 // Device user id is optional, but the application id and token are not.
99 return !token_.empty() && !app_id_.empty();
100 }
101
operator <(const CollapseKey & right) const102 bool CollapseKey::operator<(const CollapseKey& right) const {
103 if (device_user_id_ != right.device_user_id())
104 return device_user_id_ < right.device_user_id();
105 if (app_id_ != right.app_id())
106 return app_id_ < right.app_id();
107 return token_ < right.token();
108 }
109
110 struct ReliablePacketInfo {
111 ReliablePacketInfo();
112 ~ReliablePacketInfo();
113
114 // The stream id with which the message was sent.
115 uint32 stream_id;
116
117 // If reliable delivery was requested, the persistent id of the message.
118 std::string persistent_id;
119
120 // The type of message itself (for easier lookup).
121 uint8 tag;
122
123 // The protobuf of the message itself.
124 MCSProto protobuf;
125 };
126
ReliablePacketInfo()127 ReliablePacketInfo::ReliablePacketInfo()
128 : stream_id(0), tag(0) {
129 }
~ReliablePacketInfo()130 ReliablePacketInfo::~ReliablePacketInfo() {}
131
GetSendQueueSize() const132 int MCSClient::GetSendQueueSize() const {
133 return to_send_.size();
134 }
135
GetResendQueueSize() const136 int MCSClient::GetResendQueueSize() const {
137 return to_resend_.size();
138 }
139
GetStateString() const140 std::string MCSClient::GetStateString() const {
141 switch(state_) {
142 case UNINITIALIZED:
143 return "UNINITIALIZED";
144 case LOADED:
145 return "LOADED";
146 case CONNECTING:
147 return "CONNECTING";
148 case CONNECTED:
149 return "CONNECTED";
150 default:
151 NOTREACHED();
152 return std::string();
153 }
154 }
155
MCSClient(const std::string & version_string,base::Clock * clock,ConnectionFactory * connection_factory,GCMStore * gcm_store,GCMStatsRecorder * recorder)156 MCSClient::MCSClient(const std::string& version_string,
157 base::Clock* clock,
158 ConnectionFactory* connection_factory,
159 GCMStore* gcm_store,
160 GCMStatsRecorder* recorder)
161 : version_string_(version_string),
162 clock_(clock),
163 state_(UNINITIALIZED),
164 android_id_(0),
165 security_token_(0),
166 connection_factory_(connection_factory),
167 connection_handler_(NULL),
168 last_device_to_server_stream_id_received_(0),
169 last_server_to_device_stream_id_received_(0),
170 stream_id_out_(0),
171 stream_id_in_(0),
172 gcm_store_(gcm_store),
173 recorder_(recorder),
174 weak_ptr_factory_(this) {
175 }
176
~MCSClient()177 MCSClient::~MCSClient() {
178 }
179
Initialize(const ErrorCallback & error_callback,const OnMessageReceivedCallback & message_received_callback,const OnMessageSentCallback & message_sent_callback,scoped_ptr<GCMStore::LoadResult> load_result)180 void MCSClient::Initialize(
181 const ErrorCallback& error_callback,
182 const OnMessageReceivedCallback& message_received_callback,
183 const OnMessageSentCallback& message_sent_callback,
184 scoped_ptr<GCMStore::LoadResult> load_result) {
185 DCHECK_EQ(state_, UNINITIALIZED);
186
187 state_ = LOADED;
188 mcs_error_callback_ = error_callback;
189 message_received_callback_ = message_received_callback;
190 message_sent_callback_ = message_sent_callback;
191
192 connection_factory_->Initialize(
193 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
194 weak_ptr_factory_.GetWeakPtr()),
195 base::Bind(&MCSClient::HandlePacketFromWire,
196 weak_ptr_factory_.GetWeakPtr()),
197 base::Bind(&MCSClient::MaybeSendMessage,
198 weak_ptr_factory_.GetWeakPtr()));
199 connection_handler_ = connection_factory_->GetConnectionHandler();
200
201 stream_id_out_ = 1; // Login request is hardcoded to id 1.
202
203 android_id_ = load_result->device_android_id;
204 security_token_ = load_result->device_security_token;
205
206 if (android_id_ == 0) {
207 DVLOG(1) << "No device credentials found, assuming new client.";
208 // No need to try and load RMQ data in that case.
209 return;
210 }
211
212 // |android_id_| is non-zero, so should |security_token_|.
213 DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
214 << " is non-zero.";
215
216 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
217 << " incoming acks pending and "
218 << load_result->outgoing_messages.size()
219 << " outgoing messages pending.";
220
221 restored_unackeds_server_ids_ = load_result->incoming_messages;
222
223 // First go through and order the outgoing messages by recency.
224 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
225 std::vector<PersistentId> expired_ttl_ids;
226 for (GCMStore::OutgoingMessageMap::iterator iter =
227 load_result->outgoing_messages.begin();
228 iter != load_result->outgoing_messages.end(); ++iter) {
229 uint64 timestamp = 0;
230 if (!base::StringToUint64(iter->first, ×tamp)) {
231 LOG(ERROR) << "Invalid restored message.";
232 // TODO(fgorski): Error: data unreadable
233 mcs_error_callback_.Run();
234 return;
235 }
236
237 // Check if the TTL has expired for this message.
238 if (HasTTLExpired(*iter->second, clock_)) {
239 expired_ttl_ids.push_back(iter->first);
240 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
241 continue;
242 }
243
244 ordered_messages[timestamp] = iter->second.release();
245 }
246
247 if (!expired_ttl_ids.empty()) {
248 gcm_store_->RemoveOutgoingMessages(
249 expired_ttl_ids,
250 base::Bind(&MCSClient::OnGCMUpdateFinished,
251 weak_ptr_factory_.GetWeakPtr()));
252 }
253
254 // Now go through and add the outgoing messages to the send queue in their
255 // appropriate order (oldest at front, most recent at back).
256 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
257 iter = ordered_messages.begin();
258 iter != ordered_messages.end(); ++iter) {
259 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
260 packet_info->protobuf.reset(iter->second);
261 packet_info->tag = GetMCSProtoTag(*iter->second);
262 packet_info->persistent_id = base::Uint64ToString(iter->first);
263 to_send_.push_back(make_linked_ptr(packet_info));
264
265 if (packet_info->tag == kDataMessageStanzaTag) {
266 mcs_proto::DataMessageStanza* data_message =
267 reinterpret_cast<mcs_proto::DataMessageStanza*>(
268 packet_info->protobuf.get());
269 CollapseKey collapse_key(*data_message);
270 if (collapse_key.IsValid())
271 collapse_key_map_[collapse_key] = packet_info;
272 }
273 }
274 }
275
Login(uint64 android_id,uint64 security_token)276 void MCSClient::Login(uint64 android_id, uint64 security_token) {
277 DCHECK_EQ(state_, LOADED);
278 DCHECK(android_id_ == 0 || android_id_ == android_id);
279 DCHECK(security_token_ == 0 || security_token_ == security_token);
280
281 if (android_id != android_id_ && security_token != security_token_) {
282 DCHECK(android_id);
283 DCHECK(security_token);
284 android_id_ = android_id;
285 security_token_ = security_token;
286 }
287
288 DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
289
290 state_ = CONNECTING;
291 connection_factory_->Connect();
292 }
293
SendMessage(const MCSMessage & message)294 void MCSClient::SendMessage(const MCSMessage& message) {
295 int ttl = GetTTL(message.GetProtobuf());
296 DCHECK_GE(ttl, 0);
297 if (to_send_.size() > kMaxSendQueueSize) {
298 NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
299 return;
300 }
301 if (message.size() > kMaxMessageBytes) {
302 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
303 return;
304 }
305
306 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
307 packet_info->tag = message.tag();
308 packet_info->protobuf = message.CloneProtobuf();
309
310 if (ttl > 0) {
311 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
312
313 // First check if this message should replace a pending message with the
314 // same collapse key.
315 mcs_proto::DataMessageStanza* data_message =
316 reinterpret_cast<mcs_proto::DataMessageStanza*>(
317 packet_info->protobuf.get());
318 CollapseKey collapse_key(*data_message);
319 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
320 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
321 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
322 << original_packet->persistent_id;
323 original_packet->protobuf = packet_info->protobuf.Pass();
324 SetPersistentId(original_packet->persistent_id,
325 original_packet->protobuf.get());
326 gcm_store_->OverwriteOutgoingMessage(
327 original_packet->persistent_id,
328 message,
329 base::Bind(&MCSClient::OnGCMUpdateFinished,
330 weak_ptr_factory_.GetWeakPtr()));
331
332 // The message is already queued, return.
333 return;
334 } else {
335 PersistentId persistent_id = GetNextPersistentId();
336 DVLOG(1) << "Setting persistent id to " << persistent_id;
337 packet_info->persistent_id = persistent_id;
338 SetPersistentId(persistent_id, packet_info->protobuf.get());
339 if (!gcm_store_->AddOutgoingMessage(
340 persistent_id,
341 MCSMessage(message.tag(), *(packet_info->protobuf)),
342 base::Bind(&MCSClient::OnGCMUpdateFinished,
343 weak_ptr_factory_.GetWeakPtr()))) {
344 NotifyMessageSendStatus(message.GetProtobuf(),
345 APP_QUEUE_SIZE_LIMIT_REACHED);
346 return;
347 }
348 }
349
350 if (collapse_key.IsValid())
351 collapse_key_map_[collapse_key] = packet_info.get();
352 } else if (!connection_factory_->IsEndpointReachable()) {
353 DVLOG(1) << "No active connection, dropping message.";
354 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
355 return;
356 }
357
358 to_send_.push_back(make_linked_ptr(packet_info.release()));
359
360 // Notify that the messages has been succsfully queued for sending.
361 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
362 NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
363
364 MaybeSendMessage();
365 }
366
ResetStateAndBuildLoginRequest(mcs_proto::LoginRequest * request)367 void MCSClient::ResetStateAndBuildLoginRequest(
368 mcs_proto::LoginRequest* request) {
369 DCHECK(android_id_);
370 DCHECK(security_token_);
371 stream_id_in_ = 0;
372 stream_id_out_ = 1;
373 last_device_to_server_stream_id_received_ = 0;
374 last_server_to_device_stream_id_received_ = 0;
375
376 heartbeat_manager_.Stop();
377
378 // Add any pending acknowledgments to the list of ids.
379 for (StreamIdToPersistentIdMap::const_iterator iter =
380 unacked_server_ids_.begin();
381 iter != unacked_server_ids_.end(); ++iter) {
382 restored_unackeds_server_ids_.push_back(iter->second);
383 }
384 unacked_server_ids_.clear();
385
386 // Any acknowledged server ids which have not been confirmed by the server
387 // are treated like unacknowledged ids.
388 for (std::map<StreamId, PersistentIdList>::const_iterator iter =
389 acked_server_ids_.begin();
390 iter != acked_server_ids_.end(); ++iter) {
391 restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
392 iter->second.begin(),
393 iter->second.end());
394 }
395 acked_server_ids_.clear();
396
397 // Then build the request, consuming all pending acknowledgments.
398 request->Swap(BuildLoginRequest(android_id_,
399 security_token_,
400 version_string_).get());
401 for (PersistentIdList::const_iterator iter =
402 restored_unackeds_server_ids_.begin();
403 iter != restored_unackeds_server_ids_.end(); ++iter) {
404 request->add_received_persistent_id(*iter);
405 }
406 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
407 restored_unackeds_server_ids_.clear();
408
409 // Push all unacknowledged messages to front of send queue. No need to save
410 // to RMQ, as all messages that reach this point should already have been
411 // saved as necessary.
412 while (!to_resend_.empty()) {
413 to_send_.push_front(to_resend_.back());
414 to_resend_.pop_back();
415 }
416
417 // Drop all TTL == 0 or expired TTL messages from the queue.
418 std::deque<MCSPacketInternal> new_to_send;
419 std::vector<PersistentId> expired_ttl_ids;
420 while (!to_send_.empty()) {
421 MCSPacketInternal packet = PopMessageForSend();
422 if (GetTTL(*packet->protobuf) > 0 &&
423 !HasTTLExpired(*packet->protobuf, clock_)) {
424 new_to_send.push_back(packet);
425 } else {
426 // If the TTL was 0 there is no persistent id, so no need to remove the
427 // message from the persistent store.
428 if (!packet->persistent_id.empty())
429 expired_ttl_ids.push_back(packet->persistent_id);
430 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
431 }
432 }
433
434 if (!expired_ttl_ids.empty()) {
435 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
436 << " messages expired.";
437 gcm_store_->RemoveOutgoingMessages(
438 expired_ttl_ids,
439 base::Bind(&MCSClient::OnGCMUpdateFinished,
440 weak_ptr_factory_.GetWeakPtr()));
441 }
442
443 to_send_.swap(new_to_send);
444
445 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
446 << " incoming acks pending, and " << to_send_.size()
447 << " pending outgoing messages.";
448
449 state_ = CONNECTING;
450 }
451
SendHeartbeat()452 void MCSClient::SendHeartbeat() {
453 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
454 }
455
OnGCMUpdateFinished(bool success)456 void MCSClient::OnGCMUpdateFinished(bool success) {
457 LOG_IF(ERROR, !success) << "GCM Update failed!";
458 UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
459 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
460 }
461
MaybeSendMessage()462 void MCSClient::MaybeSendMessage() {
463 if (to_send_.empty())
464 return;
465
466 // If the connection has been reset, do nothing. On reconnection
467 // MaybeSendMessage will be automatically invoked again.
468 // TODO(zea): consider doing TTL expiration at connection reset time, rather
469 // than reconnect time.
470 if (!connection_factory_->IsEndpointReachable())
471 return;
472
473 MCSPacketInternal packet = PopMessageForSend();
474 if (HasTTLExpired(*packet->protobuf, clock_)) {
475 DCHECK(!packet->persistent_id.empty());
476 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
477 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
478 gcm_store_->RemoveOutgoingMessage(
479 packet->persistent_id,
480 base::Bind(&MCSClient::OnGCMUpdateFinished,
481 weak_ptr_factory_.GetWeakPtr()));
482 base::MessageLoop::current()->PostTask(
483 FROM_HERE,
484 base::Bind(&MCSClient::MaybeSendMessage,
485 weak_ptr_factory_.GetWeakPtr()));
486 return;
487 }
488 DVLOG(1) << "Pending output message found, sending.";
489 if (!packet->persistent_id.empty())
490 to_resend_.push_back(packet);
491 SendPacketToWire(packet.get());
492 }
493
SendPacketToWire(ReliablePacketInfo * packet_info)494 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
495 packet_info->stream_id = ++stream_id_out_;
496 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
497
498 // Set the queued time as necessary.
499 if (packet_info->tag == kDataMessageStanzaTag) {
500 mcs_proto::DataMessageStanza* data_message =
501 reinterpret_cast<mcs_proto::DataMessageStanza*>(
502 packet_info->protobuf.get());
503 uint64 sent = data_message->sent();
504 DCHECK_GT(sent, 0U);
505 int queued = (clock_->Now().ToInternalValue() /
506 base::Time::kMicrosecondsPerSecond) - sent;
507 DVLOG(1) << "Message was queued for " << queued << " seconds.";
508 data_message->set_queued(queued);
509 recorder_->RecordDataSentToWire(
510 data_message->category(),
511 data_message->to(),
512 data_message->id(),
513 queued);
514 }
515
516 // Set the proper last received stream id to acknowledge received server
517 // packets.
518 DVLOG(1) << "Setting last stream id received to "
519 << stream_id_in_;
520 SetLastStreamIdReceived(stream_id_in_,
521 packet_info->protobuf.get());
522 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
523 last_server_to_device_stream_id_received_ = stream_id_in_;
524 // Mark all acknowledged server messages as such. Note: they're not dropped,
525 // as it may be that they'll need to be re-acked if this message doesn't
526 // make it.
527 PersistentIdList persistent_id_list;
528 for (StreamIdToPersistentIdMap::const_iterator iter =
529 unacked_server_ids_.begin();
530 iter != unacked_server_ids_.end(); ++iter) {
531 DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
532 persistent_id_list.push_back(iter->second);
533 }
534 unacked_server_ids_.clear();
535 acked_server_ids_[stream_id_out_] = persistent_id_list;
536 }
537
538 connection_handler_->SendMessage(*packet_info->protobuf);
539 }
540
HandleMCSDataMesssage(scoped_ptr<google::protobuf::MessageLite> protobuf)541 void MCSClient::HandleMCSDataMesssage(
542 scoped_ptr<google::protobuf::MessageLite> protobuf) {
543 mcs_proto::DataMessageStanza* data_message =
544 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
545 // TODO(zea): implement a proper status manager rather than hardcoding these
546 // values.
547 scoped_ptr<mcs_proto::DataMessageStanza> response(
548 new mcs_proto::DataMessageStanza());
549 response->set_from(kGCMFromField);
550 response->set_sent(clock_->Now().ToInternalValue() /
551 base::Time::kMicrosecondsPerSecond);
552 response->set_ttl(0);
553 bool send = false;
554 for (int i = 0; i < data_message->app_data_size(); ++i) {
555 const mcs_proto::AppData& app_data = data_message->app_data(i);
556 if (app_data.key() == kIdleNotification) {
557 // Tell the MCS server the client is not idle.
558 send = true;
559 mcs_proto::AppData data;
560 data.set_key(kIdleNotification);
561 data.set_value("false");
562 response->add_app_data()->CopyFrom(data);
563 response->set_category(kMCSCategory);
564 }
565 }
566
567 if (send) {
568 SendMessage(
569 MCSMessage(kDataMessageStanzaTag,
570 response.PassAs<const google::protobuf::MessageLite>()));
571 }
572 }
573
HandlePacketFromWire(scoped_ptr<google::protobuf::MessageLite> protobuf)574 void MCSClient::HandlePacketFromWire(
575 scoped_ptr<google::protobuf::MessageLite> protobuf) {
576 if (!protobuf.get())
577 return;
578 uint8 tag = GetMCSProtoTag(*protobuf);
579 PersistentId persistent_id = GetPersistentId(*protobuf);
580 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
581
582 if (last_stream_id_received != 0) {
583 last_device_to_server_stream_id_received_ = last_stream_id_received;
584
585 // Process device to server messages that have now been acknowledged by the
586 // server. Because messages are stored in order, just pop off all that have
587 // a stream id lower than server's last received stream id.
588 HandleStreamAck(last_stream_id_received);
589
590 // Process server_to_device_messages that the server now knows were
591 // acknowledged. Again, they're in order, so just keep going until the
592 // stream id is reached.
593 StreamIdList acked_stream_ids_to_remove;
594 for (std::map<StreamId, PersistentIdList>::iterator iter =
595 acked_server_ids_.begin();
596 iter != acked_server_ids_.end() &&
597 iter->first <= last_stream_id_received; ++iter) {
598 acked_stream_ids_to_remove.push_back(iter->first);
599 }
600 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
601 iter != acked_stream_ids_to_remove.end(); ++iter) {
602 acked_server_ids_.erase(*iter);
603 }
604 }
605
606 ++stream_id_in_;
607 if (!persistent_id.empty()) {
608 unacked_server_ids_[stream_id_in_] = persistent_id;
609 gcm_store_->AddIncomingMessage(persistent_id,
610 base::Bind(&MCSClient::OnGCMUpdateFinished,
611 weak_ptr_factory_.GetWeakPtr()));
612 }
613
614 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
615 << " with persistent id "
616 << (persistent_id.empty() ? "NULL" : persistent_id)
617 << ", stream id " << stream_id_in_ << " and last stream id received "
618 << last_stream_id_received;
619
620 if (unacked_server_ids_.size() > 0 &&
621 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
622 SendMessage(MCSMessage(kIqStanzaTag,
623 BuildStreamAck().
624 PassAs<const google::protobuf::MessageLite>()));
625 }
626
627 // The connection is alive, treat this message as a heartbeat ack.
628 heartbeat_manager_.OnHeartbeatAcked();
629
630 switch (tag) {
631 case kLoginResponseTag: {
632 DCHECK_EQ(CONNECTING, state_);
633 mcs_proto::LoginResponse* login_response =
634 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
635 DVLOG(1) << "Received login response:";
636 DVLOG(1) << " Id: " << login_response->id();
637 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
638 if (login_response->has_error() && login_response->error().code() != 0) {
639 state_ = UNINITIALIZED;
640 DVLOG(1) << " Error code: " << login_response->error().code();
641 DVLOG(1) << " Error message: " << login_response->error().message();
642 LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
643 connection_factory_->SignalConnectionReset(
644 ConnectionFactory::LOGIN_FAILURE);
645 mcs_error_callback_.Run();
646 return;
647 }
648
649 if (login_response->has_heartbeat_config()) {
650 heartbeat_manager_.UpdateHeartbeatConfig(
651 login_response->heartbeat_config());
652 }
653
654 state_ = CONNECTED;
655 stream_id_in_ = 1; // To account for the login response.
656 DCHECK_EQ(1U, stream_id_out_);
657
658 // Pass the login response on up.
659 base::MessageLoop::current()->PostTask(
660 FROM_HERE,
661 base::Bind(message_received_callback_,
662 MCSMessage(tag,
663 protobuf.PassAs<
664 const google::protobuf::MessageLite>())));
665
666 // If there are pending messages, attempt to send one.
667 if (!to_send_.empty()) {
668 base::MessageLoop::current()->PostTask(
669 FROM_HERE,
670 base::Bind(&MCSClient::MaybeSendMessage,
671 weak_ptr_factory_.GetWeakPtr()));
672 }
673
674 heartbeat_manager_.Start(
675 base::Bind(&MCSClient::SendHeartbeat,
676 weak_ptr_factory_.GetWeakPtr()),
677 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
678 weak_ptr_factory_.GetWeakPtr()));
679 return;
680 }
681 case kHeartbeatPingTag:
682 DCHECK_GE(stream_id_in_, 1U);
683 DVLOG(1) << "Received heartbeat ping, sending ack.";
684 SendMessage(
685 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
686 return;
687 case kHeartbeatAckTag:
688 DCHECK_GE(stream_id_in_, 1U);
689 DVLOG(1) << "Received heartbeat ack.";
690 // Do nothing else, all messages act as heartbeat acks.
691 return;
692 case kCloseTag:
693 LOG(ERROR) << "Received close command, resetting connection.";
694 state_ = LOADED;
695 connection_factory_->SignalConnectionReset(
696 ConnectionFactory::CLOSE_COMMAND);
697 return;
698 case kIqStanzaTag: {
699 DCHECK_GE(stream_id_in_, 1U);
700 mcs_proto::IqStanza* iq_stanza =
701 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
702 const mcs_proto::Extension& iq_extension = iq_stanza->extension();
703 switch (iq_extension.id()) {
704 case kSelectiveAck: {
705 PersistentIdList acked_ids;
706 if (BuildPersistentIdListFromProto(iq_extension.data(),
707 &acked_ids)) {
708 HandleSelectiveAck(acked_ids);
709 }
710 return;
711 }
712 case kStreamAck:
713 // Do nothing. The last received stream id is always processed if it's
714 // present.
715 return;
716 default:
717 LOG(WARNING) << "Received invalid iq stanza extension "
718 << iq_extension.id();
719 return;
720 }
721 }
722 case kDataMessageStanzaTag: {
723 DCHECK_GE(stream_id_in_, 1U);
724 mcs_proto::DataMessageStanza* data_message =
725 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
726 if (data_message->category() == kMCSCategory) {
727 HandleMCSDataMesssage(protobuf.Pass());
728 return;
729 }
730
731 DCHECK(protobuf.get());
732 base::MessageLoop::current()->PostTask(
733 FROM_HERE,
734 base::Bind(message_received_callback_,
735 MCSMessage(tag,
736 protobuf.PassAs<
737 const google::protobuf::MessageLite>())));
738 return;
739 }
740 default:
741 LOG(ERROR) << "Received unexpected message of type "
742 << static_cast<int>(tag);
743 return;
744 }
745 }
746
HandleStreamAck(StreamId last_stream_id_received)747 void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
748 PersistentIdList acked_outgoing_persistent_ids;
749 StreamIdList acked_outgoing_stream_ids;
750 while (!to_resend_.empty() &&
751 to_resend_.front()->stream_id <= last_stream_id_received) {
752 const MCSPacketInternal& outgoing_packet = to_resend_.front();
753 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
754 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
755 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
756 to_resend_.pop_front();
757 }
758
759 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
760 << " outgoing messages, " << to_resend_.size()
761 << " remaining unacked";
762 gcm_store_->RemoveOutgoingMessages(
763 acked_outgoing_persistent_ids,
764 base::Bind(&MCSClient::OnGCMUpdateFinished,
765 weak_ptr_factory_.GetWeakPtr()));
766
767 HandleServerConfirmedReceipt(last_stream_id_received);
768 }
769
HandleSelectiveAck(const PersistentIdList & id_list)770 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
771 std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
772
773 StreamId last_stream_id_received = 0;
774
775 // First check the to_resend_ queue. Acknowledgments are always contiguous,
776 // so if there's a pending message that hasn't been acked, all newer messages
777 // must also be unacked.
778 while(!to_resend_.empty() && !remaining_ids.empty()) {
779 const MCSPacketInternal& outgoing_packet = to_resend_.front();
780 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
781 break; // Newer message must be unacked too.
782 remaining_ids.erase(outgoing_packet->persistent_id);
783 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
784
785 // No need to re-acknowledge any server messages this message already
786 // acknowledged.
787 StreamId device_stream_id = outgoing_packet->stream_id;
788 if (device_stream_id > last_stream_id_received)
789 last_stream_id_received = device_stream_id;
790 to_resend_.pop_front();
791 }
792
793 // If the acknowledged ids aren't all there, they might be in the to_send_
794 // queue (typically when a SelectiveAck confirms messages as part of a login
795 // response).
796 while (!to_send_.empty() && !remaining_ids.empty()) {
797 const MCSPacketInternal& outgoing_packet = to_send_.front();
798 if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
799 break; // Newer messages must be unacked too.
800 remaining_ids.erase(outgoing_packet->persistent_id);
801 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
802
803 // No need to re-acknowledge any server messages this message already
804 // acknowledged.
805 StreamId device_stream_id = outgoing_packet->stream_id;
806 if (device_stream_id > last_stream_id_received)
807 last_stream_id_received = device_stream_id;
808 PopMessageForSend();
809 }
810
811 // Only handle the largest stream id value. All other stream ids are
812 // implicitly handled.
813 if (last_stream_id_received > 0)
814 HandleServerConfirmedReceipt(last_stream_id_received);
815
816 // At this point, all remaining acked ids are redundant.
817 PersistentIdList acked_ids;
818 if (remaining_ids.size() > 0) {
819 for (size_t i = 0; i < id_list.size(); ++i) {
820 if (remaining_ids.count(id_list[i]) > 0)
821 continue;
822 acked_ids.push_back(id_list[i]);
823 }
824 } else {
825 acked_ids = id_list;
826 }
827
828 DVLOG(1) << "Server acked " << acked_ids.size()
829 << " messages, " << to_resend_.size() << " remaining unacked.";
830 gcm_store_->RemoveOutgoingMessages(
831 acked_ids,
832 base::Bind(&MCSClient::OnGCMUpdateFinished,
833 weak_ptr_factory_.GetWeakPtr()));
834
835 // Resend any remaining outgoing messages, as they were not received by the
836 // server.
837 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
838 while (!to_resend_.empty()) {
839 to_send_.push_front(to_resend_.back());
840 to_resend_.pop_back();
841 }
842 base::MessageLoop::current()->PostTask(
843 FROM_HERE,
844 base::Bind(&MCSClient::MaybeSendMessage,
845 weak_ptr_factory_.GetWeakPtr()));
846 }
847
HandleServerConfirmedReceipt(StreamId device_stream_id)848 void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
849 PersistentIdList acked_incoming_ids;
850 for (std::map<StreamId, PersistentIdList>::iterator iter =
851 acked_server_ids_.begin();
852 iter != acked_server_ids_.end() &&
853 iter->first <= device_stream_id;) {
854 acked_incoming_ids.insert(acked_incoming_ids.end(),
855 iter->second.begin(),
856 iter->second.end());
857 acked_server_ids_.erase(iter++);
858 }
859
860 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
861 << " acknowledged server messages.";
862 gcm_store_->RemoveIncomingMessages(
863 acked_incoming_ids,
864 base::Bind(&MCSClient::OnGCMUpdateFinished,
865 weak_ptr_factory_.GetWeakPtr()));
866 }
867
GetNextPersistentId()868 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
869 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
870 }
871
OnConnectionResetByHeartbeat()872 void MCSClient::OnConnectionResetByHeartbeat() {
873 connection_factory_->SignalConnectionReset(
874 ConnectionFactory::HEARTBEAT_FAILURE);
875 }
876
NotifyMessageSendStatus(const google::protobuf::MessageLite & protobuf,MessageSendStatus status)877 void MCSClient::NotifyMessageSendStatus(
878 const google::protobuf::MessageLite& protobuf,
879 MessageSendStatus status) {
880 if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
881 return;
882
883 const mcs_proto::DataMessageStanza* data_message_stanza =
884 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
885 recorder_->RecordNotifySendStatus(
886 data_message_stanza->category(),
887 data_message_stanza->to(),
888 data_message_stanza->id(),
889 status,
890 protobuf.ByteSize(),
891 data_message_stanza->ttl());
892 message_sent_callback_.Run(
893 data_message_stanza->device_user_id(),
894 data_message_stanza->category(),
895 data_message_stanza->id(),
896 status);
897 }
898
PopMessageForSend()899 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
900 MCSPacketInternal packet = to_send_.front();
901 to_send_.pop_front();
902
903 if (packet->tag == kDataMessageStanzaTag) {
904 mcs_proto::DataMessageStanza* data_message =
905 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
906 CollapseKey collapse_key(*data_message);
907 if (collapse_key.IsValid())
908 collapse_key_map_.erase(collapse_key);
909 }
910
911 return packet;
912 }
913
914 } // namespace gcm
915