1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
6
7 #include <string>
8
9 #include "base/base64.h"
10 #include "base/callback.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/rand_util.h"
14 #include "base/string_number_conversions.h"
15 #include "google/cacheinvalidation/invalidation-client.h"
16 #include "jingle/notifier/listener/xml_element_util.h"
17 #include "talk/xmpp/constants.h"
18 #include "talk/xmpp/jid.h"
19 #include "talk/xmpp/xmppclient.h"
20 #include "talk/xmpp/xmpptask.h"
21
22 namespace sync_notifier {
23
24 namespace {
25
26 const char kBotJid[] = "tango@bot.talk.google.com";
27 const char kServiceUrl[] = "http://www.google.com/chrome/sync";
28
29 const buzz::QName kQnData("google:notifier", "data");
30 const buzz::QName kQnSeq("", "seq");
31 const buzz::QName kQnSid("", "sid");
32 const buzz::QName kQnServiceUrl("", "serviceUrl");
33
34 // TODO(akalin): Move these task classes out so that they can be
35 // unit-tested. This'll probably be done easier once we consolidate
36 // all the packet sending/receiving classes.
37
38 // A task that listens for ClientInvalidation messages and calls the
39 // given callback on them.
40 class CacheInvalidationListenTask : public buzz::XmppTask {
41 public:
42 // Takes ownership of callback.
CacheInvalidationListenTask(Task * parent,Callback1<const std::string &>::Type * callback)43 CacheInvalidationListenTask(Task* parent,
44 Callback1<const std::string&>::Type* callback)
45 : XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
~CacheInvalidationListenTask()46 virtual ~CacheInvalidationListenTask() {}
47
ProcessStart()48 virtual int ProcessStart() {
49 VLOG(2) << "CacheInvalidationListenTask started";
50 return STATE_RESPONSE;
51 }
52
ProcessResponse()53 virtual int ProcessResponse() {
54 const buzz::XmlElement* stanza = NextStanza();
55 if (stanza == NULL) {
56 VLOG(2) << "CacheInvalidationListenTask blocked";
57 return STATE_BLOCKED;
58 }
59 VLOG(2) << "CacheInvalidationListenTask response received";
60 std::string data;
61 if (GetCacheInvalidationIqPacketData(stanza, &data)) {
62 callback_->Run(data);
63 } else {
64 LOG(ERROR) << "Could not get packet data";
65 }
66 // Acknowledge receipt of the iq to the buzz server.
67 // TODO(akalin): Send an error response for malformed packets.
68 scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
69 SendStanza(response_stanza.get());
70 return STATE_RESPONSE;
71 }
72
HandleStanza(const buzz::XmlElement * stanza)73 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
74 VLOG(1) << "Stanza received: "
75 << notifier::XmlElementToString(*stanza);
76 if (IsValidCacheInvalidationIqPacket(stanza)) {
77 VLOG(2) << "Queueing stanza";
78 QueueStanza(stanza);
79 return true;
80 }
81 VLOG(2) << "Stanza skipped";
82 return false;
83 }
84
85 private:
IsValidCacheInvalidationIqPacket(const buzz::XmlElement * stanza)86 bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
87 // We deliberately minimize the verification we do here: see
88 // http://crbug.com/71285 .
89 return MatchRequestIq(stanza, buzz::STR_SET, kQnData);
90 }
91
GetCacheInvalidationIqPacketData(const buzz::XmlElement * stanza,std::string * data)92 bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
93 std::string* data) {
94 DCHECK(IsValidCacheInvalidationIqPacket(stanza));
95 const buzz::XmlElement* cache_invalidation_iq_packet =
96 stanza->FirstNamed(kQnData);
97 if (!cache_invalidation_iq_packet) {
98 LOG(ERROR) << "Could not find cache invalidation IQ packet element";
99 return false;
100 }
101 *data = cache_invalidation_iq_packet->BodyText();
102 return true;
103 }
104
105 scoped_ptr<Callback1<const std::string&>::Type> callback_;
106 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
107 };
108
109 // A task that sends a single outbound ClientInvalidation message.
110 class CacheInvalidationSendMessageTask : public buzz::XmppTask {
111 public:
CacheInvalidationSendMessageTask(Task * parent,const buzz::Jid & to_jid,const std::string & msg,int seq,const std::string & sid)112 CacheInvalidationSendMessageTask(Task* parent,
113 const buzz::Jid& to_jid,
114 const std::string& msg,
115 int seq,
116 const std::string& sid)
117 : XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
118 to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
~CacheInvalidationSendMessageTask()119 virtual ~CacheInvalidationSendMessageTask() {}
120
ProcessStart()121 virtual int ProcessStart() {
122 scoped_ptr<buzz::XmlElement> stanza(
123 MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
124 seq_, sid_));
125 VLOG(1) << "Sending message: "
126 << notifier::XmlElementToString(*stanza.get());
127 if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
128 VLOG(2) << "Error when sending message";
129 return STATE_ERROR;
130 }
131 return STATE_RESPONSE;
132 }
133
ProcessResponse()134 virtual int ProcessResponse() {
135 const buzz::XmlElement* stanza = NextStanza();
136 if (stanza == NULL) {
137 VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
138 return STATE_BLOCKED;
139 }
140 VLOG(2) << "CacheInvalidationSendMessageTask response received: "
141 << notifier::XmlElementToString(*stanza);
142 // TODO(akalin): Handle errors here.
143 return STATE_DONE;
144 }
145
HandleStanza(const buzz::XmlElement * stanza)146 virtual bool HandleStanza(const buzz::XmlElement* stanza) {
147 VLOG(1) << "Stanza received: "
148 << notifier::XmlElementToString(*stanza);
149 if (!MatchResponseIq(stanza, to_jid_, task_id())) {
150 VLOG(2) << "Stanza skipped";
151 return false;
152 }
153 VLOG(2) << "Queueing stanza";
154 QueueStanza(stanza);
155 return true;
156 }
157
158 private:
MakeCacheInvalidationIqPacket(const buzz::Jid & to_jid,const std::string & task_id,const std::string & msg,int seq,const std::string & sid)159 static buzz::XmlElement* MakeCacheInvalidationIqPacket(
160 const buzz::Jid& to_jid,
161 const std::string& task_id,
162 const std::string& msg,
163 int seq, const std::string& sid) {
164 buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
165 buzz::XmlElement* cache_invalidation_iq_packet =
166 new buzz::XmlElement(kQnData, true);
167 iq->AddElement(cache_invalidation_iq_packet);
168 cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
169 cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
170 cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl);
171 cache_invalidation_iq_packet->SetBodyText(msg);
172 return iq;
173 }
174
175 const buzz::Jid to_jid_;
176 std::string msg_;
177 int seq_;
178 std::string sid_;
179
180 DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
181 };
182
MakeSid()183 std::string MakeSid() {
184 uint64 sid = base::RandUint64();
185 return std::string("chrome-sync-") + base::Uint64ToString(sid);
186 }
187
188 } // namespace
189
CacheInvalidationPacketHandler(base::WeakPtr<talk_base::Task> base_task,invalidation::InvalidationClient * invalidation_client)190 CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
191 base::WeakPtr<talk_base::Task> base_task,
192 invalidation::InvalidationClient* invalidation_client)
193 : scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
194 base_task_(base_task),
195 invalidation_client_(invalidation_client),
196 seq_(0),
197 sid_(MakeSid()) {
198 CHECK(base_task_.get());
199 // Owned by base_task. Takes ownership of the callback.
200 CacheInvalidationListenTask* listen_task =
201 new CacheInvalidationListenTask(
202 base_task_, scoped_callback_factory_.NewCallback(
203 &CacheInvalidationPacketHandler::HandleInboundPacket));
204 listen_task->Start();
205 }
206
~CacheInvalidationPacketHandler()207 CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
208 DCHECK(non_thread_safe_.CalledOnValidThread());
209 }
210
HandleOutboundPacket(invalidation::NetworkEndpoint * network_endpoint)211 void CacheInvalidationPacketHandler::HandleOutboundPacket(
212 invalidation::NetworkEndpoint* network_endpoint) {
213 DCHECK(non_thread_safe_.CalledOnValidThread());
214 if (!base_task_.get()) {
215 return;
216 }
217 CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
218 invalidation::string message;
219 network_endpoint->TakeOutboundMessage(&message);
220 std::string encoded_message;
221 if (!base::Base64Encode(message, &encoded_message)) {
222 LOG(ERROR) << "Could not base64-encode message to send: "
223 << message;
224 return;
225 }
226 // Owned by base_task_.
227 CacheInvalidationSendMessageTask* send_message_task =
228 new CacheInvalidationSendMessageTask(base_task_,
229 buzz::Jid(kBotJid),
230 encoded_message,
231 seq_, sid_);
232 send_message_task->Start();
233 ++seq_;
234 }
235
HandleInboundPacket(const std::string & packet)236 void CacheInvalidationPacketHandler::HandleInboundPacket(
237 const std::string& packet) {
238 DCHECK(non_thread_safe_.CalledOnValidThread());
239 invalidation::NetworkEndpoint* network_endpoint =
240 invalidation_client_->network_endpoint();
241 std::string decoded_message;
242 if (!base::Base64Decode(packet, &decoded_message)) {
243 LOG(ERROR) << "Could not base64-decode received message: "
244 << packet;
245 return;
246 }
247 network_endpoint->HandleInboundMessage(decoded_message);
248 }
249
250 } // namespace sync_notifier
251