1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread.h"
6
7 #include <algorithm>
8
9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h"
11
12 using base::TimeDelta;
13 using base::TimeTicks;
14
15 namespace browser_sync {
16
17 using sessions::SyncSession;
18 using sessions::SyncSessionSnapshot;
19 using sessions::SyncSourceInfo;
20 using syncable::ModelTypePayloadMap;
21 using syncable::ModelTypeBitSet;
22 using sync_pb::GetUpdatesCallerInfo;
23
DelayProvider()24 SyncerThread::DelayProvider::DelayProvider() {}
~DelayProvider()25 SyncerThread::DelayProvider::~DelayProvider() {}
26
WaitInterval()27 SyncerThread::WaitInterval::WaitInterval() {}
~WaitInterval()28 SyncerThread::WaitInterval::~WaitInterval() {}
29
SyncSessionJob()30 SyncerThread::SyncSessionJob::SyncSessionJob() {}
~SyncSessionJob()31 SyncerThread::SyncSessionJob::~SyncSessionJob() {}
32
SyncSessionJob(SyncSessionJobPurpose purpose,base::TimeTicks start,linked_ptr<sessions::SyncSession> session,bool is_canary_job,const tracked_objects::Location & nudge_location)33 SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
34 base::TimeTicks start,
35 linked_ptr<sessions::SyncSession> session, bool is_canary_job,
36 const tracked_objects::Location& nudge_location) : purpose(purpose),
37 scheduled_start(start),
38 session(session),
39 is_canary_job(is_canary_job),
40 nudge_location(nudge_location) {
41 }
42
GetDelay(const base::TimeDelta & last_delay)43 TimeDelta SyncerThread::DelayProvider::GetDelay(
44 const base::TimeDelta& last_delay) {
45 return SyncerThread::GetRecommendedDelay(last_delay);
46 }
47
GetUpdatesFromNudgeSource(NudgeSource source)48 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
49 NudgeSource source) {
50 switch (source) {
51 case NUDGE_SOURCE_NOTIFICATION:
52 return GetUpdatesCallerInfo::NOTIFICATION;
53 case NUDGE_SOURCE_LOCAL:
54 return GetUpdatesCallerInfo::LOCAL;
55 case NUDGE_SOURCE_CONTINUATION:
56 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
57 case NUDGE_SOURCE_UNKNOWN:
58 return GetUpdatesCallerInfo::UNKNOWN;
59 default:
60 NOTREACHED();
61 return GetUpdatesCallerInfo::UNKNOWN;
62 }
63 }
64
WaitInterval(Mode mode,TimeDelta length)65 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
66 : mode(mode), had_nudge(false), length(length) { }
67
SyncerThread(sessions::SyncSessionContext * context,Syncer * syncer)68 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
69 Syncer* syncer)
70 : thread_("SyncEngine_SyncerThread"),
71 syncer_short_poll_interval_seconds_(
72 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
73 syncer_long_poll_interval_seconds_(
74 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
75 mode_(NORMAL_MODE),
76 server_connection_ok_(false),
77 delay_provider_(new DelayProvider()),
78 syncer_(syncer),
79 session_context_(context) {
80 }
81
~SyncerThread()82 SyncerThread::~SyncerThread() {
83 DCHECK(!thread_.IsRunning());
84 }
85
CheckServerConnectionManagerStatus(HttpResponse::ServerConnectionCode code)86 void SyncerThread::CheckServerConnectionManagerStatus(
87 HttpResponse::ServerConnectionCode code) {
88
89 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
90 << "Old mode: " << server_connection_ok_ << " Code: " << code;
91 // Note, be careful when adding cases here because if the SyncerThread
92 // thinks there is no valid connection as determined by this method, it
93 // will drop out of *all* forward progress sync loops (it won't poll and it
94 // will queue up Talk notifications but not actually call SyncShare) until
95 // some external action causes a ServerConnectionManager to broadcast that
96 // a valid connection has been re-established.
97 if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
98 HttpResponse::SYNC_AUTH_ERROR == code) {
99 server_connection_ok_ = false;
100 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
101 << " new mode:" << server_connection_ok_;
102 } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
103 server_connection_ok_ = true;
104 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
105 << " new mode:" << server_connection_ok_;
106 DoCanaryJob();
107 }
108 }
109
Start(Mode mode,ModeChangeCallback * callback)110 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
111 VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread "
112 << MessageLoop::current()->thread_name();
113 if (!thread_.IsRunning()) {
114 VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode "
115 << mode;
116 if (!thread_.Start()) {
117 NOTREACHED() << "Unable to start SyncerThread.";
118 return;
119 }
120 WatchConnectionManager();
121 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
122 this, &SyncerThread::SendInitialSnapshot));
123 }
124
125 VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = "
126 << mode;
127
128 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
129 this, &SyncerThread::StartImpl, mode, callback));
130 }
131
SendInitialSnapshot()132 void SyncerThread::SendInitialSnapshot() {
133 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
134 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
135 SyncSourceInfo(), ModelSafeRoutingInfo(),
136 std::vector<ModelSafeWorker*>()));
137 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
138 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
139 event.snapshot = &snapshot;
140 session_context_->NotifyListeners(event);
141 }
142
WatchConnectionManager()143 void SyncerThread::WatchConnectionManager() {
144 ServerConnectionManager* scm = session_context_->connection_manager();
145 CheckServerConnectionManagerStatus(scm->server_status());
146 scm->AddListener(this);
147 }
148
StartImpl(Mode mode,ModeChangeCallback * callback)149 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) {
150 VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
151 << mode;
152
153 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a
154 // ThreadSafeRefcounted object.
155 scoped_ptr<ModeChangeCallback> scoped_callback(callback);
156 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
157 DCHECK(!session_context_->account_name().empty());
158 DCHECK(syncer_.get());
159 mode_ = mode;
160 AdjustPolling(NULL); // Will kick start poll timer if needed.
161 if (scoped_callback.get())
162 scoped_callback->Run();
163
164 // We just changed our mode. See if there are any pending jobs that we could
165 // execute in the new mode.
166 DoPendingJobIfPossible(false);
167 }
168
DecideWhileInWaitInterval(const SyncSessionJob & job)169 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
170 const SyncSessionJob& job) {
171
172 DCHECK(wait_interval_.get());
173 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
174
175 VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : "
176 << wait_interval_->mode << "Wait interval had nudge : "
177 << wait_interval_->had_nudge << "is canary job : "
178 << job.is_canary_job;
179
180 if (job.purpose == SyncSessionJob::POLL)
181 return DROP;
182
183 DCHECK(job.purpose == SyncSessionJob::NUDGE ||
184 job.purpose == SyncSessionJob::CONFIGURATION);
185 if (wait_interval_->mode == WaitInterval::THROTTLED)
186 return SAVE;
187
188 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
189 if (job.purpose == SyncSessionJob::NUDGE) {
190 if (mode_ == CONFIGURATION_MODE)
191 return SAVE;
192
193 // If we already had one nudge then just drop this nudge. We will retry
194 // later when the timer runs out.
195 return wait_interval_->had_nudge ? DROP : CONTINUE;
196 }
197 // This is a config job.
198 return job.is_canary_job ? CONTINUE : SAVE;
199 }
200
DecideOnJob(const SyncSessionJob & job)201 SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
202 const SyncSessionJob& job) {
203 if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
204 return CONTINUE;
205
206 if (wait_interval_.get())
207 return DecideWhileInWaitInterval(job);
208
209 if (mode_ == CONFIGURATION_MODE) {
210 if (job.purpose == SyncSessionJob::NUDGE)
211 return SAVE;
212 else if (job.purpose == SyncSessionJob::CONFIGURATION)
213 return CONTINUE;
214 else
215 return DROP;
216 }
217
218 // We are in normal mode.
219 DCHECK_EQ(mode_, NORMAL_MODE);
220 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
221
222 // Freshness condition
223 if (job.scheduled_start < last_sync_session_end_time_) {
224 VLOG(1) << "SyncerThread(" << this << ")"
225 << " Dropping job because of freshness";
226 return DROP;
227 }
228
229 if (server_connection_ok_)
230 return CONTINUE;
231
232 VLOG(1) << "SyncerThread(" << this << ")"
233 << " Bad server connection. Using that to decide on job.";
234 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
235 }
236
InitOrCoalescePendingJob(const SyncSessionJob & job)237 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
238 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
239 if (pending_nudge_.get() == NULL) {
240 VLOG(1) << "SyncerThread(" << this << ")"
241 << " Creating a pending nudge job";
242 SyncSession* s = job.session.get();
243 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
244 s->delegate(), s->source(), s->routing_info(), s->workers()));
245
246 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
247 make_linked_ptr(session.release()), false, job.nudge_location);
248 pending_nudge_.reset(new SyncSessionJob(new_job));
249
250 return;
251 }
252
253 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
254 pending_nudge_->session->Coalesce(*(job.session.get()));
255 pending_nudge_->scheduled_start = job.scheduled_start;
256
257 // Unfortunately the nudge location cannot be modified. So it stores the
258 // location of the first caller.
259 }
260
ShouldRunJob(const SyncSessionJob & job)261 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
262 JobProcessDecision decision = DecideOnJob(job);
263 VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: "
264 << decision << " Job purpose " << job.purpose << "mode " << mode_;
265 if (decision != SAVE)
266 return decision == CONTINUE;
267
268 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
269 SyncSessionJob::CONFIGURATION);
270
271 SaveJob(job);
272 return false;
273 }
274
SaveJob(const SyncSessionJob & job)275 void SyncerThread::SaveJob(const SyncSessionJob& job) {
276 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
277 if (job.purpose == SyncSessionJob::NUDGE) {
278 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job";
279 InitOrCoalescePendingJob(job);
280 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
281 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job";
282 DCHECK(wait_interval_.get());
283 DCHECK(mode_ == CONFIGURATION_MODE);
284
285 SyncSession* old = job.session.get();
286 SyncSession* s(new SyncSession(session_context_.get(), this,
287 old->source(), old->routing_info(), old->workers()));
288 SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
289 make_linked_ptr(s), false, job.nudge_location);
290 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
291 } // drop the rest.
292 }
293
294 // Functor for std::find_if to search by ModelSafeGroup.
295 struct ModelSafeWorkerGroupIs {
ModelSafeWorkerGroupIsbrowser_sync::ModelSafeWorkerGroupIs296 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
operator ()browser_sync::ModelSafeWorkerGroupIs297 bool operator()(ModelSafeWorker* w) {
298 return group == w->GetModelSafeGroup();
299 }
300 ModelSafeGroup group;
301 };
302
ScheduleClearUserData()303 void SyncerThread::ScheduleClearUserData() {
304 if (!thread_.IsRunning()) {
305 NOTREACHED();
306 return;
307 }
308 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
309 this, &SyncerThread::ScheduleClearUserDataImpl));
310 }
311
ScheduleNudge(const TimeDelta & delay,NudgeSource source,const ModelTypeBitSet & types,const tracked_objects::Location & nudge_location)312 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
313 NudgeSource source, const ModelTypeBitSet& types,
314 const tracked_objects::Location& nudge_location) {
315 if (!thread_.IsRunning()) {
316 NOTREACHED();
317 return;
318 }
319
320 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled";
321
322 ModelTypePayloadMap types_with_payloads =
323 syncable::ModelTypePayloadMapFromBitSet(types, std::string());
324 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
325 this, &SyncerThread::ScheduleNudgeImpl, delay,
326 GetUpdatesFromNudgeSource(source), types_with_payloads, false,
327 nudge_location));
328 }
329
ScheduleNudgeWithPayloads(const TimeDelta & delay,NudgeSource source,const ModelTypePayloadMap & types_with_payloads,const tracked_objects::Location & nudge_location)330 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
331 NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
332 const tracked_objects::Location& nudge_location) {
333 if (!thread_.IsRunning()) {
334 NOTREACHED();
335 return;
336 }
337
338 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
339
340 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
341 this, &SyncerThread::ScheduleNudgeImpl, delay,
342 GetUpdatesFromNudgeSource(source), types_with_payloads, false,
343 nudge_location));
344 }
345
ScheduleClearUserDataImpl()346 void SyncerThread::ScheduleClearUserDataImpl() {
347 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
348 SyncSession* session = new SyncSession(session_context_.get(), this,
349 SyncSourceInfo(), ModelSafeRoutingInfo(),
350 std::vector<ModelSafeWorker*>());
351 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
352 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
353 }
354
ScheduleNudgeImpl(const TimeDelta & delay,GetUpdatesCallerInfo::GetUpdatesSource source,const ModelTypePayloadMap & types_with_payloads,bool is_canary_job,const tracked_objects::Location & nudge_location)355 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
356 GetUpdatesCallerInfo::GetUpdatesSource source,
357 const ModelTypePayloadMap& types_with_payloads,
358 bool is_canary_job, const tracked_objects::Location& nudge_location) {
359 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
360
361 VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
362 // Note we currently nudge for all types regardless of the ones incurring
363 // the nudge. Doing different would throw off some syncer commands like
364 // CleanupDisabledTypes. We may want to change this in the future.
365 SyncSourceInfo info(source, types_with_payloads);
366
367 SyncSession* session(CreateSyncSession(info));
368 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
369 make_linked_ptr(session), is_canary_job,
370 nudge_location);
371
372 session = NULL;
373 if (!ShouldRunJob(job))
374 return;
375
376 if (pending_nudge_.get()) {
377 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
378 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
379 << "we are in backoff";
380 return;
381 }
382
383 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
384 pending_nudge_->session->Coalesce(*(job.session.get()));
385
386 if (!IsBackingOff()) {
387 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
388 << " we are not in backoff and the job was coalesced";
389 return;
390 } else {
391 VLOG(1) << "SyncerThread(" << this << ")"
392 << " Rescheduling pending nudge";
393 SyncSession* s = pending_nudge_->session.get();
394 job.session.reset(new SyncSession(s->context(), s->delegate(),
395 s->source(), s->routing_info(), s->workers()));
396 pending_nudge_.reset();
397 }
398 }
399
400 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
401 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
402 nudge_location);
403 }
404
405 // Helper to extract the routing info and workers corresponding to types in
406 // |types| from |registrar|.
GetModelSafeParamsForTypes(const ModelTypeBitSet & types,ModelSafeWorkerRegistrar * registrar,ModelSafeRoutingInfo * routes,std::vector<ModelSafeWorker * > * workers)407 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
408 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
409 std::vector<ModelSafeWorker*>* workers) {
410 ModelSafeRoutingInfo r_tmp;
411 std::vector<ModelSafeWorker*> w_tmp;
412 registrar->GetModelSafeRoutingInfo(&r_tmp);
413 registrar->GetWorkers(&w_tmp);
414
415 bool passive_group_added = false;
416
417 typedef std::vector<ModelSafeWorker*>::const_iterator iter;
418 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
419 if (!types.test(i))
420 continue;
421 syncable::ModelType t = syncable::ModelTypeFromInt(i);
422 DCHECK_EQ(1U, r_tmp.count(t));
423 (*routes)[t] = r_tmp[t];
424 iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
425 ModelSafeWorkerGroupIs(r_tmp[t]));
426 if (it != w_tmp.end()) {
427 iter it2 = std::find_if(workers->begin(), workers->end(),
428 ModelSafeWorkerGroupIs(r_tmp[t]));
429 if (it2 == workers->end())
430 workers->push_back(*it);
431
432 if (r_tmp[t] == GROUP_PASSIVE)
433 passive_group_added = true;
434 } else {
435 NOTREACHED();
436 }
437 }
438
439 // Always add group passive.
440 if (passive_group_added == false) {
441 iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
442 ModelSafeWorkerGroupIs(GROUP_PASSIVE));
443 if (it != w_tmp.end())
444 workers->push_back(*it);
445 else
446 NOTREACHED();
447 }
448 }
449
ScheduleConfig(const ModelTypeBitSet & types)450 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
451 if (!thread_.IsRunning()) {
452 NOTREACHED();
453 return;
454 }
455
456 VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config";
457 ModelSafeRoutingInfo routes;
458 std::vector<ModelSafeWorker*> workers;
459 GetModelSafeParamsForTypes(types, session_context_->registrar(),
460 &routes, &workers);
461
462 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
463 this, &SyncerThread::ScheduleConfigImpl, routes, workers,
464 GetUpdatesCallerInfo::FIRST_UPDATE));
465 }
466
ScheduleConfigImpl(const ModelSafeRoutingInfo & routing_info,const std::vector<ModelSafeWorker * > & workers,const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source)467 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
468 const std::vector<ModelSafeWorker*>& workers,
469 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
470 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
471
472 VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
473 // TODO(tim): config-specific GetUpdatesCallerInfo value?
474 SyncSession* session = new SyncSession(session_context_.get(), this,
475 SyncSourceInfo(source,
476 syncable::ModelTypePayloadMapFromRoutingInfo(
477 routing_info, std::string())),
478 routing_info, workers);
479 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
480 SyncSessionJob::CONFIGURATION, session, FROM_HERE);
481 }
482
ScheduleSyncSessionJob(const base::TimeDelta & delay,SyncSessionJob::SyncSessionJobPurpose purpose,sessions::SyncSession * session,const tracked_objects::Location & nudge_location)483 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
484 SyncSessionJob::SyncSessionJobPurpose purpose,
485 sessions::SyncSession* session,
486 const tracked_objects::Location& nudge_location) {
487 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
488
489 SyncSessionJob job(purpose, TimeTicks::Now() + delay,
490 make_linked_ptr(session), false, nudge_location);
491 if (purpose == SyncSessionJob::NUDGE) {
492 VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
493 << " ScheduleSyncSessionJob";
494 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
495 pending_nudge_.reset(new SyncSessionJob(job));
496 }
497 VLOG(1) << "SyncerThread(" << this << ")"
498 << " Posting job to execute in DoSyncSessionJob. Job purpose "
499 << job.purpose;
500 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
501 &SyncerThread::DoSyncSessionJob, job),
502 delay.InMilliseconds());
503 }
504
SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose,SyncerStep * start,SyncerStep * end)505 void SyncerThread::SetSyncerStepsForPurpose(
506 SyncSessionJob::SyncSessionJobPurpose purpose,
507 SyncerStep* start, SyncerStep* end) {
508 *end = SYNCER_END;
509 switch (purpose) {
510 case SyncSessionJob::CONFIGURATION:
511 *start = DOWNLOAD_UPDATES;
512 *end = APPLY_UPDATES;
513 return;
514 case SyncSessionJob::CLEAR_USER_DATA:
515 *start = CLEAR_PRIVATE_DATA;
516 return;
517 case SyncSessionJob::NUDGE:
518 case SyncSessionJob::POLL:
519 *start = SYNCER_BEGIN;
520 return;
521 default:
522 NOTREACHED();
523 }
524 }
525
DoSyncSessionJob(const SyncSessionJob & job)526 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
527 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
528 if (!ShouldRunJob(job)) {
529 LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
530 << job.session->source().updates_source;
531 return;
532 }
533
534 if (job.purpose == SyncSessionJob::NUDGE) {
535 if (pending_nudge_.get() == NULL || pending_nudge_->session != job.session)
536 return; // Another nudge must have been scheduled in in the meantime.
537 pending_nudge_.reset();
538 }
539 VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
540 << job.purpose;
541
542 SyncerStep begin(SYNCER_BEGIN);
543 SyncerStep end(SYNCER_END);
544 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
545
546 bool has_more_to_sync = true;
547 while (ShouldRunJob(job) && has_more_to_sync) {
548 VLOG(1) << "SyncerThread(" << this << ")"
549 << " SyncerThread: Calling SyncShare.";
550 // Synchronously perform the sync session from this thread.
551 syncer_->SyncShare(job.session.get(), begin, end);
552 has_more_to_sync = job.session->HasMoreToSync();
553 if (has_more_to_sync)
554 job.session->ResetTransientState();
555 }
556 VLOG(1) << "SyncerThread(" << this << ")"
557 << " SyncerThread: Done SyncShare looping.";
558 FinishSyncSessionJob(job);
559 }
560
UpdateCarryoverSessionState(const SyncSessionJob & old_job)561 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
562 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
563 // Whatever types were part of a configuration task will have had updates
564 // downloaded. For that reason, we make sure they get recorded in the
565 // event that they get disabled at a later time.
566 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
567 if (!r.empty()) {
568 ModelSafeRoutingInfo temp_r;
569 ModelSafeRoutingInfo old_info(old_job.session->routing_info());
570 std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
571 std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
572 session_context_->set_previous_session_routing_info(temp_r);
573 }
574 } else {
575 session_context_->set_previous_session_routing_info(
576 old_job.session->routing_info());
577 }
578 }
579
FinishSyncSessionJob(const SyncSessionJob & job)580 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
581 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
582 // Update timing information for how often datatypes are triggering nudges.
583 base::TimeTicks now = TimeTicks::Now();
584 if (!last_sync_session_end_time_.is_null()) {
585 ModelTypePayloadMap::const_iterator iter;
586 for (iter = job.session->source().types.begin();
587 iter != job.session->source().types.end();
588 ++iter) {
589 syncable::PostTimeToTypeHistogram(iter->first,
590 now - last_sync_session_end_time_);
591 }
592 }
593 last_sync_session_end_time_ = now;
594 UpdateCarryoverSessionState(job);
595 if (IsSyncingCurrentlySilenced()) {
596 VLOG(1) << "SyncerThread(" << this << ")"
597 << " We are currently throttled. So not scheduling the next sync.";
598 SaveJob(job);
599 return; // Nothing to do.
600 }
601
602 VLOG(1) << "SyncerThread(" << this << ")"
603 << " Updating the next polling time after SyncMain";
604 ScheduleNextSync(job);
605 }
606
ScheduleNextSync(const SyncSessionJob & old_job)607 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
608 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
609 DCHECK(!old_job.session->HasMoreToSync());
610 // Note: |num_server_changes_remaining| > 0 here implies that we received a
611 // broken response while trying to download all updates, because the Syncer
612 // will loop until this value is exhausted. Also, if unsynced_handles exist
613 // but HasMoreToSync is false, this implies that the Syncer determined no
614 // forward progress was possible at this time (an error, such as an HTTP
615 // 500, is likely to have occurred during commit).
616 const bool work_to_do =
617 old_job.session->status_controller()->num_server_changes_remaining() > 0
618 || old_job.session->status_controller()->unsynced_handles().size() > 0;
619 VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: "
620 << work_to_do;
621
622 AdjustPolling(&old_job);
623
624 // TODO(tim): Old impl had special code if notifications disabled. Needed?
625 if (!work_to_do) {
626 // Success implies backoff relief. Note that if this was a "one-off" job
627 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
628 // work_to_do before it ran this wont have changed, as jobs like this don't
629 // run a full sync cycle. So we don't need special code here.
630 wait_interval_.reset();
631 VLOG(1) << "SyncerThread(" << this << ")"
632 << " Job suceeded so not scheduling more jobs";
633 return;
634 }
635
636 if (old_job.session->source().updates_source ==
637 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
638 VLOG(1) << "SyncerThread(" << this << ")"
639 << " Job failed with source continuation";
640 // We don't seem to have made forward progress. Start or extend backoff.
641 HandleConsecutiveContinuationError(old_job);
642 } else if (IsBackingOff()) {
643 VLOG(1) << "SyncerThread(" << this << ")"
644 << " A nudge during backoff failed";
645 // We weren't continuing but we're in backoff; must have been a nudge.
646 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
647 DCHECK(!wait_interval_->had_nudge);
648 wait_interval_->had_nudge = true;
649 wait_interval_->timer.Reset();
650 } else {
651 VLOG(1) << "SyncerThread(" << this << ")"
652 << " Failed. Schedule a job with continuation as source";
653 // We weren't continuing and we aren't in backoff. Schedule a normal
654 // continuation.
655 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
656 ScheduleConfigImpl(old_job.session->routing_info(),
657 old_job.session->workers(),
658 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
659 } else {
660 // For all other purposes(nudge and poll) we schedule a retry nudge.
661 ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
662 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
663 old_job.session->source().types, false, FROM_HERE);
664 }
665 }
666 }
667
AdjustPolling(const SyncSessionJob * old_job)668 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
669 DCHECK(thread_.IsRunning());
670 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
671
672 TimeDelta poll = (!session_context_->notifications_enabled()) ?
673 syncer_short_poll_interval_seconds_ :
674 syncer_long_poll_interval_seconds_;
675 bool rate_changed = !poll_timer_.IsRunning() ||
676 poll != poll_timer_.GetCurrentDelay();
677
678 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
679 poll_timer_.Reset();
680
681 if (!rate_changed)
682 return;
683
684 // Adjust poll rate.
685 poll_timer_.Stop();
686 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
687 }
688
HandleConsecutiveContinuationError(const SyncSessionJob & old_job)689 void SyncerThread::HandleConsecutiveContinuationError(
690 const SyncSessionJob& old_job) {
691 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
692 // This if conditions should be compiled out in retail builds.
693 if (IsBackingOff()) {
694 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
695 }
696 SyncSession* old = old_job.session.get();
697 SyncSession* s(new SyncSession(session_context_.get(), this,
698 old->source(), old->routing_info(), old->workers()));
699 TimeDelta length = delay_provider_->GetDelay(
700 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
701
702 VLOG(1) << "SyncerThread(" << this << ")"
703 << " In handle continuation error. Old job purpose is "
704 << old_job.purpose;
705 VLOG(1) << "SyncerThread(" << this << ")"
706 << " In Handle continuation error. The time delta(ms) is: "
707 << length.InMilliseconds();
708
709 // This will reset the had_nudge variable as well.
710 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
711 length));
712 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
713 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
714 make_linked_ptr(s), false, FROM_HERE);
715 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
716 } else {
717 // We are not in configuration mode. So wait_interval's pending job
718 // should be null.
719 DCHECK(wait_interval_->pending_configure_job.get() == NULL);
720
721 // TODO(lipalani) - handle clear user data.
722 InitOrCoalescePendingJob(old_job);
723 }
724 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
725 }
726
727 // static
GetRecommendedDelay(const TimeDelta & last_delay)728 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
729 if (last_delay.InSeconds() >= kMaxBackoffSeconds)
730 return TimeDelta::FromSeconds(kMaxBackoffSeconds);
731
732 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
733 int64 backoff_s =
734 std::max(static_cast<int64>(1),
735 last_delay.InSeconds() * kBackoffRandomizationFactor);
736
737 // Flip a coin to randomize backoff interval by +/- 50%.
738 int rand_sign = base::RandInt(0, 1) * 2 - 1;
739
740 // Truncation is adequate for rounding here.
741 backoff_s = backoff_s +
742 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
743
744 // Cap the backoff interval.
745 backoff_s = std::max(static_cast<int64>(1),
746 std::min(backoff_s, kMaxBackoffSeconds));
747
748 return TimeDelta::FromSeconds(backoff_s);
749 }
750
Stop()751 void SyncerThread::Stop() {
752 VLOG(1) << "SyncerThread(" << this << ")" << " stop called";
753 syncer_->RequestEarlyExit(); // Safe to call from any thread.
754 session_context_->connection_manager()->RemoveListener(this);
755 thread_.Stop();
756 }
757
DoCanaryJob()758 void SyncerThread::DoCanaryJob() {
759 VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job";
760 DoPendingJobIfPossible(true);
761 }
762
DoPendingJobIfPossible(bool is_canary_job)763 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
764 SyncSessionJob* job_to_execute = NULL;
765 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
766 && wait_interval_->pending_configure_job.get()) {
767 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job";
768 job_to_execute = wait_interval_->pending_configure_job.get();
769 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
770 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job";
771 // Pending jobs mostly have time from the past. Reset it so this job
772 // will get executed.
773 if (pending_nudge_->scheduled_start < TimeTicks::Now())
774 pending_nudge_->scheduled_start = TimeTicks::Now();
775
776 scoped_ptr<SyncSession> session(CreateSyncSession(
777 pending_nudge_->session->source()));
778
779 // Also the routing info might have been changed since we cached the
780 // pending nudge. Update it by coalescing to the latest.
781 pending_nudge_->session->Coalesce(*(session.get()));
782 // The pending nudge would be cleared in the DoSyncSessionJob function.
783 job_to_execute = pending_nudge_.get();
784 }
785
786 if (job_to_execute != NULL) {
787 VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job";
788 SyncSessionJob copy = *job_to_execute;
789 copy.is_canary_job = is_canary_job;
790 DoSyncSessionJob(copy);
791 }
792 }
793
CreateSyncSession(const SyncSourceInfo & source)794 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
795 ModelSafeRoutingInfo routes;
796 std::vector<ModelSafeWorker*> workers;
797 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
798 session_context_->registrar()->GetWorkers(&workers);
799 SyncSourceInfo info(source);
800
801 SyncSession* session(new SyncSession(session_context_.get(), this, info,
802 routes, workers));
803
804 return session;
805 }
806
PollTimerCallback()807 void SyncerThread::PollTimerCallback() {
808 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
809 ModelSafeRoutingInfo r;
810 ModelTypePayloadMap types_with_payloads =
811 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
812 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
813 SyncSession* s = CreateSyncSession(info);
814 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
815 FROM_HERE);
816 }
817
Unthrottle()818 void SyncerThread::Unthrottle() {
819 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
820 VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled..";
821 DoCanaryJob();
822 wait_interval_.reset();
823 }
824
Notify(SyncEngineEvent::EventCause cause)825 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
826 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
827 session_context_->NotifyListeners(SyncEngineEvent(cause));
828 }
829
IsBackingOff() const830 bool SyncerThread::IsBackingOff() const {
831 return wait_interval_.get() && wait_interval_->mode ==
832 WaitInterval::EXPONENTIAL_BACKOFF;
833 }
834
OnSilencedUntil(const base::TimeTicks & silenced_until)835 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
836 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
837 silenced_until - TimeTicks::Now()));
838 wait_interval_->timer.Start(wait_interval_->length, this,
839 &SyncerThread::Unthrottle);
840 }
841
IsSyncingCurrentlySilenced()842 bool SyncerThread::IsSyncingCurrentlySilenced() {
843 return wait_interval_.get() && wait_interval_->mode ==
844 WaitInterval::THROTTLED;
845 }
846
OnReceivedShortPollIntervalUpdate(const base::TimeDelta & new_interval)847 void SyncerThread::OnReceivedShortPollIntervalUpdate(
848 const base::TimeDelta& new_interval) {
849 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
850 syncer_short_poll_interval_seconds_ = new_interval;
851 }
852
OnReceivedLongPollIntervalUpdate(const base::TimeDelta & new_interval)853 void SyncerThread::OnReceivedLongPollIntervalUpdate(
854 const base::TimeDelta& new_interval) {
855 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
856 syncer_long_poll_interval_seconds_ = new_interval;
857 }
858
OnShouldStopSyncingPermanently()859 void SyncerThread::OnShouldStopSyncingPermanently() {
860 VLOG(1) << "SyncerThread(" << this << ")"
861 << " OnShouldStopSyncingPermanently";
862 syncer_->RequestEarlyExit(); // Thread-safe.
863 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
864 }
865
OnServerConnectionEvent(const ServerConnectionEvent2 & event)866 void SyncerThread::OnServerConnectionEvent(
867 const ServerConnectionEvent2& event) {
868 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
869 &SyncerThread::CheckServerConnectionManagerStatus,
870 event.connection_code));
871 }
872
set_notifications_enabled(bool notifications_enabled)873 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
874 session_context_->set_notifications_enabled(notifications_enabled);
875 }
876
877 } // browser_sync
878