1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/notifier/sync_system_resources.h"
6
7 #include <cstdlib>
8 #include <cstring>
9 #include <string>
10
11 #include "base/bind.h"
12 #include "base/logging.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "google/cacheinvalidation/client_gateway.pb.h"
18 #include "google/cacheinvalidation/deps/callback.h"
19 #include "google/cacheinvalidation/include/types.h"
20 #include "sync/notifier/invalidation_util.h"
21
22 namespace syncer {
23
SyncLogger()24 SyncLogger::SyncLogger() {}
~SyncLogger()25 SyncLogger::~SyncLogger() {}
26
Log(LogLevel level,const char * file,int line,const char * format,...)27 void SyncLogger::Log(LogLevel level, const char* file, int line,
28 const char* format, ...) {
29 logging::LogSeverity log_severity = -2; // VLOG(2)
30 bool emit_log = false;
31 switch (level) {
32 case FINE_LEVEL:
33 log_severity = -2; // VLOG(2)
34 emit_log = VLOG_IS_ON(2);
35 break;
36 case INFO_LEVEL:
37 log_severity = -1; // VLOG(1)
38 emit_log = VLOG_IS_ON(1);
39 break;
40 case WARNING_LEVEL:
41 log_severity = logging::LOG_WARNING;
42 emit_log = LOG_IS_ON(WARNING);
43 break;
44 case SEVERE_LEVEL:
45 log_severity = logging::LOG_ERROR;
46 emit_log = LOG_IS_ON(ERROR);
47 break;
48 }
49 if (emit_log) {
50 va_list ap;
51 va_start(ap, format);
52 std::string result;
53 base::StringAppendV(&result, format, ap);
54 logging::LogMessage(file, line, log_severity).stream() << result;
55 va_end(ap);
56 }
57 }
58
SetSystemResources(invalidation::SystemResources * resources)59 void SyncLogger::SetSystemResources(invalidation::SystemResources* resources) {
60 // Do nothing.
61 }
62
SyncInvalidationScheduler()63 SyncInvalidationScheduler::SyncInvalidationScheduler()
64 : created_on_loop_(base::MessageLoop::current()),
65 is_started_(false),
66 is_stopped_(false),
67 weak_factory_(this) {
68 CHECK(created_on_loop_);
69 }
70
~SyncInvalidationScheduler()71 SyncInvalidationScheduler::~SyncInvalidationScheduler() {
72 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
73 CHECK(is_stopped_);
74 }
75
Start()76 void SyncInvalidationScheduler::Start() {
77 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
78 CHECK(!is_started_);
79 is_started_ = true;
80 is_stopped_ = false;
81 weak_factory_.InvalidateWeakPtrs();
82 }
83
Stop()84 void SyncInvalidationScheduler::Stop() {
85 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
86 is_stopped_ = true;
87 is_started_ = false;
88 weak_factory_.InvalidateWeakPtrs();
89 STLDeleteElements(&posted_tasks_);
90 posted_tasks_.clear();
91 }
92
Schedule(invalidation::TimeDelta delay,invalidation::Closure * task)93 void SyncInvalidationScheduler::Schedule(invalidation::TimeDelta delay,
94 invalidation::Closure* task) {
95 DCHECK(invalidation::IsCallbackRepeatable(task));
96 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
97
98 if (!is_started_) {
99 delete task;
100 return;
101 }
102
103 posted_tasks_.insert(task);
104 base::MessageLoop::current()->PostDelayedTask(
105 FROM_HERE, base::Bind(&SyncInvalidationScheduler::RunPostedTask,
106 weak_factory_.GetWeakPtr(), task),
107 delay);
108 }
109
IsRunningOnThread() const110 bool SyncInvalidationScheduler::IsRunningOnThread() const {
111 return created_on_loop_ == base::MessageLoop::current();
112 }
113
GetCurrentTime() const114 invalidation::Time SyncInvalidationScheduler::GetCurrentTime() const {
115 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
116 return base::Time::Now();
117 }
118
SetSystemResources(invalidation::SystemResources * resources)119 void SyncInvalidationScheduler::SetSystemResources(
120 invalidation::SystemResources* resources) {
121 // Do nothing.
122 }
123
RunPostedTask(invalidation::Closure * task)124 void SyncInvalidationScheduler::RunPostedTask(invalidation::Closure* task) {
125 CHECK_EQ(created_on_loop_, base::MessageLoop::current());
126 task->Run();
127 posted_tasks_.erase(task);
128 delete task;
129 }
130
SyncNetworkChannel()131 SyncNetworkChannel::SyncNetworkChannel()
132 : invalidator_state_(DEFAULT_INVALIDATION_ERROR),
133 scheduling_hash_(0) {
134 }
135
~SyncNetworkChannel()136 SyncNetworkChannel::~SyncNetworkChannel() {
137 STLDeleteElements(&network_status_receivers_);
138 }
139
SendMessage(const std::string & outgoing_message)140 void SyncNetworkChannel::SendMessage(const std::string& outgoing_message) {
141 std::string encoded_message;
142 EncodeMessage(&encoded_message, outgoing_message, service_context_,
143 scheduling_hash_);
144 SendEncodedMessage(encoded_message);
145 }
146
SetMessageReceiver(invalidation::MessageCallback * incoming_receiver)147 void SyncNetworkChannel::SetMessageReceiver(
148 invalidation::MessageCallback* incoming_receiver) {
149 incoming_receiver_.reset(incoming_receiver);
150 }
151
AddNetworkStatusReceiver(invalidation::NetworkStatusCallback * network_status_receiver)152 void SyncNetworkChannel::AddNetworkStatusReceiver(
153 invalidation::NetworkStatusCallback* network_status_receiver) {
154 network_status_receiver->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
155 network_status_receivers_.push_back(network_status_receiver);
156 }
157
SetSystemResources(invalidation::SystemResources * resources)158 void SyncNetworkChannel::SetSystemResources(
159 invalidation::SystemResources* resources) {
160 // Do nothing.
161 }
162
AddObserver(Observer * observer)163 void SyncNetworkChannel::AddObserver(Observer* observer) {
164 observers_.AddObserver(observer);
165 }
166
RemoveObserver(Observer * observer)167 void SyncNetworkChannel::RemoveObserver(Observer* observer) {
168 observers_.RemoveObserver(observer);
169 }
170
GetServiceContextForTest() const171 const std::string& SyncNetworkChannel::GetServiceContextForTest() const {
172 return service_context_;
173 }
174
GetSchedulingHashForTest() const175 int64 SyncNetworkChannel::GetSchedulingHashForTest() const {
176 return scheduling_hash_;
177 }
178
EncodeMessageForTest(const std::string & message,const std::string & service_context,int64 scheduling_hash)179 std::string SyncNetworkChannel::EncodeMessageForTest(
180 const std::string& message, const std::string& service_context,
181 int64 scheduling_hash) {
182 std::string encoded_message;
183 EncodeMessage(&encoded_message, message, service_context, scheduling_hash);
184 return encoded_message;
185 }
186
DecodeMessageForTest(const std::string & data,std::string * message,std::string * service_context,int64 * scheduling_hash)187 bool SyncNetworkChannel::DecodeMessageForTest(
188 const std::string& data,
189 std::string* message,
190 std::string* service_context,
191 int64* scheduling_hash) {
192 return DecodeMessage(data, message, service_context, scheduling_hash);
193 }
194
NotifyStateChange(InvalidatorState invalidator_state)195 void SyncNetworkChannel::NotifyStateChange(InvalidatorState invalidator_state) {
196 // Remember state for future NetworkStatusReceivers.
197 invalidator_state_ = invalidator_state;
198 // Notify NetworkStatusReceivers in cacheinvalidation.
199 for (NetworkStatusReceiverList::const_iterator it =
200 network_status_receivers_.begin();
201 it != network_status_receivers_.end(); ++it) {
202 (*it)->Run(invalidator_state_ == INVALIDATIONS_ENABLED);
203 }
204 // Notify observers.
205 FOR_EACH_OBSERVER(Observer, observers_,
206 OnNetworkChannelStateChanged(invalidator_state_));
207 }
208
DeliverIncomingMessage(const std::string & data)209 void SyncNetworkChannel::DeliverIncomingMessage(const std::string& data) {
210 if (!incoming_receiver_) {
211 DLOG(ERROR) << "No receiver for incoming notification";
212 return;
213 }
214 std::string message;
215 if (!DecodeMessage(data,
216 &message, &service_context_, &scheduling_hash_)) {
217 DLOG(ERROR) << "Could not parse ClientGatewayMessage";
218 return;
219 }
220 incoming_receiver_->Run(message);
221 }
222
EncodeMessage(std::string * encoded_message,const std::string & message,const std::string & service_context,int64 scheduling_hash)223 void SyncNetworkChannel::EncodeMessage(
224 std::string* encoded_message,
225 const std::string& message,
226 const std::string& service_context,
227 int64 scheduling_hash) {
228 ipc::invalidation::ClientGatewayMessage envelope;
229 envelope.set_is_client_to_server(true);
230 if (!service_context.empty()) {
231 envelope.set_service_context(service_context);
232 envelope.set_rpc_scheduling_hash(scheduling_hash);
233 }
234 envelope.set_network_message(message);
235 envelope.SerializeToString(encoded_message);
236 }
237
238
DecodeMessage(const std::string & data,std::string * message,std::string * service_context,int64 * scheduling_hash)239 bool SyncNetworkChannel::DecodeMessage(
240 const std::string& data,
241 std::string* message,
242 std::string* service_context,
243 int64* scheduling_hash) {
244 ipc::invalidation::ClientGatewayMessage envelope;
245 if (!envelope.ParseFromString(data)) {
246 return false;
247 }
248 *message = envelope.network_message();
249 if (envelope.has_service_context()) {
250 *service_context = envelope.service_context();
251 }
252 if (envelope.has_rpc_scheduling_hash()) {
253 *scheduling_hash = envelope.rpc_scheduling_hash();
254 }
255 return true;
256 }
257
258
SyncStorage(StateWriter * state_writer,invalidation::Scheduler * scheduler)259 SyncStorage::SyncStorage(StateWriter* state_writer,
260 invalidation::Scheduler* scheduler)
261 : state_writer_(state_writer),
262 scheduler_(scheduler) {
263 DCHECK(state_writer_);
264 DCHECK(scheduler_);
265 }
266
~SyncStorage()267 SyncStorage::~SyncStorage() {}
268
WriteKey(const std::string & key,const std::string & value,invalidation::WriteKeyCallback * done)269 void SyncStorage::WriteKey(const std::string& key, const std::string& value,
270 invalidation::WriteKeyCallback* done) {
271 CHECK(state_writer_);
272 // TODO(ghc): actually write key,value associations, and don't invoke the
273 // callback until the operation completes.
274 state_writer_->WriteState(value);
275 cached_state_ = value;
276 // According to the cache invalidation API folks, we can do this as
277 // long as we make sure to clear the persistent state that we start
278 // up the cache invalidation client with. However, we musn't do it
279 // right away, as we may be called under a lock that the callback
280 // uses.
281 scheduler_->Schedule(
282 invalidation::Scheduler::NoDelay(),
283 invalidation::NewPermanentCallback(
284 this, &SyncStorage::RunAndDeleteWriteKeyCallback,
285 done));
286 }
287
ReadKey(const std::string & key,invalidation::ReadKeyCallback * done)288 void SyncStorage::ReadKey(const std::string& key,
289 invalidation::ReadKeyCallback* done) {
290 DCHECK(scheduler_->IsRunningOnThread()) << "not running on scheduler thread";
291 RunAndDeleteReadKeyCallback(done, cached_state_);
292 }
293
DeleteKey(const std::string & key,invalidation::DeleteKeyCallback * done)294 void SyncStorage::DeleteKey(const std::string& key,
295 invalidation::DeleteKeyCallback* done) {
296 // TODO(ghc): Implement.
297 LOG(WARNING) << "ignoring call to DeleteKey(" << key << ", callback)";
298 }
299
ReadAllKeys(invalidation::ReadAllKeysCallback * done)300 void SyncStorage::ReadAllKeys(invalidation::ReadAllKeysCallback* done) {
301 // TODO(ghc): Implement.
302 LOG(WARNING) << "ignoring call to ReadAllKeys(callback)";
303 }
304
SetSystemResources(invalidation::SystemResources * resources)305 void SyncStorage::SetSystemResources(
306 invalidation::SystemResources* resources) {
307 // Do nothing.
308 }
309
RunAndDeleteWriteKeyCallback(invalidation::WriteKeyCallback * callback)310 void SyncStorage::RunAndDeleteWriteKeyCallback(
311 invalidation::WriteKeyCallback* callback) {
312 callback->Run(
313 invalidation::Status(invalidation::Status::SUCCESS, std::string()));
314 delete callback;
315 }
316
RunAndDeleteReadKeyCallback(invalidation::ReadKeyCallback * callback,const std::string & value)317 void SyncStorage::RunAndDeleteReadKeyCallback(
318 invalidation::ReadKeyCallback* callback, const std::string& value) {
319 callback->Run(std::make_pair(
320 invalidation::Status(invalidation::Status::SUCCESS, std::string()),
321 value));
322 delete callback;
323 }
324
SyncSystemResources(SyncNetworkChannel * sync_network_channel,StateWriter * state_writer)325 SyncSystemResources::SyncSystemResources(
326 SyncNetworkChannel* sync_network_channel,
327 StateWriter* state_writer)
328 : is_started_(false),
329 logger_(new SyncLogger()),
330 internal_scheduler_(new SyncInvalidationScheduler()),
331 listener_scheduler_(new SyncInvalidationScheduler()),
332 storage_(new SyncStorage(state_writer, internal_scheduler_.get())),
333 sync_network_channel_(sync_network_channel) {
334 }
335
~SyncSystemResources()336 SyncSystemResources::~SyncSystemResources() {
337 Stop();
338 }
339
Start()340 void SyncSystemResources::Start() {
341 internal_scheduler_->Start();
342 listener_scheduler_->Start();
343 is_started_ = true;
344 }
345
Stop()346 void SyncSystemResources::Stop() {
347 internal_scheduler_->Stop();
348 listener_scheduler_->Stop();
349 }
350
IsStarted() const351 bool SyncSystemResources::IsStarted() const {
352 return is_started_;
353 }
354
set_platform(const std::string & platform)355 void SyncSystemResources::set_platform(const std::string& platform) {
356 platform_ = platform;
357 }
358
platform() const359 std::string SyncSystemResources::platform() const {
360 return platform_;
361 }
362
logger()363 SyncLogger* SyncSystemResources::logger() {
364 return logger_.get();
365 }
366
storage()367 SyncStorage* SyncSystemResources::storage() {
368 return storage_.get();
369 }
370
network()371 SyncNetworkChannel* SyncSystemResources::network() {
372 return sync_network_channel_;
373 }
374
internal_scheduler()375 SyncInvalidationScheduler* SyncSystemResources::internal_scheduler() {
376 return internal_scheduler_.get();
377 }
378
listener_scheduler()379 SyncInvalidationScheduler* SyncSystemResources::listener_scheduler() {
380 return listener_scheduler_.get();
381 }
382
383 } // namespace syncer
384