• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6 
7 #include <string>
8 
9 #include "base/base64.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/rand_util.h"
14 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/invalidation-client.h"
16 #include "jingle/notifier/listener/xml_element_util.h"
17 #include "talk/xmpp/constants.h"
18 #include "talk/xmpp/jid.h"
19 #include "talk/xmpp/xmppclient.h"
20 #include "talk/xmpp/xmpptask.h"
21 
22 namespace sync_notifier {
23 
24 namespace {
25 
26 const char kBotJid[] = "tango@bot.talk.google.com";
27 const char kServiceUrl[] = "http://www.google.com/chrome/sync";
28 
29 const buzz::QName kQnData("google:notifier", "data");
30 const buzz::QName kQnSeq("", "seq");
31 const buzz::QName kQnSid("", "sid");
32 const buzz::QName kQnServiceUrl("", "serviceUrl");
33 
34 // TODO(akalin): Move these task classes out so that they can be
35 // unit-tested.  This'll probably be done easier once we consolidate
36 // all the packet sending/receiving classes.
37 
38 // A task that listens for ClientInvalidation messages and calls the
39 // given callback on them.
40 class CacheInvalidationListenTask : public buzz::XmppTask {
41  public:
42   // Takes ownership of callback.
CacheInvalidationListenTask(Task * parent,Callback1<const std::string &>::Type * callback)43   CacheInvalidationListenTask(Task* parent,
44                               Callback1<const std::string&>::Type* callback)
45       : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
~CacheInvalidationListenTask()46   virtual ~CacheInvalidationListenTask() {}
47 
ProcessStart()48   virtual int ProcessStart() {
49     VLOG(2) << "CacheInvalidationListenTask started";
50     return STATE_RESPONSE;
51   }
52 
ProcessResponse()53   virtual int ProcessResponse() {
54     const buzz::XmlElement* stanza = NextStanza();
55     if (stanza == NULL) {
56       VLOG(2) << "CacheInvalidationListenTask blocked";
57       return STATE_BLOCKED;
58     }
59     VLOG(2) << "CacheInvalidationListenTask response received";
60     std::string data;
61     if (GetCacheInvalidationIqPacketData(stanza, &data)) {
62       callback_->Run(data);
63     } else {
64       LOG(ERROR) << "Could not get packet data";
65     }
66     // Acknowledge receipt of the iq to the buzz server.
67     // TODO(akalin): Send an error response for malformed packets.
68     scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
69     SendStanza(response_stanza.get());
70     return STATE_RESPONSE;
71   }
72 
HandleStanza(const buzz::XmlElement * stanza)73   virtual bool HandleStanza(const buzz::XmlElement* stanza) {
74     VLOG(1) << "Stanza received: "
75               << notifier::XmlElementToString(*stanza);
76     if (IsValidCacheInvalidationIqPacket(stanza)) {
77       VLOG(2) << "Queueing stanza";
78       QueueStanza(stanza);
79       return true;
80     }
81     VLOG(2) << "Stanza skipped";
82     return false;
83   }
84 
85  private:
IsValidCacheInvalidationIqPacket(const buzz::XmlElement * stanza)86   bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
87     // We deliberately minimize the verification we do here: see
88     // http://crbug.com/71285 .
89     return MatchRequestIq(stanza, buzz::STR_SET, kQnData);
90   }
91 
GetCacheInvalidationIqPacketData(const buzz::XmlElement * stanza,std::string * data)92   bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
93                             std::string* data) {
94     DCHECK(IsValidCacheInvalidationIqPacket(stanza));
95     const buzz::XmlElement* cache_invalidation_iq_packet =
96         stanza->FirstNamed(kQnData);
97     if (!cache_invalidation_iq_packet) {
98       LOG(ERROR) << "Could not find cache invalidation IQ packet element";
99       return false;
100     }
101     *data = cache_invalidation_iq_packet->BodyText();
102     return true;
103   }
104 
105   scoped_ptr<Callback1<const std::string&>::Type> callback_;
106   DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
107 };
108 
109 // A task that sends a single outbound ClientInvalidation message.
110 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
111  public:
CacheInvalidationSendMessageTask(Task * parent,const buzz::Jid & to_jid,const std::string & msg,int seq,const std::string & sid)112   CacheInvalidationSendMessageTask(Task* parent,
113                                    const buzz::Jid& to_jid,
114                                    const std::string& msg,
115                                    int seq,
116                                    const std::string& sid)
117       : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
118         to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
~CacheInvalidationSendMessageTask()119   virtual ~CacheInvalidationSendMessageTask() {}
120 
ProcessStart()121   virtual int ProcessStart() {
122     scoped_ptr<buzz::XmlElement> stanza(
123         MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
124                                       seq_, sid_));
125     VLOG(1) << "Sending message: "
126               << notifier::XmlElementToString(*stanza.get());
127     if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
128       VLOG(2) << "Error when sending message";
129       return STATE_ERROR;
130     }
131     return STATE_RESPONSE;
132   }
133 
ProcessResponse()134   virtual int ProcessResponse() {
135     const buzz::XmlElement* stanza = NextStanza();
136     if (stanza == NULL) {
137       VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
138       return STATE_BLOCKED;
139     }
140     VLOG(2) << "CacheInvalidationSendMessageTask response received: "
141               << notifier::XmlElementToString(*stanza);
142     // TODO(akalin): Handle errors here.
143     return STATE_DONE;
144   }
145 
HandleStanza(const buzz::XmlElement * stanza)146   virtual bool HandleStanza(const buzz::XmlElement* stanza) {
147     VLOG(1) << "Stanza received: "
148               << notifier::XmlElementToString(*stanza);
149     if (!MatchResponseIq(stanza, to_jid_, task_id())) {
150       VLOG(2) << "Stanza skipped";
151       return false;
152     }
153     VLOG(2) << "Queueing stanza";
154     QueueStanza(stanza);
155     return true;
156   }
157 
158  private:
MakeCacheInvalidationIqPacket(const buzz::Jid & to_jid,const std::string & task_id,const std::string & msg,int seq,const std::string & sid)159   static buzz::XmlElement* MakeCacheInvalidationIqPacket(
160       const buzz::Jid& to_jid,
161       const std::string& task_id,
162       const std::string& msg,
163       int seq, const std::string& sid) {
164     buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
165     buzz::XmlElement* cache_invalidation_iq_packet =
166         new buzz::XmlElement(kQnData, true);
167     iq->AddElement(cache_invalidation_iq_packet);
168     cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
169     cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
170     cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl);
171     cache_invalidation_iq_packet->SetBodyText(msg);
172     return iq;
173   }
174 
175   const buzz::Jid to_jid_;
176   std::string msg_;
177   int seq_;
178   std::string sid_;
179 
180   DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
181 };
182 
MakeSid()183 std::string MakeSid() {
184   uint64 sid = base::RandUint64();
185   return std::string("chrome-sync-") + base::Uint64ToString(sid);
186 }
187 
188 }  // namespace
189 
CacheInvalidationPacketHandler(base::WeakPtr<talk_base::Task> base_task,invalidation::InvalidationClient * invalidation_client)190 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
191     base::WeakPtr<talk_base::Task> base_task,
192     invalidation::InvalidationClient* invalidation_client)
193     : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
194       base_task_(base_task),
195       invalidation_client_(invalidation_client),
196       seq_(0),
197       sid_(MakeSid()) {
198   CHECK(base_task_.get());
199   // Owned by base_task.  Takes ownership of the callback.
200   CacheInvalidationListenTask* listen_task =
201       new CacheInvalidationListenTask(
202           base_task_, scoped_callback_factory_.NewCallback(
203               &CacheInvalidationPacketHandler::HandleInboundPacket));
204   listen_task->Start();
205 }
206 
~CacheInvalidationPacketHandler()207 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
208   DCHECK(non_thread_safe_.CalledOnValidThread());
209 }
210 
HandleOutboundPacket(invalidation::NetworkEndpoint * network_endpoint)211 void CacheInvalidationPacketHandler::HandleOutboundPacket(
212     invalidation::NetworkEndpoint* network_endpoint) {
213   DCHECK(non_thread_safe_.CalledOnValidThread());
214   if (!base_task_.get()) {
215     return;
216   }
217   CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
218   invalidation::string message;
219   network_endpoint->TakeOutboundMessage(&message);
220   std::string encoded_message;
221   if (!base::Base64Encode(message, &encoded_message)) {
222     LOG(ERROR) << "Could not base64-encode message to send: "
223                << message;
224     return;
225   }
226   // Owned by base_task_.
227   CacheInvalidationSendMessageTask* send_message_task =
228       new CacheInvalidationSendMessageTask(base_task_,
229                                            buzz::Jid(kBotJid),
230                                            encoded_message,
231                                            seq_, sid_);
232   send_message_task->Start();
233   ++seq_;
234 }
235 
HandleInboundPacket(const std::string & packet)236 void CacheInvalidationPacketHandler::HandleInboundPacket(
237     const std::string& packet) {
238   DCHECK(non_thread_safe_.CalledOnValidThread());
239   invalidation::NetworkEndpoint* network_endpoint =
240       invalidation_client_->network_endpoint();
241   std::string decoded_message;
242   if (!base::Base64Decode(packet, &decoded_message)) {
243     LOG(ERROR) << "Could not base64-decode received message: "
244                << packet;
245     return;
246   }
247   network_endpoint->HandleInboundMessage(decoded_message);
248 }
249 
250 }  // namespace sync_notifier
251