• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/sync/engine/syncer_thread.h"
6 
7 #include <algorithm>
8 
9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h"
11 
12 using base::TimeDelta;
13 using base::TimeTicks;
14 
15 namespace browser_sync {
16 
17 using sessions::SyncSession;
18 using sessions::SyncSessionSnapshot;
19 using sessions::SyncSourceInfo;
20 using syncable::ModelTypePayloadMap;
21 using syncable::ModelTypeBitSet;
22 using sync_pb::GetUpdatesCallerInfo;
23 
DelayProvider()24 SyncerThread::DelayProvider::DelayProvider() {}
~DelayProvider()25 SyncerThread::DelayProvider::~DelayProvider() {}
26 
WaitInterval()27 SyncerThread::WaitInterval::WaitInterval() {}
~WaitInterval()28 SyncerThread::WaitInterval::~WaitInterval() {}
29 
SyncSessionJob()30 SyncerThread::SyncSessionJob::SyncSessionJob() {}
~SyncSessionJob()31 SyncerThread::SyncSessionJob::~SyncSessionJob() {}
32 
SyncSessionJob(SyncSessionJobPurpose purpose,base::TimeTicks start,linked_ptr<sessions::SyncSession> session,bool is_canary_job,const tracked_objects::Location & nudge_location)33 SyncerThread::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
34     base::TimeTicks start,
35     linked_ptr<sessions::SyncSession> session, bool is_canary_job,
36     const tracked_objects::Location& nudge_location) : purpose(purpose),
37         scheduled_start(start),
38         session(session),
39         is_canary_job(is_canary_job),
40         nudge_location(nudge_location) {
41 }
42 
GetDelay(const base::TimeDelta & last_delay)43 TimeDelta SyncerThread::DelayProvider::GetDelay(
44     const base::TimeDelta& last_delay) {
45   return SyncerThread::GetRecommendedDelay(last_delay);
46 }
47 
GetUpdatesFromNudgeSource(NudgeSource source)48 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
49     NudgeSource source) {
50   switch (source) {
51     case NUDGE_SOURCE_NOTIFICATION:
52       return GetUpdatesCallerInfo::NOTIFICATION;
53     case NUDGE_SOURCE_LOCAL:
54       return GetUpdatesCallerInfo::LOCAL;
55     case NUDGE_SOURCE_CONTINUATION:
56       return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
57     case NUDGE_SOURCE_UNKNOWN:
58       return GetUpdatesCallerInfo::UNKNOWN;
59     default:
60       NOTREACHED();
61       return GetUpdatesCallerInfo::UNKNOWN;
62   }
63 }
64 
WaitInterval(Mode mode,TimeDelta length)65 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
66     : mode(mode), had_nudge(false), length(length) { }
67 
SyncerThread(sessions::SyncSessionContext * context,Syncer * syncer)68 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
69                            Syncer* syncer)
70     : thread_("SyncEngine_SyncerThread"),
71       syncer_short_poll_interval_seconds_(
72           TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
73       syncer_long_poll_interval_seconds_(
74           TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
75       mode_(NORMAL_MODE),
76       server_connection_ok_(false),
77       delay_provider_(new DelayProvider()),
78       syncer_(syncer),
79       session_context_(context) {
80 }
81 
~SyncerThread()82 SyncerThread::~SyncerThread() {
83   DCHECK(!thread_.IsRunning());
84 }
85 
CheckServerConnectionManagerStatus(HttpResponse::ServerConnectionCode code)86 void SyncerThread::CheckServerConnectionManagerStatus(
87     HttpResponse::ServerConnectionCode code) {
88 
89   VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
90           << "Old mode: " << server_connection_ok_ << " Code: " << code;
91   // Note, be careful when adding cases here because if the SyncerThread
92   // thinks there is no valid connection as determined by this method, it
93   // will drop out of *all* forward progress sync loops (it won't poll and it
94   // will queue up Talk notifications but not actually call SyncShare) until
95   // some external action causes a ServerConnectionManager to broadcast that
96   // a valid connection has been re-established.
97   if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
98       HttpResponse::SYNC_AUTH_ERROR == code) {
99     server_connection_ok_ = false;
100     VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
101             << " new mode:" << server_connection_ok_;
102   } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
103     server_connection_ok_ = true;
104     VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
105             << " new mode:" << server_connection_ok_;
106     DoCanaryJob();
107   }
108 }
109 
Start(Mode mode,ModeChangeCallback * callback)110 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
111   VLOG(1) << "SyncerThread(" << this << ")" << "  Start called from thread "
112           << MessageLoop::current()->thread_name();
113   if (!thread_.IsRunning()) {
114     VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode "
115             << mode;
116     if (!thread_.Start()) {
117       NOTREACHED() << "Unable to start SyncerThread.";
118       return;
119     }
120     WatchConnectionManager();
121     thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
122         this, &SyncerThread::SendInitialSnapshot));
123   }
124 
125   VLOG(1) << "SyncerThread(" << this << ")" << "  Entering start with mode = "
126           << mode;
127 
128   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
129       this, &SyncerThread::StartImpl, mode, callback));
130 }
131 
SendInitialSnapshot()132 void SyncerThread::SendInitialSnapshot() {
133   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
134   scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
135       SyncSourceInfo(), ModelSafeRoutingInfo(),
136       std::vector<ModelSafeWorker*>()));
137   SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
138   sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
139   event.snapshot = &snapshot;
140   session_context_->NotifyListeners(event);
141 }
142 
WatchConnectionManager()143 void SyncerThread::WatchConnectionManager() {
144   ServerConnectionManager* scm = session_context_->connection_manager();
145   CheckServerConnectionManagerStatus(scm->server_status());
146   scm->AddListener(this);
147 }
148 
StartImpl(Mode mode,ModeChangeCallback * callback)149 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) {
150   VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode "
151           << mode;
152 
153   // TODO(lipalani): This will leak if startimpl is never run. Fix it using a
154   // ThreadSafeRefcounted object.
155   scoped_ptr<ModeChangeCallback> scoped_callback(callback);
156   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
157   DCHECK(!session_context_->account_name().empty());
158   DCHECK(syncer_.get());
159   mode_ = mode;
160   AdjustPolling(NULL);  // Will kick start poll timer if needed.
161   if (scoped_callback.get())
162     scoped_callback->Run();
163 
164   // We just changed our mode. See if there are any pending jobs that we could
165   // execute in the new mode.
166   DoPendingJobIfPossible(false);
167 }
168 
DecideWhileInWaitInterval(const SyncSessionJob & job)169 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
170     const SyncSessionJob& job) {
171 
172   DCHECK(wait_interval_.get());
173   DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
174 
175   VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : "
176           << wait_interval_->mode << "Wait interval had nudge : "
177           << wait_interval_->had_nudge << "is canary job : "
178           << job.is_canary_job;
179 
180   if (job.purpose == SyncSessionJob::POLL)
181     return DROP;
182 
183   DCHECK(job.purpose == SyncSessionJob::NUDGE ||
184       job.purpose == SyncSessionJob::CONFIGURATION);
185   if (wait_interval_->mode == WaitInterval::THROTTLED)
186     return SAVE;
187 
188   DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
189   if (job.purpose == SyncSessionJob::NUDGE) {
190     if (mode_ == CONFIGURATION_MODE)
191       return SAVE;
192 
193     // If we already had one nudge then just drop this nudge. We will retry
194     // later when the timer runs out.
195     return wait_interval_->had_nudge ? DROP : CONTINUE;
196   }
197   // This is a config job.
198   return job.is_canary_job ? CONTINUE : SAVE;
199 }
200 
DecideOnJob(const SyncSessionJob & job)201 SyncerThread::JobProcessDecision SyncerThread::DecideOnJob(
202     const SyncSessionJob& job) {
203   if (job.purpose == SyncSessionJob::CLEAR_USER_DATA)
204     return CONTINUE;
205 
206   if (wait_interval_.get())
207     return DecideWhileInWaitInterval(job);
208 
209   if (mode_ == CONFIGURATION_MODE) {
210     if (job.purpose == SyncSessionJob::NUDGE)
211       return SAVE;
212     else if (job.purpose == SyncSessionJob::CONFIGURATION)
213       return CONTINUE;
214     else
215       return DROP;
216   }
217 
218   // We are in normal mode.
219   DCHECK_EQ(mode_, NORMAL_MODE);
220   DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
221 
222   // Freshness condition
223   if (job.scheduled_start < last_sync_session_end_time_) {
224     VLOG(1) << "SyncerThread(" << this << ")"
225             << " Dropping job because of freshness";
226     return DROP;
227   }
228 
229   if (server_connection_ok_)
230     return CONTINUE;
231 
232   VLOG(1) << "SyncerThread(" << this << ")"
233           << " Bad server connection. Using that to decide on job.";
234   return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
235 }
236 
InitOrCoalescePendingJob(const SyncSessionJob & job)237 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
238   DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
239   if (pending_nudge_.get() == NULL) {
240     VLOG(1) << "SyncerThread(" << this << ")"
241             << " Creating a pending nudge job";
242     SyncSession* s = job.session.get();
243     scoped_ptr<SyncSession> session(new SyncSession(s->context(),
244         s->delegate(), s->source(), s->routing_info(), s->workers()));
245 
246     SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
247         make_linked_ptr(session.release()), false, job.nudge_location);
248     pending_nudge_.reset(new SyncSessionJob(new_job));
249 
250     return;
251   }
252 
253   VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge";
254   pending_nudge_->session->Coalesce(*(job.session.get()));
255   pending_nudge_->scheduled_start = job.scheduled_start;
256 
257   // Unfortunately the nudge location cannot be modified. So it stores the
258   // location of the first caller.
259 }
260 
ShouldRunJob(const SyncSessionJob & job)261 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
262   JobProcessDecision decision = DecideOnJob(job);
263   VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: "
264           << decision << " Job purpose " << job.purpose << "mode " << mode_;
265   if (decision != SAVE)
266     return decision == CONTINUE;
267 
268   DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
269       SyncSessionJob::CONFIGURATION);
270 
271   SaveJob(job);
272   return false;
273 }
274 
SaveJob(const SyncSessionJob & job)275 void SyncerThread::SaveJob(const SyncSessionJob& job) {
276   DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
277   if (job.purpose == SyncSessionJob::NUDGE) {
278     VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job";
279     InitOrCoalescePendingJob(job);
280   } else if (job.purpose == SyncSessionJob::CONFIGURATION){
281     VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job";
282     DCHECK(wait_interval_.get());
283     DCHECK(mode_ == CONFIGURATION_MODE);
284 
285     SyncSession* old = job.session.get();
286     SyncSession* s(new SyncSession(session_context_.get(), this,
287         old->source(), old->routing_info(), old->workers()));
288     SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
289                           make_linked_ptr(s), false, job.nudge_location);
290     wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
291   } // drop the rest.
292 }
293 
294 // Functor for std::find_if to search by ModelSafeGroup.
295 struct ModelSafeWorkerGroupIs {
ModelSafeWorkerGroupIsbrowser_sync::ModelSafeWorkerGroupIs296   explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
operator ()browser_sync::ModelSafeWorkerGroupIs297   bool operator()(ModelSafeWorker* w) {
298     return group == w->GetModelSafeGroup();
299   }
300   ModelSafeGroup group;
301 };
302 
ScheduleClearUserData()303 void SyncerThread::ScheduleClearUserData() {
304   if (!thread_.IsRunning()) {
305     NOTREACHED();
306     return;
307   }
308   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
309       this, &SyncerThread::ScheduleClearUserDataImpl));
310 }
311 
ScheduleNudge(const TimeDelta & delay,NudgeSource source,const ModelTypeBitSet & types,const tracked_objects::Location & nudge_location)312 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
313     NudgeSource source, const ModelTypeBitSet& types,
314     const tracked_objects::Location& nudge_location) {
315   if (!thread_.IsRunning()) {
316     NOTREACHED();
317     return;
318   }
319 
320   VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled";
321 
322   ModelTypePayloadMap types_with_payloads =
323       syncable::ModelTypePayloadMapFromBitSet(types, std::string());
324   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
325       this, &SyncerThread::ScheduleNudgeImpl, delay,
326       GetUpdatesFromNudgeSource(source), types_with_payloads, false,
327       nudge_location));
328 }
329 
ScheduleNudgeWithPayloads(const TimeDelta & delay,NudgeSource source,const ModelTypePayloadMap & types_with_payloads,const tracked_objects::Location & nudge_location)330 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
331     NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
332     const tracked_objects::Location& nudge_location) {
333   if (!thread_.IsRunning()) {
334     NOTREACHED();
335     return;
336   }
337 
338   VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads";
339 
340   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
341       this, &SyncerThread::ScheduleNudgeImpl, delay,
342       GetUpdatesFromNudgeSource(source), types_with_payloads, false,
343       nudge_location));
344 }
345 
ScheduleClearUserDataImpl()346 void SyncerThread::ScheduleClearUserDataImpl() {
347   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
348   SyncSession* session = new SyncSession(session_context_.get(), this,
349       SyncSourceInfo(), ModelSafeRoutingInfo(),
350       std::vector<ModelSafeWorker*>());
351   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
352       SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
353 }
354 
ScheduleNudgeImpl(const TimeDelta & delay,GetUpdatesCallerInfo::GetUpdatesSource source,const ModelTypePayloadMap & types_with_payloads,bool is_canary_job,const tracked_objects::Location & nudge_location)355 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
356     GetUpdatesCallerInfo::GetUpdatesSource source,
357     const ModelTypePayloadMap& types_with_payloads,
358     bool is_canary_job, const tracked_objects::Location& nudge_location) {
359   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
360 
361   VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl";
362   // Note we currently nudge for all types regardless of the ones incurring
363   // the nudge.  Doing different would throw off some syncer commands like
364   // CleanupDisabledTypes.  We may want to change this in the future.
365   SyncSourceInfo info(source, types_with_payloads);
366 
367   SyncSession* session(CreateSyncSession(info));
368   SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
369                      make_linked_ptr(session), is_canary_job,
370                      nudge_location);
371 
372   session = NULL;
373   if (!ShouldRunJob(job))
374     return;
375 
376   if (pending_nudge_.get()) {
377     if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
378       VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because"
379               << "we are in backoff";
380       return;
381     }
382 
383     VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge";
384     pending_nudge_->session->Coalesce(*(job.session.get()));
385 
386     if (!IsBackingOff()) {
387       VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because"
388               << " we are not in backoff and the job was coalesced";
389       return;
390     } else {
391       VLOG(1) << "SyncerThread(" << this << ")"
392               << " Rescheduling pending nudge";
393       SyncSession* s = pending_nudge_->session.get();
394       job.session.reset(new SyncSession(s->context(), s->delegate(),
395           s->source(), s->routing_info(), s->workers()));
396       pending_nudge_.reset();
397     }
398   }
399 
400   // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
401   ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
402       nudge_location);
403 }
404 
405 // Helper to extract the routing info and workers corresponding to types in
406 // |types| from |registrar|.
GetModelSafeParamsForTypes(const ModelTypeBitSet & types,ModelSafeWorkerRegistrar * registrar,ModelSafeRoutingInfo * routes,std::vector<ModelSafeWorker * > * workers)407 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
408     ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
409     std::vector<ModelSafeWorker*>* workers) {
410   ModelSafeRoutingInfo r_tmp;
411   std::vector<ModelSafeWorker*> w_tmp;
412   registrar->GetModelSafeRoutingInfo(&r_tmp);
413   registrar->GetWorkers(&w_tmp);
414 
415   bool passive_group_added = false;
416 
417   typedef std::vector<ModelSafeWorker*>::const_iterator iter;
418   for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
419     if (!types.test(i))
420       continue;
421     syncable::ModelType t = syncable::ModelTypeFromInt(i);
422     DCHECK_EQ(1U, r_tmp.count(t));
423     (*routes)[t] = r_tmp[t];
424     iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
425                            ModelSafeWorkerGroupIs(r_tmp[t]));
426     if (it != w_tmp.end()) {
427       iter it2 = std::find_if(workers->begin(), workers->end(),
428                               ModelSafeWorkerGroupIs(r_tmp[t]));
429       if (it2 == workers->end())
430         workers->push_back(*it);
431 
432       if (r_tmp[t] == GROUP_PASSIVE)
433         passive_group_added = true;
434     } else {
435         NOTREACHED();
436     }
437   }
438 
439   // Always add group passive.
440   if (passive_group_added == false) {
441     iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
442                            ModelSafeWorkerGroupIs(GROUP_PASSIVE));
443     if (it != w_tmp.end())
444       workers->push_back(*it);
445     else
446       NOTREACHED();
447   }
448 }
449 
ScheduleConfig(const ModelTypeBitSet & types)450 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types) {
451   if (!thread_.IsRunning()) {
452     NOTREACHED();
453     return;
454   }
455 
456   VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config";
457   ModelSafeRoutingInfo routes;
458   std::vector<ModelSafeWorker*> workers;
459   GetModelSafeParamsForTypes(types, session_context_->registrar(),
460                              &routes, &workers);
461 
462   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
463       this, &SyncerThread::ScheduleConfigImpl, routes, workers,
464       GetUpdatesCallerInfo::FIRST_UPDATE));
465 }
466 
ScheduleConfigImpl(const ModelSafeRoutingInfo & routing_info,const std::vector<ModelSafeWorker * > & workers,const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source)467 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
468     const std::vector<ModelSafeWorker*>& workers,
469     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
470   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
471 
472   VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl...";
473   // TODO(tim): config-specific GetUpdatesCallerInfo value?
474   SyncSession* session = new SyncSession(session_context_.get(), this,
475       SyncSourceInfo(source,
476           syncable::ModelTypePayloadMapFromRoutingInfo(
477               routing_info, std::string())),
478       routing_info, workers);
479   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
480     SyncSessionJob::CONFIGURATION, session, FROM_HERE);
481 }
482 
ScheduleSyncSessionJob(const base::TimeDelta & delay,SyncSessionJob::SyncSessionJobPurpose purpose,sessions::SyncSession * session,const tracked_objects::Location & nudge_location)483 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
484     SyncSessionJob::SyncSessionJobPurpose purpose,
485     sessions::SyncSession* session,
486     const tracked_objects::Location& nudge_location) {
487   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
488 
489   SyncSessionJob job(purpose, TimeTicks::Now() + delay,
490                         make_linked_ptr(session), false, nudge_location);
491   if (purpose == SyncSessionJob::NUDGE) {
492     VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in"
493             << " ScheduleSyncSessionJob";
494     DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
495     pending_nudge_.reset(new SyncSessionJob(job));
496   }
497   VLOG(1) << "SyncerThread(" << this << ")"
498           << " Posting job to execute in DoSyncSessionJob. Job purpose "
499           << job.purpose;
500   MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
501       &SyncerThread::DoSyncSessionJob, job),
502       delay.InMilliseconds());
503 }
504 
SetSyncerStepsForPurpose(SyncSessionJob::SyncSessionJobPurpose purpose,SyncerStep * start,SyncerStep * end)505 void SyncerThread::SetSyncerStepsForPurpose(
506     SyncSessionJob::SyncSessionJobPurpose purpose,
507     SyncerStep* start, SyncerStep* end) {
508   *end = SYNCER_END;
509   switch (purpose) {
510     case SyncSessionJob::CONFIGURATION:
511       *start = DOWNLOAD_UPDATES;
512       *end = APPLY_UPDATES;
513       return;
514     case SyncSessionJob::CLEAR_USER_DATA:
515       *start = CLEAR_PRIVATE_DATA;
516        return;
517     case SyncSessionJob::NUDGE:
518     case SyncSessionJob::POLL:
519       *start = SYNCER_BEGIN;
520       return;
521     default:
522       NOTREACHED();
523   }
524 }
525 
DoSyncSessionJob(const SyncSessionJob & job)526 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
527   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
528   if (!ShouldRunJob(job)) {
529     LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
530         << job.session->source().updates_source;
531     return;
532   }
533 
534   if (job.purpose == SyncSessionJob::NUDGE) {
535     if (pending_nudge_.get() == NULL || pending_nudge_->session != job.session)
536       return;  // Another nudge must have been scheduled in in the meantime.
537     pending_nudge_.reset();
538   }
539   VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose "
540           << job.purpose;
541 
542   SyncerStep begin(SYNCER_BEGIN);
543   SyncerStep end(SYNCER_END);
544   SetSyncerStepsForPurpose(job.purpose, &begin, &end);
545 
546   bool has_more_to_sync = true;
547   while (ShouldRunJob(job) && has_more_to_sync) {
548     VLOG(1) << "SyncerThread(" << this << ")"
549             << " SyncerThread: Calling SyncShare.";
550     // Synchronously perform the sync session from this thread.
551     syncer_->SyncShare(job.session.get(), begin, end);
552     has_more_to_sync = job.session->HasMoreToSync();
553     if (has_more_to_sync)
554       job.session->ResetTransientState();
555   }
556   VLOG(1) << "SyncerThread(" << this << ")"
557           << " SyncerThread: Done SyncShare looping.";
558   FinishSyncSessionJob(job);
559 }
560 
UpdateCarryoverSessionState(const SyncSessionJob & old_job)561 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
562   if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
563     // Whatever types were part of a configuration task will have had updates
564     // downloaded.  For that reason, we make sure they get recorded in the
565     // event that they get disabled at a later time.
566     ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
567     if (!r.empty()) {
568       ModelSafeRoutingInfo temp_r;
569       ModelSafeRoutingInfo old_info(old_job.session->routing_info());
570       std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
571           std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
572       session_context_->set_previous_session_routing_info(temp_r);
573     }
574   } else {
575     session_context_->set_previous_session_routing_info(
576         old_job.session->routing_info());
577   }
578 }
579 
FinishSyncSessionJob(const SyncSessionJob & job)580 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
581   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
582   // Update timing information for how often datatypes are triggering nudges.
583   base::TimeTicks now = TimeTicks::Now();
584   if (!last_sync_session_end_time_.is_null()) {
585     ModelTypePayloadMap::const_iterator iter;
586     for (iter = job.session->source().types.begin();
587          iter != job.session->source().types.end();
588          ++iter) {
589       syncable::PostTimeToTypeHistogram(iter->first,
590                                         now - last_sync_session_end_time_);
591     }
592   }
593   last_sync_session_end_time_ = now;
594   UpdateCarryoverSessionState(job);
595   if (IsSyncingCurrentlySilenced()) {
596     VLOG(1) << "SyncerThread(" << this << ")"
597             << " We are currently throttled. So not scheduling the next sync.";
598     SaveJob(job);
599     return;  // Nothing to do.
600   }
601 
602   VLOG(1) << "SyncerThread(" << this << ")"
603           << " Updating the next polling time after SyncMain";
604   ScheduleNextSync(job);
605 }
606 
ScheduleNextSync(const SyncSessionJob & old_job)607 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
608   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
609   DCHECK(!old_job.session->HasMoreToSync());
610   // Note: |num_server_changes_remaining| > 0 here implies that we received a
611   // broken response while trying to download all updates, because the Syncer
612   // will loop until this value is exhausted. Also, if unsynced_handles exist
613   // but HasMoreToSync is false, this implies that the Syncer determined no
614   // forward progress was possible at this time (an error, such as an HTTP
615   // 500, is likely to have occurred during commit).
616   const bool work_to_do =
617      old_job.session->status_controller()->num_server_changes_remaining() > 0
618      || old_job.session->status_controller()->unsynced_handles().size() > 0;
619   VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: "
620           << work_to_do;
621 
622   AdjustPolling(&old_job);
623 
624   // TODO(tim): Old impl had special code if notifications disabled. Needed?
625   if (!work_to_do) {
626     // Success implies backoff relief.  Note that if this was a "one-off" job
627     // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
628     // work_to_do before it ran this wont have changed, as jobs like this don't
629     // run a full sync cycle.  So we don't need special code here.
630     wait_interval_.reset();
631     VLOG(1) << "SyncerThread(" << this << ")"
632             << " Job suceeded so not scheduling more jobs";
633     return;
634   }
635 
636   if (old_job.session->source().updates_source ==
637       GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
638     VLOG(1) << "SyncerThread(" << this << ")"
639             << " Job failed with source continuation";
640     // We don't seem to have made forward progress. Start or extend backoff.
641     HandleConsecutiveContinuationError(old_job);
642   } else if (IsBackingOff()) {
643     VLOG(1) << "SyncerThread(" << this << ")"
644             << " A nudge during backoff failed";
645     // We weren't continuing but we're in backoff; must have been a nudge.
646     DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
647     DCHECK(!wait_interval_->had_nudge);
648     wait_interval_->had_nudge = true;
649     wait_interval_->timer.Reset();
650   } else {
651     VLOG(1) << "SyncerThread(" << this << ")"
652             << " Failed. Schedule a job with continuation as source";
653     // We weren't continuing and we aren't in backoff.  Schedule a normal
654     // continuation.
655     if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
656       ScheduleConfigImpl(old_job.session->routing_info(),
657           old_job.session->workers(),
658           GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
659     } else  {
660       // For all other purposes(nudge and poll) we schedule a retry nudge.
661       ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
662                         GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
663                         old_job.session->source().types, false, FROM_HERE);
664     }
665   }
666 }
667 
AdjustPolling(const SyncSessionJob * old_job)668 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
669   DCHECK(thread_.IsRunning());
670   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
671 
672   TimeDelta poll  = (!session_context_->notifications_enabled()) ?
673       syncer_short_poll_interval_seconds_ :
674       syncer_long_poll_interval_seconds_;
675   bool rate_changed = !poll_timer_.IsRunning() ||
676                        poll != poll_timer_.GetCurrentDelay();
677 
678   if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
679     poll_timer_.Reset();
680 
681   if (!rate_changed)
682     return;
683 
684   // Adjust poll rate.
685   poll_timer_.Stop();
686   poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
687 }
688 
HandleConsecutiveContinuationError(const SyncSessionJob & old_job)689 void SyncerThread::HandleConsecutiveContinuationError(
690     const SyncSessionJob& old_job) {
691   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
692   // This if conditions should be compiled out in retail builds.
693   if (IsBackingOff()) {
694     DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
695   }
696   SyncSession* old = old_job.session.get();
697   SyncSession* s(new SyncSession(session_context_.get(), this,
698       old->source(), old->routing_info(), old->workers()));
699   TimeDelta length = delay_provider_->GetDelay(
700       IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
701 
702   VLOG(1) << "SyncerThread(" << this << ")"
703           << " In handle continuation error. Old job purpose is "
704           << old_job.purpose;
705   VLOG(1) << "SyncerThread(" << this << ")"
706     << " In Handle continuation error. The time delta(ms) is: "
707           << length.InMilliseconds();
708 
709   // This will reset the had_nudge variable as well.
710   wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
711                                         length));
712   if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
713     SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
714                         make_linked_ptr(s), false, FROM_HERE);
715     wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
716   } else {
717     // We are not in configuration mode. So wait_interval's pending job
718     // should be null.
719     DCHECK(wait_interval_->pending_configure_job.get() == NULL);
720 
721     // TODO(lipalani) - handle clear user data.
722     InitOrCoalescePendingJob(old_job);
723   }
724   wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
725 }
726 
727 // static
GetRecommendedDelay(const TimeDelta & last_delay)728 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
729   if (last_delay.InSeconds() >= kMaxBackoffSeconds)
730     return TimeDelta::FromSeconds(kMaxBackoffSeconds);
731 
732   // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
733   int64 backoff_s =
734       std::max(static_cast<int64>(1),
735                last_delay.InSeconds() * kBackoffRandomizationFactor);
736 
737   // Flip a coin to randomize backoff interval by +/- 50%.
738   int rand_sign = base::RandInt(0, 1) * 2 - 1;
739 
740   // Truncation is adequate for rounding here.
741   backoff_s = backoff_s +
742       (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
743 
744   // Cap the backoff interval.
745   backoff_s = std::max(static_cast<int64>(1),
746                        std::min(backoff_s, kMaxBackoffSeconds));
747 
748   return TimeDelta::FromSeconds(backoff_s);
749 }
750 
Stop()751 void SyncerThread::Stop() {
752   VLOG(1) << "SyncerThread(" << this << ")" << " stop called";
753   syncer_->RequestEarlyExit();  // Safe to call from any thread.
754   session_context_->connection_manager()->RemoveListener(this);
755   thread_.Stop();
756 }
757 
DoCanaryJob()758 void SyncerThread::DoCanaryJob() {
759   VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job";
760   DoPendingJobIfPossible(true);
761 }
762 
DoPendingJobIfPossible(bool is_canary_job)763 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
764   SyncSessionJob* job_to_execute = NULL;
765   if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
766       && wait_interval_->pending_configure_job.get()) {
767     VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job";
768     job_to_execute = wait_interval_->pending_configure_job.get();
769   } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
770     VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job";
771     // Pending jobs mostly have time from the past. Reset it so this job
772     // will get executed.
773     if (pending_nudge_->scheduled_start < TimeTicks::Now())
774       pending_nudge_->scheduled_start = TimeTicks::Now();
775 
776     scoped_ptr<SyncSession> session(CreateSyncSession(
777         pending_nudge_->session->source()));
778 
779     // Also the routing info might have been changed since we cached the
780     // pending nudge. Update it by coalescing to the latest.
781     pending_nudge_->session->Coalesce(*(session.get()));
782     // The pending nudge would be cleared in the DoSyncSessionJob function.
783     job_to_execute = pending_nudge_.get();
784   }
785 
786   if (job_to_execute != NULL) {
787     VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job";
788     SyncSessionJob copy = *job_to_execute;
789     copy.is_canary_job = is_canary_job;
790     DoSyncSessionJob(copy);
791   }
792 }
793 
CreateSyncSession(const SyncSourceInfo & source)794 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
795   ModelSafeRoutingInfo routes;
796   std::vector<ModelSafeWorker*> workers;
797   session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
798   session_context_->registrar()->GetWorkers(&workers);
799   SyncSourceInfo info(source);
800 
801   SyncSession* session(new SyncSession(session_context_.get(), this, info,
802       routes, workers));
803 
804   return session;
805 }
806 
PollTimerCallback()807 void SyncerThread::PollTimerCallback() {
808   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
809   ModelSafeRoutingInfo r;
810   ModelTypePayloadMap types_with_payloads =
811       syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
812   SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
813   SyncSession* s = CreateSyncSession(info);
814   ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
815       FROM_HERE);
816 }
817 
Unthrottle()818 void SyncerThread::Unthrottle() {
819   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
820   VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled..";
821   DoCanaryJob();
822   wait_interval_.reset();
823 }
824 
Notify(SyncEngineEvent::EventCause cause)825 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
826   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
827   session_context_->NotifyListeners(SyncEngineEvent(cause));
828 }
829 
IsBackingOff() const830 bool SyncerThread::IsBackingOff() const {
831   return wait_interval_.get() && wait_interval_->mode ==
832       WaitInterval::EXPONENTIAL_BACKOFF;
833 }
834 
OnSilencedUntil(const base::TimeTicks & silenced_until)835 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
836   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
837                                         silenced_until - TimeTicks::Now()));
838   wait_interval_->timer.Start(wait_interval_->length, this,
839       &SyncerThread::Unthrottle);
840 }
841 
IsSyncingCurrentlySilenced()842 bool SyncerThread::IsSyncingCurrentlySilenced() {
843   return wait_interval_.get() && wait_interval_->mode ==
844       WaitInterval::THROTTLED;
845 }
846 
OnReceivedShortPollIntervalUpdate(const base::TimeDelta & new_interval)847 void SyncerThread::OnReceivedShortPollIntervalUpdate(
848     const base::TimeDelta& new_interval) {
849   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
850   syncer_short_poll_interval_seconds_ = new_interval;
851 }
852 
OnReceivedLongPollIntervalUpdate(const base::TimeDelta & new_interval)853 void SyncerThread::OnReceivedLongPollIntervalUpdate(
854     const base::TimeDelta& new_interval) {
855   DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
856   syncer_long_poll_interval_seconds_ = new_interval;
857 }
858 
OnShouldStopSyncingPermanently()859 void SyncerThread::OnShouldStopSyncingPermanently() {
860   VLOG(1) << "SyncerThread(" << this << ")"
861           << " OnShouldStopSyncingPermanently";
862   syncer_->RequestEarlyExit();  // Thread-safe.
863   Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
864 }
865 
OnServerConnectionEvent(const ServerConnectionEvent2 & event)866 void SyncerThread::OnServerConnectionEvent(
867     const ServerConnectionEvent2& event) {
868   thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
869       &SyncerThread::CheckServerConnectionManagerStatus,
870       event.connection_code));
871 }
872 
set_notifications_enabled(bool notifications_enabled)873 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
874   session_context_->set_notifications_enabled(notifications_enabled);
875 }
876 
877 }  // browser_sync
878