• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "sync/engine/sync_scheduler_impl.h"
6 
7 #include <algorithm>
8 #include <cstring>
9 
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/notifier/object_id_invalidation_map.h"
20 #include "sync/protocol/proto_enum_conversions.h"
21 #include "sync/protocol/sync.pb.h"
22 #include "sync/util/data_type_histogram.h"
23 #include "sync/util/logging.h"
24 
25 using base::TimeDelta;
26 using base::TimeTicks;
27 
28 namespace syncer {
29 
30 using sessions::SyncSession;
31 using sessions::SyncSessionSnapshot;
32 using sync_pb::GetUpdatesCallerInfo;
33 
34 namespace {
35 
ShouldRequestEarlyExit(const SyncProtocolError & error)36 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
37   switch (error.error_type) {
38     case SYNC_SUCCESS:
39     case MIGRATION_DONE:
40     case THROTTLED:
41     case TRANSIENT_ERROR:
42       return false;
43     case NOT_MY_BIRTHDAY:
44     case CLEAR_PENDING:
45     case DISABLED_BY_ADMIN:
46     case USER_ROLLBACK:
47       // If we send terminate sync early then |sync_cycle_ended| notification
48       // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
49       // notification wouldnt be sent either. Then the UI layer would be left
50       // waiting forever. So assert we would send something.
51       DCHECK_NE(error.action, UNKNOWN_ACTION);
52       return true;
53     case INVALID_CREDENTIAL:
54       // The notification for this is handled by PostAndProcessHeaders|.
55       // Server does no have to send any action for this.
56       return true;
57     // Make the default a NOTREACHED. So if a new error is introduced we
58     // think about its expected functionality.
59     default:
60       NOTREACHED();
61       return false;
62   }
63 }
64 
IsActionableError(const SyncProtocolError & error)65 bool IsActionableError(
66     const SyncProtocolError& error) {
67   return (error.action != UNKNOWN_ACTION);
68 }
69 }  // namespace
70 
ConfigurationParams()71 ConfigurationParams::ConfigurationParams()
72     : source(GetUpdatesCallerInfo::UNKNOWN) {}
ConfigurationParams(const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource & source,ModelTypeSet types_to_download,const ModelSafeRoutingInfo & routing_info,const base::Closure & ready_task,const base::Closure & retry_task)73 ConfigurationParams::ConfigurationParams(
74     const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
75     ModelTypeSet types_to_download,
76     const ModelSafeRoutingInfo& routing_info,
77     const base::Closure& ready_task,
78     const base::Closure& retry_task)
79     : source(source),
80       types_to_download(types_to_download),
81       routing_info(routing_info),
82       ready_task(ready_task),
83       retry_task(retry_task) {
84   DCHECK(!ready_task.is_null());
85   DCHECK(!retry_task.is_null());
86 }
~ConfigurationParams()87 ConfigurationParams::~ConfigurationParams() {}
88 
WaitInterval()89 SyncSchedulerImpl::WaitInterval::WaitInterval()
90     : mode(UNKNOWN) {}
91 
WaitInterval(Mode mode,TimeDelta length)92 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
93     : mode(mode), length(length) {}
94 
~WaitInterval()95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
96 
97 #define ENUM_CASE(x) case x: return #x; break;
98 
GetModeString(Mode mode)99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
100   switch (mode) {
101     ENUM_CASE(UNKNOWN);
102     ENUM_CASE(EXPONENTIAL_BACKOFF);
103     ENUM_CASE(THROTTLED);
104   }
105   NOTREACHED();
106   return "";
107 }
108 
GetUpdatesFromNudgeSource(NudgeSource source)109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
110     NudgeSource source) {
111   switch (source) {
112     case NUDGE_SOURCE_NOTIFICATION:
113       return GetUpdatesCallerInfo::NOTIFICATION;
114     case NUDGE_SOURCE_LOCAL:
115       return GetUpdatesCallerInfo::LOCAL;
116     case NUDGE_SOURCE_LOCAL_REFRESH:
117       return GetUpdatesCallerInfo::DATATYPE_REFRESH;
118     case NUDGE_SOURCE_UNKNOWN:
119       return GetUpdatesCallerInfo::UNKNOWN;
120     default:
121       NOTREACHED();
122       return GetUpdatesCallerInfo::UNKNOWN;
123   }
124 }
125 
126 // Helper macros to log with the syncer thread name; useful when there
127 // are multiple syncer threads involved.
128 
129 #define SLOG(severity) LOG(severity) << name_ << ": "
130 
131 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
132 
133 #define SDVLOG_LOC(from_here, verbose_level)             \
134   DVLOG_LOC(from_here, verbose_level) << name_ << ": "
135 
136 namespace {
137 
138 const int kDefaultSessionsCommitDelaySeconds = 10;
139 
IsConfigRelatedUpdateSourceValue(GetUpdatesCallerInfo::GetUpdatesSource source)140 bool IsConfigRelatedUpdateSourceValue(
141     GetUpdatesCallerInfo::GetUpdatesSource source) {
142   switch (source) {
143     case GetUpdatesCallerInfo::RECONFIGURATION:
144     case GetUpdatesCallerInfo::MIGRATION:
145     case GetUpdatesCallerInfo::NEW_CLIENT:
146     case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
147       return true;
148     default:
149       return false;
150   }
151 }
152 
153 }  // namespace
154 
SyncSchedulerImpl(const std::string & name,BackoffDelayProvider * delay_provider,sessions::SyncSessionContext * context,Syncer * syncer)155 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
156                                      BackoffDelayProvider* delay_provider,
157                                      sessions::SyncSessionContext* context,
158                                      Syncer* syncer)
159     : name_(name),
160       started_(false),
161       syncer_short_poll_interval_seconds_(
162           TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
163       syncer_long_poll_interval_seconds_(
164           TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
165       sessions_commit_delay_(
166           TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
167       mode_(NORMAL_MODE),
168       delay_provider_(delay_provider),
169       syncer_(syncer),
170       session_context_(context),
171       no_scheduling_allowed_(false),
172       do_poll_after_credentials_updated_(false),
173       next_sync_session_job_priority_(NORMAL_PRIORITY),
174       weak_ptr_factory_(this),
175       weak_ptr_factory_for_weak_handle_(this) {
176   weak_handle_this_ = MakeWeakHandle(
177       weak_ptr_factory_for_weak_handle_.GetWeakPtr());
178 }
179 
~SyncSchedulerImpl()180 SyncSchedulerImpl::~SyncSchedulerImpl() {
181   DCHECK(CalledOnValidThread());
182   Stop();
183 }
184 
OnCredentialsUpdated()185 void SyncSchedulerImpl::OnCredentialsUpdated() {
186   DCHECK(CalledOnValidThread());
187 
188   if (HttpResponse::SYNC_AUTH_ERROR ==
189       session_context_->connection_manager()->server_status()) {
190     OnServerConnectionErrorFixed();
191   }
192 }
193 
OnConnectionStatusChange()194 void SyncSchedulerImpl::OnConnectionStatusChange() {
195   if (HttpResponse::CONNECTION_UNAVAILABLE  ==
196       session_context_->connection_manager()->server_status()) {
197     // Optimistically assume that the connection is fixed and try
198     // connecting.
199     OnServerConnectionErrorFixed();
200   }
201 }
202 
OnServerConnectionErrorFixed()203 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
204   // There could be a pending nudge or configuration job in several cases:
205   //
206   // 1. We're in exponential backoff.
207   // 2. We're silenced / throttled.
208   // 3. A nudge was saved previously due to not having a valid auth token.
209   // 4. A nudge was scheduled + saved while in configuration mode.
210   //
211   // In all cases except (2), we want to retry contacting the server. We
212   // call TryCanaryJob to achieve this, and note that nothing -- not even a
213   // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
214   // has the authority to do that is the Unthrottle timer.
215   TryCanaryJob();
216 }
217 
Start(Mode mode)218 void SyncSchedulerImpl::Start(Mode mode) {
219   DCHECK(CalledOnValidThread());
220   std::string thread_name = base::MessageLoop::current()->thread_name();
221   if (thread_name.empty())
222     thread_name = "<Main thread>";
223   SDVLOG(2) << "Start called from thread "
224             << thread_name << " with mode " << GetModeString(mode);
225   if (!started_) {
226     started_ = true;
227     SendInitialSnapshot();
228   }
229 
230   DCHECK(!session_context_->account_name().empty());
231   DCHECK(syncer_.get());
232   Mode old_mode = mode_;
233   mode_ = mode;
234   AdjustPolling(UPDATE_INTERVAL);  // Will kick start poll timer if needed.
235 
236   if (old_mode != mode_ && mode_ == NORMAL_MODE) {
237     // We just got back to normal mode.  Let's try to run the work that was
238     // queued up while we were configuring.
239 
240     // Update our current time before checking IsRetryRequired().
241     nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
242     if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
243       TrySyncSessionJob();
244     }
245   }
246 }
247 
GetEnabledAndUnthrottledTypes()248 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
249   ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
250   ModelTypeSet enabled_protocol_types =
251       Intersection(ProtocolTypes(), enabled_types);
252   ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
253   return Difference(enabled_protocol_types, throttled_types);
254 }
255 
SendInitialSnapshot()256 void SyncSchedulerImpl::SendInitialSnapshot() {
257   DCHECK(CalledOnValidThread());
258   scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
259   SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
260   event.snapshot = dummy->TakeSnapshot();
261   FOR_EACH_OBSERVER(SyncEngineEventListener,
262                     *session_context_->listeners(),
263                     OnSyncCycleEvent(event));
264 }
265 
266 namespace {
267 
268 // Helper to extract the routing info corresponding to types in
269 // |types_to_download| from |current_routes|.
BuildModelSafeParams(ModelTypeSet types_to_download,const ModelSafeRoutingInfo & current_routes,ModelSafeRoutingInfo * result_routes)270 void BuildModelSafeParams(
271     ModelTypeSet types_to_download,
272     const ModelSafeRoutingInfo& current_routes,
273     ModelSafeRoutingInfo* result_routes) {
274   for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
275        iter.Inc()) {
276     ModelType type = iter.Get();
277     ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
278     DCHECK(route != current_routes.end());
279     ModelSafeGroup group = route->second;
280     (*result_routes)[type] = group;
281   }
282 }
283 
284 }  // namespace.
285 
ScheduleConfiguration(const ConfigurationParams & params)286 void SyncSchedulerImpl::ScheduleConfiguration(
287     const ConfigurationParams& params) {
288   DCHECK(CalledOnValidThread());
289   DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
290   DCHECK_EQ(CONFIGURATION_MODE, mode_);
291   DCHECK(!params.ready_task.is_null());
292   CHECK(started_) << "Scheduler must be running to configure.";
293   SDVLOG(2) << "Reconfiguring syncer.";
294 
295   // Only one configuration is allowed at a time. Verify we're not waiting
296   // for a pending configure job.
297   DCHECK(!pending_configure_params_);
298 
299   ModelSafeRoutingInfo restricted_routes;
300   BuildModelSafeParams(params.types_to_download,
301                        params.routing_info,
302                        &restricted_routes);
303   session_context_->SetRoutingInfo(restricted_routes);
304 
305   // Only reconfigure if we have types to download.
306   if (!params.types_to_download.Empty()) {
307     pending_configure_params_.reset(new ConfigurationParams(params));
308     TrySyncSessionJob();
309   } else {
310     SDVLOG(2) << "No change in routing info, calling ready task directly.";
311     params.ready_task.Run();
312   }
313 }
314 
CanRunJobNow(JobPriority priority)315 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
316   DCHECK(CalledOnValidThread());
317   if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
318     SDVLOG(1) << "Unable to run a job because we're throttled.";
319     return false;
320   }
321 
322   if (wait_interval_
323       && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
324       && priority != CANARY_PRIORITY) {
325     SDVLOG(1) << "Unable to run a job because we're backing off.";
326     return false;
327   }
328 
329   if (session_context_->connection_manager()->HasInvalidAuthToken()) {
330     SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
331     return false;
332   }
333 
334   return true;
335 }
336 
CanRunNudgeJobNow(JobPriority priority)337 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
338   DCHECK(CalledOnValidThread());
339 
340   if (!CanRunJobNow(priority)) {
341     SDVLOG(1) << "Unable to run a nudge job right now";
342     return false;
343   }
344 
345   const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
346   if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
347     SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
348     return false;
349   }
350 
351   if (mode_ == CONFIGURATION_MODE) {
352     SDVLOG(1) << "Not running nudge because we're in configuration mode.";
353     return false;
354   }
355 
356   return true;
357 }
358 
ScheduleLocalNudge(const TimeDelta & desired_delay,ModelTypeSet types,const tracked_objects::Location & nudge_location)359 void SyncSchedulerImpl::ScheduleLocalNudge(
360     const TimeDelta& desired_delay,
361     ModelTypeSet types,
362     const tracked_objects::Location& nudge_location) {
363   DCHECK(CalledOnValidThread());
364   DCHECK(!types.Empty());
365 
366   SDVLOG_LOC(nudge_location, 2)
367       << "Scheduling sync because of local change to "
368       << ModelTypeSetToString(types);
369   UpdateNudgeTimeRecords(types);
370   nudge_tracker_.RecordLocalChange(types);
371   ScheduleNudgeImpl(desired_delay, nudge_location);
372 }
373 
ScheduleLocalRefreshRequest(const TimeDelta & desired_delay,ModelTypeSet types,const tracked_objects::Location & nudge_location)374 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
375     const TimeDelta& desired_delay,
376     ModelTypeSet types,
377     const tracked_objects::Location& nudge_location) {
378   DCHECK(CalledOnValidThread());
379   DCHECK(!types.Empty());
380 
381   SDVLOG_LOC(nudge_location, 2)
382       << "Scheduling sync because of local refresh request for "
383       << ModelTypeSetToString(types);
384   nudge_tracker_.RecordLocalRefreshRequest(types);
385   ScheduleNudgeImpl(desired_delay, nudge_location);
386 }
387 
ScheduleInvalidationNudge(const TimeDelta & desired_delay,const ObjectIdInvalidationMap & invalidation_map,const tracked_objects::Location & nudge_location)388 void SyncSchedulerImpl::ScheduleInvalidationNudge(
389     const TimeDelta& desired_delay,
390     const ObjectIdInvalidationMap& invalidation_map,
391     const tracked_objects::Location& nudge_location) {
392   DCHECK(CalledOnValidThread());
393   DCHECK(!invalidation_map.Empty());
394 
395   SDVLOG_LOC(nudge_location, 2)
396       << "Scheduling sync because we received invalidation for "
397       << ModelTypeSetToString(
398           ObjectIdSetToModelTypeSet(invalidation_map.GetObjectIds()));
399   nudge_tracker_.RecordRemoteInvalidation(invalidation_map);
400   ScheduleNudgeImpl(desired_delay, nudge_location);
401 }
402 
403 // TODO(zea): Consider adding separate throttling/backoff for datatype
404 // refresh requests.
ScheduleNudgeImpl(const TimeDelta & delay,const tracked_objects::Location & nudge_location)405 void SyncSchedulerImpl::ScheduleNudgeImpl(
406     const TimeDelta& delay,
407     const tracked_objects::Location& nudge_location) {
408   DCHECK(CalledOnValidThread());
409 
410   if (no_scheduling_allowed_) {
411     NOTREACHED() << "Illegal to schedule job while session in progress.";
412     return;
413   }
414 
415   if (!started_) {
416     SDVLOG_LOC(nudge_location, 2)
417         << "Dropping nudge, scheduler is not running.";
418     return;
419   }
420 
421   SDVLOG_LOC(nudge_location, 2)
422       << "In ScheduleNudgeImpl with delay "
423       << delay.InMilliseconds() << " ms";
424 
425   if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
426     return;
427 
428   TimeTicks incoming_run_time = TimeTicks::Now() + delay;
429   if (!scheduled_nudge_time_.is_null() &&
430     (scheduled_nudge_time_ < incoming_run_time)) {
431     // Old job arrives sooner than this one.  Don't reschedule it.
432     return;
433   }
434 
435   // Either there is no existing nudge in flight or the incoming nudge should be
436   // made to arrive first (preempt) the existing nudge.  We reschedule in either
437   // case.
438   SDVLOG_LOC(nudge_location, 2)
439       << "Scheduling a nudge with "
440       << delay.InMilliseconds() << " ms delay";
441   scheduled_nudge_time_ = incoming_run_time;
442   pending_wakeup_timer_.Start(
443       nudge_location,
444       delay,
445       base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
446                  weak_ptr_factory_.GetWeakPtr()));
447 }
448 
GetModeString(SyncScheduler::Mode mode)449 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
450   switch (mode) {
451     ENUM_CASE(CONFIGURATION_MODE);
452     ENUM_CASE(NORMAL_MODE);
453   }
454   return "";
455 }
456 
DoNudgeSyncSessionJob(JobPriority priority)457 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
458   DCHECK(CalledOnValidThread());
459   DCHECK(CanRunNudgeJobNow(priority));
460 
461   DVLOG(2) << "Will run normal mode sync cycle with types "
462            << ModelTypeSetToString(session_context_->GetEnabledTypes());
463   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
464   bool premature_exit = !syncer_->NormalSyncShare(
465       GetEnabledAndUnthrottledTypes(),
466       nudge_tracker_,
467       session.get());
468   AdjustPolling(FORCE_RESET);
469   // Don't run poll job till the next time poll timer fires.
470   do_poll_after_credentials_updated_ = false;
471 
472   bool success = !premature_exit
473       && !sessions::HasSyncerError(
474           session->status_controller().model_neutral_state());
475 
476   if (success) {
477     // That cycle took care of any outstanding work we had.
478     SDVLOG(2) << "Nudge succeeded.";
479     nudge_tracker_.RecordSuccessfulSyncCycle();
480     scheduled_nudge_time_ = base::TimeTicks();
481 
482     // If we're here, then we successfully reached the server.  End all backoff.
483     wait_interval_.reset();
484     NotifyRetryTime(base::Time());
485   } else {
486     HandleFailure(session->status_controller().model_neutral_state());
487   }
488 }
489 
DoConfigurationSyncSessionJob(JobPriority priority)490 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
491   DCHECK(CalledOnValidThread());
492   DCHECK_EQ(mode_, CONFIGURATION_MODE);
493   DCHECK(pending_configure_params_ != NULL);
494 
495   if (!CanRunJobNow(priority)) {
496     SDVLOG(2) << "Unable to run configure job right now.";
497     if (!pending_configure_params_->retry_task.is_null()) {
498       pending_configure_params_->retry_task.Run();
499       pending_configure_params_->retry_task.Reset();
500     }
501     return;
502   }
503 
504   SDVLOG(2) << "Will run configure SyncShare with types "
505             << ModelTypeSetToString(session_context_->GetEnabledTypes());
506   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
507   bool premature_exit = !syncer_->ConfigureSyncShare(
508       pending_configure_params_->types_to_download,
509       pending_configure_params_->source,
510       session.get());
511   AdjustPolling(FORCE_RESET);
512   // Don't run poll job till the next time poll timer fires.
513   do_poll_after_credentials_updated_ = false;
514 
515   bool success = !premature_exit
516       && !sessions::HasSyncerError(
517           session->status_controller().model_neutral_state());
518 
519   if (success) {
520     SDVLOG(2) << "Configure succeeded.";
521     pending_configure_params_->ready_task.Run();
522     pending_configure_params_.reset();
523 
524     // If we're here, then we successfully reached the server.  End all backoff.
525     wait_interval_.reset();
526     NotifyRetryTime(base::Time());
527   } else {
528     HandleFailure(session->status_controller().model_neutral_state());
529     // Sync cycle might receive response from server that causes scheduler to
530     // stop and draws pending_configure_params_ invalid.
531     if (started_ && !pending_configure_params_->retry_task.is_null()) {
532       pending_configure_params_->retry_task.Run();
533       pending_configure_params_->retry_task.Reset();
534     }
535   }
536 }
537 
HandleFailure(const sessions::ModelNeutralState & model_neutral_state)538 void SyncSchedulerImpl::HandleFailure(
539     const sessions::ModelNeutralState& model_neutral_state) {
540   if (IsCurrentlyThrottled()) {
541     SDVLOG(2) << "Was throttled during previous sync cycle.";
542     RestartWaiting();
543   } else if (!IsBackingOff()) {
544     // Setup our backoff if this is our first such failure.
545     TimeDelta length = delay_provider_->GetDelay(
546         delay_provider_->GetInitialDelay(model_neutral_state));
547     wait_interval_.reset(
548         new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
549     SDVLOG(2) << "Sync cycle failed.  Will back off for "
550         << wait_interval_->length.InMilliseconds() << "ms.";
551     RestartWaiting();
552   }
553 }
554 
DoPollSyncSessionJob()555 void SyncSchedulerImpl::DoPollSyncSessionJob() {
556   base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
557 
558   SDVLOG(2) << "Polling with types "
559             << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
560   scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
561   syncer_->PollSyncShare(
562       GetEnabledAndUnthrottledTypes(),
563       session.get());
564 
565   AdjustPolling(FORCE_RESET);
566 
567   if (IsCurrentlyThrottled()) {
568     SDVLOG(2) << "Poll request got us throttled.";
569     // The OnSilencedUntil() call set up the WaitInterval for us.  All we need
570     // to do is start the timer.
571     RestartWaiting();
572   }
573 }
574 
UpdateNudgeTimeRecords(ModelTypeSet types)575 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
576   DCHECK(CalledOnValidThread());
577   base::TimeTicks now = TimeTicks::Now();
578   // Update timing information for how often datatypes are triggering nudges.
579   for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
580     base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
581     last_local_nudges_by_model_type_[iter.Get()] = now;
582     if (previous.is_null())
583       continue;
584 
585 #define PER_DATA_TYPE_MACRO(type_str) \
586     SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
587     SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
588 #undef PER_DATA_TYPE_MACRO
589   }
590 }
591 
GetPollInterval()592 TimeDelta SyncSchedulerImpl::GetPollInterval() {
593   return (!session_context_->notifications_enabled() ||
594           !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
595       syncer_short_poll_interval_seconds_ :
596       syncer_long_poll_interval_seconds_;
597 }
598 
AdjustPolling(PollAdjustType type)599 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
600   DCHECK(CalledOnValidThread());
601 
602   TimeDelta poll = GetPollInterval();
603   bool rate_changed = !poll_timer_.IsRunning() ||
604                        poll != poll_timer_.GetCurrentDelay();
605 
606   if (type == FORCE_RESET) {
607     last_poll_reset_ = base::TimeTicks::Now();
608     if (!rate_changed)
609       poll_timer_.Reset();
610   }
611 
612   if (!rate_changed)
613     return;
614 
615   // Adjust poll rate.
616   poll_timer_.Stop();
617   poll_timer_.Start(FROM_HERE, poll, this,
618                     &SyncSchedulerImpl::PollTimerCallback);
619 }
620 
RestartWaiting()621 void SyncSchedulerImpl::RestartWaiting() {
622   CHECK(wait_interval_.get());
623   DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
624   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
625   SDVLOG(2) << "Starting WaitInterval timer of length "
626       << wait_interval_->length.InMilliseconds() << "ms.";
627   if (wait_interval_->mode == WaitInterval::THROTTLED) {
628     pending_wakeup_timer_.Start(
629         FROM_HERE,
630         wait_interval_->length,
631         base::Bind(&SyncSchedulerImpl::Unthrottle,
632                    weak_ptr_factory_.GetWeakPtr()));
633   } else {
634     pending_wakeup_timer_.Start(
635         FROM_HERE,
636         wait_interval_->length,
637         base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
638                    weak_ptr_factory_.GetWeakPtr()));
639   }
640 }
641 
Stop()642 void SyncSchedulerImpl::Stop() {
643   DCHECK(CalledOnValidThread());
644   SDVLOG(2) << "Stop called";
645 
646   // Kill any in-flight method calls.
647   weak_ptr_factory_.InvalidateWeakPtrs();
648   wait_interval_.reset();
649   NotifyRetryTime(base::Time());
650   poll_timer_.Stop();
651   pending_wakeup_timer_.Stop();
652   pending_configure_params_.reset();
653   if (started_)
654     started_ = false;
655 }
656 
657 // This is the only place where we invoke DoSyncSessionJob with canary
658 // privileges.  Everyone else should use NORMAL_PRIORITY.
TryCanaryJob()659 void SyncSchedulerImpl::TryCanaryJob() {
660   next_sync_session_job_priority_ = CANARY_PRIORITY;
661   TrySyncSessionJob();
662 }
663 
TrySyncSessionJob()664 void SyncSchedulerImpl::TrySyncSessionJob() {
665   // Post call to TrySyncSessionJobImpl on current thread. Later request for
666   // access token will be here.
667   base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
668       &SyncSchedulerImpl::TrySyncSessionJobImpl,
669       weak_ptr_factory_.GetWeakPtr()));
670 }
671 
TrySyncSessionJobImpl()672 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
673   JobPriority priority = next_sync_session_job_priority_;
674   next_sync_session_job_priority_ = NORMAL_PRIORITY;
675 
676   nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
677 
678   DCHECK(CalledOnValidThread());
679   if (mode_ == CONFIGURATION_MODE) {
680     if (pending_configure_params_) {
681       SDVLOG(2) << "Found pending configure job";
682       DoConfigurationSyncSessionJob(priority);
683     }
684   } else if (CanRunNudgeJobNow(priority)) {
685     if (nudge_tracker_.IsSyncRequired()) {
686       SDVLOG(2) << "Found pending nudge job";
687       DoNudgeSyncSessionJob(priority);
688     } else if (do_poll_after_credentials_updated_ ||
689         ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
690       DoPollSyncSessionJob();
691       // Poll timer fires infrequently. Usually by this time access token is
692       // already expired and poll job will fail with auth error. Set flag to
693       // retry poll once ProfileSyncService gets new access token, TryCanaryJob
694       // will be called after access token is retrieved.
695       if (HttpResponse::SYNC_AUTH_ERROR ==
696           session_context_->connection_manager()->server_status()) {
697         do_poll_after_credentials_updated_ = true;
698       }
699     }
700   }
701 
702   if (priority == CANARY_PRIORITY) {
703     // If this is canary job then whatever result was don't run poll job till
704     // the next time poll timer fires.
705     do_poll_after_credentials_updated_ = false;
706   }
707 
708   if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
709     // If we succeeded, our wait interval would have been cleared.  If it hasn't
710     // been cleared, then we should increase our backoff interval and schedule
711     // another retry.
712     TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
713     wait_interval_.reset(
714       new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
715     SDVLOG(2) << "Sync cycle failed.  Will back off for "
716         << wait_interval_->length.InMilliseconds() << "ms.";
717     RestartWaiting();
718   }
719 }
720 
PollTimerCallback()721 void SyncSchedulerImpl::PollTimerCallback() {
722   DCHECK(CalledOnValidThread());
723   if (no_scheduling_allowed_) {
724     // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
725     // functions that are called only on the sync thread.  This function is also
726     // called only on the sync thread, and only when it is posted by an expiring
727     // timer.  If we find that no_scheduling_allowed_ is set here, then
728     // something is very wrong.  Maybe someone mistakenly called us directly, or
729     // mishandled the book-keeping for no_scheduling_allowed_.
730     NOTREACHED() << "Illegal to schedule job while session in progress.";
731     return;
732   }
733 
734   TrySyncSessionJob();
735 }
736 
RetryTimerCallback()737 void SyncSchedulerImpl::RetryTimerCallback() {
738   TrySyncSessionJob();
739 }
740 
Unthrottle()741 void SyncSchedulerImpl::Unthrottle() {
742   DCHECK(CalledOnValidThread());
743   DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
744 
745   // We're no longer throttled, so clear the wait interval.
746   wait_interval_.reset();
747   NotifyRetryTime(base::Time());
748   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
749 
750   // We treat this as a 'canary' in the sense that it was originally scheduled
751   // to run some time ago, failed, and we now want to retry, versus a job that
752   // was just created (e.g via ScheduleNudgeImpl). The main implication is
753   // that we're careful to update routing info (etc) with such potentially
754   // stale canary jobs.
755   TryCanaryJob();
756 }
757 
TypeUnthrottle(base::TimeTicks unthrottle_time)758 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
759   DCHECK(CalledOnValidThread());
760   nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
761   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
762 
763   if (nudge_tracker_.IsAnyTypeThrottled()) {
764     const base::TimeTicks now = base::TimeTicks::Now();
765     base::TimeDelta time_until_next_unthrottle =
766         nudge_tracker_.GetTimeUntilNextUnthrottle(now);
767     type_unthrottle_timer_.Start(
768         FROM_HERE,
769         time_until_next_unthrottle,
770         base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
771                    weak_ptr_factory_.GetWeakPtr(),
772                    now + time_until_next_unthrottle));
773   }
774 
775   // Maybe this is a good time to run a nudge job.  Let's try it.
776   if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
777     TrySyncSessionJob();
778 }
779 
PerformDelayedNudge()780 void SyncSchedulerImpl::PerformDelayedNudge() {
781   // Circumstances may have changed since we scheduled this delayed nudge.
782   // We must check to see if it's OK to run the job before we do so.
783   if (CanRunNudgeJobNow(NORMAL_PRIORITY))
784     TrySyncSessionJob();
785 
786   // We're not responsible for setting up any retries here.  The functions that
787   // first put us into a state that prevents successful sync cycles (eg. global
788   // throttling, type throttling, network errors, transient errors) will also
789   // setup the appropriate retry logic (eg. retry after timeout, exponential
790   // backoff, retry when the network changes).
791 }
792 
ExponentialBackoffRetry()793 void SyncSchedulerImpl::ExponentialBackoffRetry() {
794   TryCanaryJob();
795 }
796 
NotifyRetryTime(base::Time retry_time)797 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
798   FOR_EACH_OBSERVER(SyncEngineEventListener,
799                     *session_context_->listeners(),
800                     OnRetryTimeChanged(retry_time));
801 }
802 
NotifyThrottledTypesChanged(ModelTypeSet types)803 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
804   FOR_EACH_OBSERVER(SyncEngineEventListener,
805                     *session_context_->listeners(),
806                     OnThrottledTypesChanged(types));
807 }
808 
IsBackingOff() const809 bool SyncSchedulerImpl::IsBackingOff() const {
810   DCHECK(CalledOnValidThread());
811   return wait_interval_.get() && wait_interval_->mode ==
812       WaitInterval::EXPONENTIAL_BACKOFF;
813 }
814 
OnThrottled(const base::TimeDelta & throttle_duration)815 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
816   DCHECK(CalledOnValidThread());
817   wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
818                                         throttle_duration));
819   NotifyRetryTime(base::Time::Now() + wait_interval_->length);
820   NotifyThrottledTypesChanged(ModelTypeSet::All());
821 }
822 
OnTypesThrottled(ModelTypeSet types,const base::TimeDelta & throttle_duration)823 void SyncSchedulerImpl::OnTypesThrottled(
824     ModelTypeSet types,
825     const base::TimeDelta& throttle_duration) {
826   base::TimeTicks now = base::TimeTicks::Now();
827 
828   nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
829   base::TimeDelta time_until_next_unthrottle =
830       nudge_tracker_.GetTimeUntilNextUnthrottle(now);
831   type_unthrottle_timer_.Start(
832       FROM_HERE,
833       time_until_next_unthrottle,
834       base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
835                  weak_ptr_factory_.GetWeakPtr(),
836                  now + time_until_next_unthrottle));
837   NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
838 }
839 
IsCurrentlyThrottled()840 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
841   DCHECK(CalledOnValidThread());
842   return wait_interval_.get() && wait_interval_->mode ==
843       WaitInterval::THROTTLED;
844 }
845 
OnReceivedShortPollIntervalUpdate(const base::TimeDelta & new_interval)846 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
847     const base::TimeDelta& new_interval) {
848   DCHECK(CalledOnValidThread());
849   syncer_short_poll_interval_seconds_ = new_interval;
850 }
851 
OnReceivedLongPollIntervalUpdate(const base::TimeDelta & new_interval)852 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
853     const base::TimeDelta& new_interval) {
854   DCHECK(CalledOnValidThread());
855   syncer_long_poll_interval_seconds_ = new_interval;
856 }
857 
OnReceivedSessionsCommitDelay(const base::TimeDelta & new_delay)858 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
859     const base::TimeDelta& new_delay) {
860   DCHECK(CalledOnValidThread());
861   sessions_commit_delay_ = new_delay;
862 }
863 
OnReceivedClientInvalidationHintBufferSize(int size)864 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
865   if (size > 0)
866     nudge_tracker_.SetHintBufferSize(size);
867   else
868     NOTREACHED() << "Hint buffer size should be > 0.";
869 }
870 
OnSyncProtocolError(const SyncProtocolError & sync_protocol_error)871 void SyncSchedulerImpl::OnSyncProtocolError(
872     const SyncProtocolError& sync_protocol_error) {
873   DCHECK(CalledOnValidThread());
874   if (ShouldRequestEarlyExit(sync_protocol_error)) {
875     SDVLOG(2) << "Sync Scheduler requesting early exit.";
876     Stop();
877   }
878   if (IsActionableError(sync_protocol_error)) {
879     SDVLOG(2) << "OnActionableError";
880     FOR_EACH_OBSERVER(SyncEngineEventListener,
881                       *session_context_->listeners(),
882                       OnActionableError(sync_protocol_error));
883   }
884 }
885 
OnReceivedGuRetryDelay(const base::TimeDelta & delay)886 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
887   nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
888   retry_timer_.Start(FROM_HERE, delay, this,
889                      &SyncSchedulerImpl::RetryTimerCallback);
890 }
891 
OnReceivedMigrationRequest(ModelTypeSet types)892 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
893     FOR_EACH_OBSERVER(SyncEngineEventListener,
894                       *session_context_->listeners(),
895                       OnMigrationRequested(types));
896 }
897 
SetNotificationsEnabled(bool notifications_enabled)898 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
899   DCHECK(CalledOnValidThread());
900   session_context_->set_notifications_enabled(notifications_enabled);
901   if (notifications_enabled)
902     nudge_tracker_.OnInvalidationsEnabled();
903   else
904     nudge_tracker_.OnInvalidationsDisabled();
905 }
906 
GetSessionsCommitDelay() const907 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
908   DCHECK(CalledOnValidThread());
909   return sessions_commit_delay_;
910 }
911 
912 #undef SDVLOG_LOC
913 
914 #undef SDVLOG
915 
916 #undef SLOG
917 
918 #undef ENUM_CASE
919 
920 }  // namespace syncer
921