• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/notifier/sync_system_resources.h"
6 
7 #include <cstdlib>
8 #include <cstring>
9 #include <string>
10 
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "google/cacheinvalidation/client_gateway.pb.h"
18 #include "google/cacheinvalidation/deps/callback.h"
19 #include "google/cacheinvalidation/include/types.h"
20 #include "sync/notifier/invalidation_util.h"
21 
22 namespace syncer {
23 
SyncLogger()24 SyncLogger::SyncLogger() {}
~SyncLogger()25 SyncLogger::~SyncLogger() {}
26 
Log(LogLevel level,const char * file,int line,const char * format,...)27 void SyncLogger::Log(LogLevel level, const char* file, int line,
28                      const char* format, ...) {
29   logging::LogSeverity log_severity = -2;  // VLOG(2)
30   bool emit_log = false;
31   switch (level) {
32     case FINE_LEVEL:
33       log_severity = -2;  // VLOG(2)
34       emit_log = VLOG_IS_ON(2);
35       break;
36     case INFO_LEVEL:
37       log_severity = -1;  // VLOG(1)
38       emit_log = VLOG_IS_ON(1);
39       break;
40     case WARNING_LEVEL:
41       log_severity = logging::LOG_WARNING;
42       emit_log = LOG_IS_ON(WARNING);
43       break;
44     case SEVERE_LEVEL:
45       log_severity = logging::LOG_ERROR;
46       emit_log = LOG_IS_ON(ERROR);
47       break;
48   }
49   if (emit_log) {
50     va_list ap;
51     va_start(ap, format);
52     std::string result;
53     base::StringAppendV(&result, format, ap);
54     logging::LogMessage(file, line, log_severity).stream() << result;
55     va_end(ap);
56   }
57 }
58 
SetSystemResources(invalidation::SystemResources * resources)59 void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) {
60   // Do nothing.
61 }
62 
SyncInvalidationScheduler()63 SyncInvalidationScheduler::SyncInvalidationScheduler()
64     : created_on_loop_(base::MessageLoop::current()),
65       is_started_(false),
66       is_stopped_(false),
67       weak_factory_(this) {
68   CHECK(created_on_loop_);
69 }
70 
~SyncInvalidationScheduler()71 SyncInvalidationScheduler::~SyncInvalidationScheduler() {
72   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
73   CHECK(is_stopped_);
74 }
75 
Start()76 void SyncInvalidationScheduler::Start() {
77   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
78   CHECK(!is_started_);
79   is_started_ = true;
80   is_stopped_ = false;
81   weak_factory_.InvalidateWeakPtrs();
82 }
83 
Stop()84 void SyncInvalidationScheduler::Stop() {
85   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
86   is_stopped_ = true;
87   is_started_ = false;
88   weak_factory_.InvalidateWeakPtrs();
89   STLDeleteElements(&posted_tasks_);
90   posted_tasks_.clear();
91 }
92 
Schedule(invalidation::TimeDelta delay,invalidation::Closure * task)93 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay,
94                                          invalidation::Closure* task) {
95   DCHECK(invalidation::IsCallbackRepeatable(task));
96   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
97 
98   if (!is_started_) {
99     delete task;
100     return;
101   }
102 
103   posted_tasks_.insert(task);
104   base::MessageLoop::current()->PostDelayedTask(
105       FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask,
106                             weak_factory_.GetWeakPtr(), task),
107       delay);
108 }
109 
IsRunningOnThread() const110 bool SyncInvalidationScheduler::IsRunningOnThread() const {
111   return created_on_loop_ == base::MessageLoop::current();
112 }
113 
GetCurrentTime() const114 invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const {
115   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
116   return base::Time::Now();
117 }
118 
SetSystemResources(invalidation::SystemResources * resources)119 void SyncInvalidationScheduler::SetSystemResources(
120     invalidation::SystemResources* resources) {
121   // Do nothing.
122 }
123 
RunPostedTask(invalidation::Closure * task)124 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) {
125   CHECK_EQ(created_on_loop_, base::MessageLoop::current());
126   task->Run();
127   posted_tasks_.erase(task);
128   delete task;
129 }
130 
SyncNetworkChannel()131 SyncNetworkChannel::SyncNetworkChannel()
132     : invalidator_state_(DEFAULT_INVALIDATION_ERROR),
133       scheduling_hash_(0) {
134 }
135 
~SyncNetworkChannel()136 SyncNetworkChannel::~SyncNetworkChannel() {
137   STLDeleteElements(&network_status_receivers_);
138 }
139 
SendMessage(const std::string & outgoing_message)140 void SyncNetworkChannel::SendMessage(const std::string& outgoing_message) {
141   std::string encoded_message;
142   EncodeMessage(&encoded_message, outgoing_message, service_context_,
143       scheduling_hash_);
144   SendEncodedMessage(encoded_message);
145 }
146 
SetMessageReceiver(invalidation::MessageCallback * incoming_receiver)147 void SyncNetworkChannel::SetMessageReceiver(
148     invalidation::MessageCallback* incoming_receiver) {
149   incoming_receiver_.reset(incoming_receiver);
150 }
151 
AddNetworkStatusReceiver(invalidation::NetworkStatusCallback * network_status_receiver)152 void SyncNetworkChannel::AddNetworkStatusReceiver(
153     invalidation::NetworkStatusCallback* network_status_receiver) {
154   network_status_receiver->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
155   network_status_receivers_.push_back(network_status_receiver);
156 }
157 
SetSystemResources(invalidation::SystemResources * resources)158 void SyncNetworkChannel::SetSystemResources(
159     invalidation::SystemResources* resources) {
160   // Do nothing.
161 }
162 
AddObserver(Observer * observer)163 void SyncNetworkChannel::AddObserver(Observer* observer) {
164   observers_.AddObserver(observer);
165 }
166 
RemoveObserver(Observer * observer)167 void SyncNetworkChannel::RemoveObserver(Observer* observer) {
168   observers_.RemoveObserver(observer);
169 }
170 
GetServiceContextForTest() const171 const std::string& SyncNetworkChannel::GetServiceContextForTest() const {
172   return service_context_;
173 }
174 
GetSchedulingHashForTest() const175 int64 SyncNetworkChannel::GetSchedulingHashForTest() const {
176   return scheduling_hash_;
177 }
178 
EncodeMessageForTest(const std::string & message,const std::string & service_context,int64 scheduling_hash)179 std::string SyncNetworkChannel::EncodeMessageForTest(
180     const std::string& message, const std::string& service_context,
181     int64 scheduling_hash) {
182   std::string encoded_message;
183   EncodeMessage(&encoded_message, message, service_context, scheduling_hash);
184   return encoded_message;
185 }
186 
DecodeMessageForTest(const std::string & data,std::string * message,std::string * service_context,int64 * scheduling_hash)187 bool SyncNetworkChannel::DecodeMessageForTest(
188     const std::string& data,
189     std::string* message,
190     std::string* service_context,
191     int64* scheduling_hash) {
192   return DecodeMessage(data, message, service_context, scheduling_hash);
193 }
194 
NotifyStateChange(InvalidatorState invalidator_state)195 void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state) {
196   // Remember state for future NetworkStatusReceivers.
197   invalidator_state_ = invalidator_state;
198   // Notify NetworkStatusReceivers in cacheinvalidation.
199   for (NetworkStatusReceiverList::const_iterator it =
200            network_status_receivers_.begin();
201        it != network_status_receivers_.end(); ++it) {
202     (*it)->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
203   }
204   // Notify observers.
205   FOR_EACH_OBSERVER(Observer, observers_,
206                     OnNetworkChannelStateChanged(invalidator_state_));
207 }
208 
DeliverIncomingMessage(const std::string & data)209 void SyncNetworkChannel::DeliverIncomingMessage(const std::string& data) {
210   if (!incoming_receiver_) {
211     DLOG(ERROR) << "No receiver for incoming notification";
212     return;
213   }
214   std::string message;
215   if (!DecodeMessage(data,
216                      &message, &service_context_, &scheduling_hash_)) {
217     DLOG(ERROR) << "Could not parse ClientGatewayMessage";
218     return;
219   }
220   incoming_receiver_->Run(message);
221 }
222 
EncodeMessage(std::string * encoded_message,const std::string & message,const std::string & service_context,int64 scheduling_hash)223 void SyncNetworkChannel::EncodeMessage(
224     std::string* encoded_message,
225     const std::string& message,
226     const std::string& service_context,
227     int64 scheduling_hash) {
228   ipc::invalidation::ClientGatewayMessage envelope;
229   envelope.set_is_client_to_server(true);
230   if (!service_context.empty()) {
231     envelope.set_service_context(service_context);
232     envelope.set_rpc_scheduling_hash(scheduling_hash);
233   }
234   envelope.set_network_message(message);
235   envelope.SerializeToString(encoded_message);
236 }
237 
238 
DecodeMessage(const std::string & data,std::string * message,std::string * service_context,int64 * scheduling_hash)239 bool SyncNetworkChannel::DecodeMessage(
240     const std::string& data,
241     std::string* message,
242     std::string* service_context,
243     int64* scheduling_hash) {
244   ipc::invalidation::ClientGatewayMessage envelope;
245   if (!envelope.ParseFromString(data)) {
246     return false;
247   }
248   *message = envelope.network_message();
249   if (envelope.has_service_context()) {
250     *service_context = envelope.service_context();
251   }
252   if (envelope.has_rpc_scheduling_hash()) {
253     *scheduling_hash = envelope.rpc_scheduling_hash();
254   }
255   return true;
256 }
257 
258 
SyncStorage(StateWriter * state_writer,invalidation::Scheduler * scheduler)259 SyncStorage::SyncStorage(StateWriter* state_writer,
260                          invalidation::Scheduler* scheduler)
261     : state_writer_(state_writer),
262       scheduler_(scheduler) {
263   DCHECK(state_writer_);
264   DCHECK(scheduler_);
265 }
266 
~SyncStorage()267 SyncStorage::~SyncStorage() {}
268 
WriteKey(const std::string & key,const std::string & value,invalidation::WriteKeyCallback * done)269 void SyncStorage::WriteKey(const std::string& key, const std::string& value,
270                            invalidation::WriteKeyCallback* done) {
271   CHECK(state_writer_);
272   // TODO(ghc): actually write key,value associations, and don't invoke the
273   // callback until the operation completes.
274   state_writer_->WriteState(value);
275   cached_state_ = value;
276   // According to the cache invalidation API folks, we can do this as
277   // long as we make sure to clear the persistent state that we start
278   // up the cache invalidation client with.  However, we musn't do it
279   // right away, as we may be called under a lock that the callback
280   // uses.
281   scheduler_->Schedule(
282       invalidation::Scheduler::NoDelay(),
283       invalidation::NewPermanentCallback(
284           this, &SyncStorage::RunAndDeleteWriteKeyCallback,
285           done));
286 }
287 
ReadKey(const std::string & key,invalidation::ReadKeyCallback * done)288 void SyncStorage::ReadKey(const std::string& key,
289                           invalidation::ReadKeyCallback* done) {
290   DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread";
291   RunAndDeleteReadKeyCallback(done, cached_state_);
292 }
293 
DeleteKey(const std::string & key,invalidation::DeleteKeyCallback * done)294 void SyncStorage::DeleteKey(const std::string& key,
295                             invalidation::DeleteKeyCallback* done) {
296   // TODO(ghc): Implement.
297   LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)";
298 }
299 
ReadAllKeys(invalidation::ReadAllKeysCallback * done)300 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) {
301   // TODO(ghc): Implement.
302   LOG(WARNING) << "ignoring call to ReadAllKeys(callback)";
303 }
304 
SetSystemResources(invalidation::SystemResources * resources)305 void SyncStorage::SetSystemResources(
306     invalidation::SystemResources* resources) {
307   // Do nothing.
308 }
309 
RunAndDeleteWriteKeyCallback(invalidation::WriteKeyCallback * callback)310 void SyncStorage::RunAndDeleteWriteKeyCallback(
311     invalidation::WriteKeyCallback* callback) {
312   callback->Run(
313       invalidation::Status(invalidation::Status::SUCCESS, std::string()));
314   delete callback;
315 }
316 
RunAndDeleteReadKeyCallback(invalidation::ReadKeyCallback * callback,const std::string & value)317 void SyncStorage::RunAndDeleteReadKeyCallback(
318     invalidation::ReadKeyCallback* callback, const std::string& value) {
319   callback->Run(std::make_pair(
320       invalidation::Status(invalidation::Status::SUCCESS, std::string()),
321       value));
322   delete callback;
323 }
324 
SyncSystemResources(SyncNetworkChannel * sync_network_channel,StateWriter * state_writer)325 SyncSystemResources::SyncSystemResources(
326     SyncNetworkChannel* sync_network_channel,
327     StateWriter* state_writer)
328     : is_started_(false),
329       logger_(new SyncLogger()),
330       internal_scheduler_(new SyncInvalidationScheduler()),
331       listener_scheduler_(new SyncInvalidationScheduler()),
332       storage_(new SyncStorage(state_writer, internal_scheduler_.get())),
333       sync_network_channel_(sync_network_channel) {
334 }
335 
~SyncSystemResources()336 SyncSystemResources::~SyncSystemResources() {
337   Stop();
338 }
339 
Start()340 void SyncSystemResources::Start() {
341   internal_scheduler_->Start();
342   listener_scheduler_->Start();
343   is_started_ = true;
344 }
345 
Stop()346 void SyncSystemResources::Stop() {
347   internal_scheduler_->Stop();
348   listener_scheduler_->Stop();
349 }
350 
IsStarted() const351 bool SyncSystemResources::IsStarted() const {
352   return is_started_;
353 }
354 
set_platform(const std::string & platform)355 void SyncSystemResources::set_platform(const std::string& platform) {
356   platform_ = platform;
357 }
358 
platform() const359 std::string SyncSystemResources::platform() const {
360   return platform_;
361 }
362 
logger()363 SyncLogger* SyncSystemResources::logger() {
364   return logger_.get();
365 }
366 
storage()367 SyncStorage* SyncSystemResources::storage() {
368   return storage_.get();
369 }
370 
network()371 SyncNetworkChannel* SyncSystemResources::network() {
372   return sync_network_channel_;
373 }
374 
internal_scheduler()375 SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() {
376   return internal_scheduler_.get();
377 }
378 
listener_scheduler()379 SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() {
380   return listener_scheduler_.get();
381 }
382 
383 }  // namespace syncer
384