1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sync/engine/sync_scheduler_impl.h"
6
7 #include <algorithm>
8 #include <cstring>
9
10 #include "base/auto_reset.h"
11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/message_loop/message_loop.h"
17 #include "sync/engine/backoff_delay_provider.h"
18 #include "sync/engine/syncer.h"
19 #include "sync/notifier/object_id_invalidation_map.h"
20 #include "sync/protocol/proto_enum_conversions.h"
21 #include "sync/protocol/sync.pb.h"
22 #include "sync/util/data_type_histogram.h"
23 #include "sync/util/logging.h"
24
25 using base::TimeDelta;
26 using base::TimeTicks;
27
28 namespace syncer {
29
30 using sessions::SyncSession;
31 using sessions::SyncSessionSnapshot;
32 using sync_pb::GetUpdatesCallerInfo;
33
34 namespace {
35
ShouldRequestEarlyExit(const SyncProtocolError & error)36 bool ShouldRequestEarlyExit(const SyncProtocolError& error) {
37 switch (error.error_type) {
38 case SYNC_SUCCESS:
39 case MIGRATION_DONE:
40 case THROTTLED:
41 case TRANSIENT_ERROR:
42 return false;
43 case NOT_MY_BIRTHDAY:
44 case CLEAR_PENDING:
45 case DISABLED_BY_ADMIN:
46 case USER_ROLLBACK:
47 // If we send terminate sync early then |sync_cycle_ended| notification
48 // would not be sent. If there were no actions then |ACTIONABLE_ERROR|
49 // notification wouldnt be sent either. Then the UI layer would be left
50 // waiting forever. So assert we would send something.
51 DCHECK_NE(error.action, UNKNOWN_ACTION);
52 return true;
53 case INVALID_CREDENTIAL:
54 // The notification for this is handled by PostAndProcessHeaders|.
55 // Server does no have to send any action for this.
56 return true;
57 // Make the default a NOTREACHED. So if a new error is introduced we
58 // think about its expected functionality.
59 default:
60 NOTREACHED();
61 return false;
62 }
63 }
64
IsActionableError(const SyncProtocolError & error)65 bool IsActionableError(
66 const SyncProtocolError& error) {
67 return (error.action != UNKNOWN_ACTION);
68 }
69 } // namespace
70
ConfigurationParams()71 ConfigurationParams::ConfigurationParams()
72 : source(GetUpdatesCallerInfo::UNKNOWN) {}
ConfigurationParams(const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource & source,ModelTypeSet types_to_download,const ModelSafeRoutingInfo & routing_info,const base::Closure & ready_task,const base::Closure & retry_task)73 ConfigurationParams::ConfigurationParams(
74 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource& source,
75 ModelTypeSet types_to_download,
76 const ModelSafeRoutingInfo& routing_info,
77 const base::Closure& ready_task,
78 const base::Closure& retry_task)
79 : source(source),
80 types_to_download(types_to_download),
81 routing_info(routing_info),
82 ready_task(ready_task),
83 retry_task(retry_task) {
84 DCHECK(!ready_task.is_null());
85 DCHECK(!retry_task.is_null());
86 }
~ConfigurationParams()87 ConfigurationParams::~ConfigurationParams() {}
88
WaitInterval()89 SyncSchedulerImpl::WaitInterval::WaitInterval()
90 : mode(UNKNOWN) {}
91
WaitInterval(Mode mode,TimeDelta length)92 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
93 : mode(mode), length(length) {}
94
~WaitInterval()95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
96
97 #define ENUM_CASE(x) case x: return #x; break;
98
GetModeString(Mode mode)99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
100 switch (mode) {
101 ENUM_CASE(UNKNOWN);
102 ENUM_CASE(EXPONENTIAL_BACKOFF);
103 ENUM_CASE(THROTTLED);
104 }
105 NOTREACHED();
106 return "";
107 }
108
GetUpdatesFromNudgeSource(NudgeSource source)109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
110 NudgeSource source) {
111 switch (source) {
112 case NUDGE_SOURCE_NOTIFICATION:
113 return GetUpdatesCallerInfo::NOTIFICATION;
114 case NUDGE_SOURCE_LOCAL:
115 return GetUpdatesCallerInfo::LOCAL;
116 case NUDGE_SOURCE_LOCAL_REFRESH:
117 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
118 case NUDGE_SOURCE_UNKNOWN:
119 return GetUpdatesCallerInfo::UNKNOWN;
120 default:
121 NOTREACHED();
122 return GetUpdatesCallerInfo::UNKNOWN;
123 }
124 }
125
126 // Helper macros to log with the syncer thread name; useful when there
127 // are multiple syncer threads involved.
128
129 #define SLOG(severity) LOG(severity) << name_ << ": "
130
131 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
132
133 #define SDVLOG_LOC(from_here, verbose_level) \
134 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
135
136 namespace {
137
138 const int kDefaultSessionsCommitDelaySeconds = 10;
139
IsConfigRelatedUpdateSourceValue(GetUpdatesCallerInfo::GetUpdatesSource source)140 bool IsConfigRelatedUpdateSourceValue(
141 GetUpdatesCallerInfo::GetUpdatesSource source) {
142 switch (source) {
143 case GetUpdatesCallerInfo::RECONFIGURATION:
144 case GetUpdatesCallerInfo::MIGRATION:
145 case GetUpdatesCallerInfo::NEW_CLIENT:
146 case GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE:
147 return true;
148 default:
149 return false;
150 }
151 }
152
153 } // namespace
154
SyncSchedulerImpl(const std::string & name,BackoffDelayProvider * delay_provider,sessions::SyncSessionContext * context,Syncer * syncer)155 SyncSchedulerImpl::SyncSchedulerImpl(const std::string& name,
156 BackoffDelayProvider* delay_provider,
157 sessions::SyncSessionContext* context,
158 Syncer* syncer)
159 : name_(name),
160 started_(false),
161 syncer_short_poll_interval_seconds_(
162 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
163 syncer_long_poll_interval_seconds_(
164 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
165 sessions_commit_delay_(
166 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
167 mode_(NORMAL_MODE),
168 delay_provider_(delay_provider),
169 syncer_(syncer),
170 session_context_(context),
171 no_scheduling_allowed_(false),
172 do_poll_after_credentials_updated_(false),
173 next_sync_session_job_priority_(NORMAL_PRIORITY),
174 weak_ptr_factory_(this),
175 weak_ptr_factory_for_weak_handle_(this) {
176 weak_handle_this_ = MakeWeakHandle(
177 weak_ptr_factory_for_weak_handle_.GetWeakPtr());
178 }
179
~SyncSchedulerImpl()180 SyncSchedulerImpl::~SyncSchedulerImpl() {
181 DCHECK(CalledOnValidThread());
182 Stop();
183 }
184
OnCredentialsUpdated()185 void SyncSchedulerImpl::OnCredentialsUpdated() {
186 DCHECK(CalledOnValidThread());
187
188 if (HttpResponse::SYNC_AUTH_ERROR ==
189 session_context_->connection_manager()->server_status()) {
190 OnServerConnectionErrorFixed();
191 }
192 }
193
OnConnectionStatusChange()194 void SyncSchedulerImpl::OnConnectionStatusChange() {
195 if (HttpResponse::CONNECTION_UNAVAILABLE ==
196 session_context_->connection_manager()->server_status()) {
197 // Optimistically assume that the connection is fixed and try
198 // connecting.
199 OnServerConnectionErrorFixed();
200 }
201 }
202
OnServerConnectionErrorFixed()203 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
204 // There could be a pending nudge or configuration job in several cases:
205 //
206 // 1. We're in exponential backoff.
207 // 2. We're silenced / throttled.
208 // 3. A nudge was saved previously due to not having a valid auth token.
209 // 4. A nudge was scheduled + saved while in configuration mode.
210 //
211 // In all cases except (2), we want to retry contacting the server. We
212 // call TryCanaryJob to achieve this, and note that nothing -- not even a
213 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
214 // has the authority to do that is the Unthrottle timer.
215 TryCanaryJob();
216 }
217
Start(Mode mode)218 void SyncSchedulerImpl::Start(Mode mode) {
219 DCHECK(CalledOnValidThread());
220 std::string thread_name = base::MessageLoop::current()->thread_name();
221 if (thread_name.empty())
222 thread_name = "<Main thread>";
223 SDVLOG(2) << "Start called from thread "
224 << thread_name << " with mode " << GetModeString(mode);
225 if (!started_) {
226 started_ = true;
227 SendInitialSnapshot();
228 }
229
230 DCHECK(!session_context_->account_name().empty());
231 DCHECK(syncer_.get());
232 Mode old_mode = mode_;
233 mode_ = mode;
234 AdjustPolling(UPDATE_INTERVAL); // Will kick start poll timer if needed.
235
236 if (old_mode != mode_ && mode_ == NORMAL_MODE) {
237 // We just got back to normal mode. Let's try to run the work that was
238 // queued up while we were configuring.
239
240 // Update our current time before checking IsRetryRequired().
241 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
242 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY)) {
243 TrySyncSessionJob();
244 }
245 }
246 }
247
GetEnabledAndUnthrottledTypes()248 ModelTypeSet SyncSchedulerImpl::GetEnabledAndUnthrottledTypes() {
249 ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
250 ModelTypeSet enabled_protocol_types =
251 Intersection(ProtocolTypes(), enabled_types);
252 ModelTypeSet throttled_types = nudge_tracker_.GetThrottledTypes();
253 return Difference(enabled_protocol_types, throttled_types);
254 }
255
SendInitialSnapshot()256 void SyncSchedulerImpl::SendInitialSnapshot() {
257 DCHECK(CalledOnValidThread());
258 scoped_ptr<SyncSession> dummy(SyncSession::Build(session_context_, this));
259 SyncCycleEvent event(SyncCycleEvent::STATUS_CHANGED);
260 event.snapshot = dummy->TakeSnapshot();
261 FOR_EACH_OBSERVER(SyncEngineEventListener,
262 *session_context_->listeners(),
263 OnSyncCycleEvent(event));
264 }
265
266 namespace {
267
268 // Helper to extract the routing info corresponding to types in
269 // |types_to_download| from |current_routes|.
BuildModelSafeParams(ModelTypeSet types_to_download,const ModelSafeRoutingInfo & current_routes,ModelSafeRoutingInfo * result_routes)270 void BuildModelSafeParams(
271 ModelTypeSet types_to_download,
272 const ModelSafeRoutingInfo& current_routes,
273 ModelSafeRoutingInfo* result_routes) {
274 for (ModelTypeSet::Iterator iter = types_to_download.First(); iter.Good();
275 iter.Inc()) {
276 ModelType type = iter.Get();
277 ModelSafeRoutingInfo::const_iterator route = current_routes.find(type);
278 DCHECK(route != current_routes.end());
279 ModelSafeGroup group = route->second;
280 (*result_routes)[type] = group;
281 }
282 }
283
284 } // namespace.
285
ScheduleConfiguration(const ConfigurationParams & params)286 void SyncSchedulerImpl::ScheduleConfiguration(
287 const ConfigurationParams& params) {
288 DCHECK(CalledOnValidThread());
289 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
290 DCHECK_EQ(CONFIGURATION_MODE, mode_);
291 DCHECK(!params.ready_task.is_null());
292 CHECK(started_) << "Scheduler must be running to configure.";
293 SDVLOG(2) << "Reconfiguring syncer.";
294
295 // Only one configuration is allowed at a time. Verify we're not waiting
296 // for a pending configure job.
297 DCHECK(!pending_configure_params_);
298
299 ModelSafeRoutingInfo restricted_routes;
300 BuildModelSafeParams(params.types_to_download,
301 params.routing_info,
302 &restricted_routes);
303 session_context_->SetRoutingInfo(restricted_routes);
304
305 // Only reconfigure if we have types to download.
306 if (!params.types_to_download.Empty()) {
307 pending_configure_params_.reset(new ConfigurationParams(params));
308 TrySyncSessionJob();
309 } else {
310 SDVLOG(2) << "No change in routing info, calling ready task directly.";
311 params.ready_task.Run();
312 }
313 }
314
CanRunJobNow(JobPriority priority)315 bool SyncSchedulerImpl::CanRunJobNow(JobPriority priority) {
316 DCHECK(CalledOnValidThread());
317 if (wait_interval_ && wait_interval_->mode == WaitInterval::THROTTLED) {
318 SDVLOG(1) << "Unable to run a job because we're throttled.";
319 return false;
320 }
321
322 if (wait_interval_
323 && wait_interval_->mode == WaitInterval::EXPONENTIAL_BACKOFF
324 && priority != CANARY_PRIORITY) {
325 SDVLOG(1) << "Unable to run a job because we're backing off.";
326 return false;
327 }
328
329 if (session_context_->connection_manager()->HasInvalidAuthToken()) {
330 SDVLOG(1) << "Unable to run a job because we have no valid auth token.";
331 return false;
332 }
333
334 return true;
335 }
336
CanRunNudgeJobNow(JobPriority priority)337 bool SyncSchedulerImpl::CanRunNudgeJobNow(JobPriority priority) {
338 DCHECK(CalledOnValidThread());
339
340 if (!CanRunJobNow(priority)) {
341 SDVLOG(1) << "Unable to run a nudge job right now";
342 return false;
343 }
344
345 const ModelTypeSet enabled_types = session_context_->GetEnabledTypes();
346 if (nudge_tracker_.GetThrottledTypes().HasAll(enabled_types)) {
347 SDVLOG(1) << "Not running a nudge because we're fully type throttled.";
348 return false;
349 }
350
351 if (mode_ == CONFIGURATION_MODE) {
352 SDVLOG(1) << "Not running nudge because we're in configuration mode.";
353 return false;
354 }
355
356 return true;
357 }
358
ScheduleLocalNudge(const TimeDelta & desired_delay,ModelTypeSet types,const tracked_objects::Location & nudge_location)359 void SyncSchedulerImpl::ScheduleLocalNudge(
360 const TimeDelta& desired_delay,
361 ModelTypeSet types,
362 const tracked_objects::Location& nudge_location) {
363 DCHECK(CalledOnValidThread());
364 DCHECK(!types.Empty());
365
366 SDVLOG_LOC(nudge_location, 2)
367 << "Scheduling sync because of local change to "
368 << ModelTypeSetToString(types);
369 UpdateNudgeTimeRecords(types);
370 nudge_tracker_.RecordLocalChange(types);
371 ScheduleNudgeImpl(desired_delay, nudge_location);
372 }
373
ScheduleLocalRefreshRequest(const TimeDelta & desired_delay,ModelTypeSet types,const tracked_objects::Location & nudge_location)374 void SyncSchedulerImpl::ScheduleLocalRefreshRequest(
375 const TimeDelta& desired_delay,
376 ModelTypeSet types,
377 const tracked_objects::Location& nudge_location) {
378 DCHECK(CalledOnValidThread());
379 DCHECK(!types.Empty());
380
381 SDVLOG_LOC(nudge_location, 2)
382 << "Scheduling sync because of local refresh request for "
383 << ModelTypeSetToString(types);
384 nudge_tracker_.RecordLocalRefreshRequest(types);
385 ScheduleNudgeImpl(desired_delay, nudge_location);
386 }
387
ScheduleInvalidationNudge(const TimeDelta & desired_delay,const ObjectIdInvalidationMap & invalidation_map,const tracked_objects::Location & nudge_location)388 void SyncSchedulerImpl::ScheduleInvalidationNudge(
389 const TimeDelta& desired_delay,
390 const ObjectIdInvalidationMap& invalidation_map,
391 const tracked_objects::Location& nudge_location) {
392 DCHECK(CalledOnValidThread());
393 DCHECK(!invalidation_map.Empty());
394
395 SDVLOG_LOC(nudge_location, 2)
396 << "Scheduling sync because we received invalidation for "
397 << ModelTypeSetToString(
398 ObjectIdSetToModelTypeSet(invalidation_map.GetObjectIds()));
399 nudge_tracker_.RecordRemoteInvalidation(invalidation_map);
400 ScheduleNudgeImpl(desired_delay, nudge_location);
401 }
402
403 // TODO(zea): Consider adding separate throttling/backoff for datatype
404 // refresh requests.
ScheduleNudgeImpl(const TimeDelta & delay,const tracked_objects::Location & nudge_location)405 void SyncSchedulerImpl::ScheduleNudgeImpl(
406 const TimeDelta& delay,
407 const tracked_objects::Location& nudge_location) {
408 DCHECK(CalledOnValidThread());
409
410 if (no_scheduling_allowed_) {
411 NOTREACHED() << "Illegal to schedule job while session in progress.";
412 return;
413 }
414
415 if (!started_) {
416 SDVLOG_LOC(nudge_location, 2)
417 << "Dropping nudge, scheduler is not running.";
418 return;
419 }
420
421 SDVLOG_LOC(nudge_location, 2)
422 << "In ScheduleNudgeImpl with delay "
423 << delay.InMilliseconds() << " ms";
424
425 if (!CanRunNudgeJobNow(NORMAL_PRIORITY))
426 return;
427
428 TimeTicks incoming_run_time = TimeTicks::Now() + delay;
429 if (!scheduled_nudge_time_.is_null() &&
430 (scheduled_nudge_time_ < incoming_run_time)) {
431 // Old job arrives sooner than this one. Don't reschedule it.
432 return;
433 }
434
435 // Either there is no existing nudge in flight or the incoming nudge should be
436 // made to arrive first (preempt) the existing nudge. We reschedule in either
437 // case.
438 SDVLOG_LOC(nudge_location, 2)
439 << "Scheduling a nudge with "
440 << delay.InMilliseconds() << " ms delay";
441 scheduled_nudge_time_ = incoming_run_time;
442 pending_wakeup_timer_.Start(
443 nudge_location,
444 delay,
445 base::Bind(&SyncSchedulerImpl::PerformDelayedNudge,
446 weak_ptr_factory_.GetWeakPtr()));
447 }
448
GetModeString(SyncScheduler::Mode mode)449 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
450 switch (mode) {
451 ENUM_CASE(CONFIGURATION_MODE);
452 ENUM_CASE(NORMAL_MODE);
453 }
454 return "";
455 }
456
DoNudgeSyncSessionJob(JobPriority priority)457 void SyncSchedulerImpl::DoNudgeSyncSessionJob(JobPriority priority) {
458 DCHECK(CalledOnValidThread());
459 DCHECK(CanRunNudgeJobNow(priority));
460
461 DVLOG(2) << "Will run normal mode sync cycle with types "
462 << ModelTypeSetToString(session_context_->GetEnabledTypes());
463 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
464 bool premature_exit = !syncer_->NormalSyncShare(
465 GetEnabledAndUnthrottledTypes(),
466 nudge_tracker_,
467 session.get());
468 AdjustPolling(FORCE_RESET);
469 // Don't run poll job till the next time poll timer fires.
470 do_poll_after_credentials_updated_ = false;
471
472 bool success = !premature_exit
473 && !sessions::HasSyncerError(
474 session->status_controller().model_neutral_state());
475
476 if (success) {
477 // That cycle took care of any outstanding work we had.
478 SDVLOG(2) << "Nudge succeeded.";
479 nudge_tracker_.RecordSuccessfulSyncCycle();
480 scheduled_nudge_time_ = base::TimeTicks();
481
482 // If we're here, then we successfully reached the server. End all backoff.
483 wait_interval_.reset();
484 NotifyRetryTime(base::Time());
485 } else {
486 HandleFailure(session->status_controller().model_neutral_state());
487 }
488 }
489
DoConfigurationSyncSessionJob(JobPriority priority)490 void SyncSchedulerImpl::DoConfigurationSyncSessionJob(JobPriority priority) {
491 DCHECK(CalledOnValidThread());
492 DCHECK_EQ(mode_, CONFIGURATION_MODE);
493 DCHECK(pending_configure_params_ != NULL);
494
495 if (!CanRunJobNow(priority)) {
496 SDVLOG(2) << "Unable to run configure job right now.";
497 if (!pending_configure_params_->retry_task.is_null()) {
498 pending_configure_params_->retry_task.Run();
499 pending_configure_params_->retry_task.Reset();
500 }
501 return;
502 }
503
504 SDVLOG(2) << "Will run configure SyncShare with types "
505 << ModelTypeSetToString(session_context_->GetEnabledTypes());
506 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
507 bool premature_exit = !syncer_->ConfigureSyncShare(
508 pending_configure_params_->types_to_download,
509 pending_configure_params_->source,
510 session.get());
511 AdjustPolling(FORCE_RESET);
512 // Don't run poll job till the next time poll timer fires.
513 do_poll_after_credentials_updated_ = false;
514
515 bool success = !premature_exit
516 && !sessions::HasSyncerError(
517 session->status_controller().model_neutral_state());
518
519 if (success) {
520 SDVLOG(2) << "Configure succeeded.";
521 pending_configure_params_->ready_task.Run();
522 pending_configure_params_.reset();
523
524 // If we're here, then we successfully reached the server. End all backoff.
525 wait_interval_.reset();
526 NotifyRetryTime(base::Time());
527 } else {
528 HandleFailure(session->status_controller().model_neutral_state());
529 // Sync cycle might receive response from server that causes scheduler to
530 // stop and draws pending_configure_params_ invalid.
531 if (started_ && !pending_configure_params_->retry_task.is_null()) {
532 pending_configure_params_->retry_task.Run();
533 pending_configure_params_->retry_task.Reset();
534 }
535 }
536 }
537
HandleFailure(const sessions::ModelNeutralState & model_neutral_state)538 void SyncSchedulerImpl::HandleFailure(
539 const sessions::ModelNeutralState& model_neutral_state) {
540 if (IsCurrentlyThrottled()) {
541 SDVLOG(2) << "Was throttled during previous sync cycle.";
542 RestartWaiting();
543 } else if (!IsBackingOff()) {
544 // Setup our backoff if this is our first such failure.
545 TimeDelta length = delay_provider_->GetDelay(
546 delay_provider_->GetInitialDelay(model_neutral_state));
547 wait_interval_.reset(
548 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
549 SDVLOG(2) << "Sync cycle failed. Will back off for "
550 << wait_interval_->length.InMilliseconds() << "ms.";
551 RestartWaiting();
552 }
553 }
554
DoPollSyncSessionJob()555 void SyncSchedulerImpl::DoPollSyncSessionJob() {
556 base::AutoReset<bool> protector(&no_scheduling_allowed_, true);
557
558 SDVLOG(2) << "Polling with types "
559 << ModelTypeSetToString(GetEnabledAndUnthrottledTypes());
560 scoped_ptr<SyncSession> session(SyncSession::Build(session_context_, this));
561 syncer_->PollSyncShare(
562 GetEnabledAndUnthrottledTypes(),
563 session.get());
564
565 AdjustPolling(FORCE_RESET);
566
567 if (IsCurrentlyThrottled()) {
568 SDVLOG(2) << "Poll request got us throttled.";
569 // The OnSilencedUntil() call set up the WaitInterval for us. All we need
570 // to do is start the timer.
571 RestartWaiting();
572 }
573 }
574
UpdateNudgeTimeRecords(ModelTypeSet types)575 void SyncSchedulerImpl::UpdateNudgeTimeRecords(ModelTypeSet types) {
576 DCHECK(CalledOnValidThread());
577 base::TimeTicks now = TimeTicks::Now();
578 // Update timing information for how often datatypes are triggering nudges.
579 for (ModelTypeSet::Iterator iter = types.First(); iter.Good(); iter.Inc()) {
580 base::TimeTicks previous = last_local_nudges_by_model_type_[iter.Get()];
581 last_local_nudges_by_model_type_[iter.Get()] = now;
582 if (previous.is_null())
583 continue;
584
585 #define PER_DATA_TYPE_MACRO(type_str) \
586 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
587 SYNC_DATA_TYPE_HISTOGRAM(iter.Get());
588 #undef PER_DATA_TYPE_MACRO
589 }
590 }
591
GetPollInterval()592 TimeDelta SyncSchedulerImpl::GetPollInterval() {
593 return (!session_context_->notifications_enabled() ||
594 !session_context_->ShouldFetchUpdatesBeforeCommit()) ?
595 syncer_short_poll_interval_seconds_ :
596 syncer_long_poll_interval_seconds_;
597 }
598
AdjustPolling(PollAdjustType type)599 void SyncSchedulerImpl::AdjustPolling(PollAdjustType type) {
600 DCHECK(CalledOnValidThread());
601
602 TimeDelta poll = GetPollInterval();
603 bool rate_changed = !poll_timer_.IsRunning() ||
604 poll != poll_timer_.GetCurrentDelay();
605
606 if (type == FORCE_RESET) {
607 last_poll_reset_ = base::TimeTicks::Now();
608 if (!rate_changed)
609 poll_timer_.Reset();
610 }
611
612 if (!rate_changed)
613 return;
614
615 // Adjust poll rate.
616 poll_timer_.Stop();
617 poll_timer_.Start(FROM_HERE, poll, this,
618 &SyncSchedulerImpl::PollTimerCallback);
619 }
620
RestartWaiting()621 void SyncSchedulerImpl::RestartWaiting() {
622 CHECK(wait_interval_.get());
623 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
624 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
625 SDVLOG(2) << "Starting WaitInterval timer of length "
626 << wait_interval_->length.InMilliseconds() << "ms.";
627 if (wait_interval_->mode == WaitInterval::THROTTLED) {
628 pending_wakeup_timer_.Start(
629 FROM_HERE,
630 wait_interval_->length,
631 base::Bind(&SyncSchedulerImpl::Unthrottle,
632 weak_ptr_factory_.GetWeakPtr()));
633 } else {
634 pending_wakeup_timer_.Start(
635 FROM_HERE,
636 wait_interval_->length,
637 base::Bind(&SyncSchedulerImpl::ExponentialBackoffRetry,
638 weak_ptr_factory_.GetWeakPtr()));
639 }
640 }
641
Stop()642 void SyncSchedulerImpl::Stop() {
643 DCHECK(CalledOnValidThread());
644 SDVLOG(2) << "Stop called";
645
646 // Kill any in-flight method calls.
647 weak_ptr_factory_.InvalidateWeakPtrs();
648 wait_interval_.reset();
649 NotifyRetryTime(base::Time());
650 poll_timer_.Stop();
651 pending_wakeup_timer_.Stop();
652 pending_configure_params_.reset();
653 if (started_)
654 started_ = false;
655 }
656
657 // This is the only place where we invoke DoSyncSessionJob with canary
658 // privileges. Everyone else should use NORMAL_PRIORITY.
TryCanaryJob()659 void SyncSchedulerImpl::TryCanaryJob() {
660 next_sync_session_job_priority_ = CANARY_PRIORITY;
661 TrySyncSessionJob();
662 }
663
TrySyncSessionJob()664 void SyncSchedulerImpl::TrySyncSessionJob() {
665 // Post call to TrySyncSessionJobImpl on current thread. Later request for
666 // access token will be here.
667 base::MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
668 &SyncSchedulerImpl::TrySyncSessionJobImpl,
669 weak_ptr_factory_.GetWeakPtr()));
670 }
671
TrySyncSessionJobImpl()672 void SyncSchedulerImpl::TrySyncSessionJobImpl() {
673 JobPriority priority = next_sync_session_job_priority_;
674 next_sync_session_job_priority_ = NORMAL_PRIORITY;
675
676 nudge_tracker_.SetSyncCycleStartTime(base::TimeTicks::Now());
677
678 DCHECK(CalledOnValidThread());
679 if (mode_ == CONFIGURATION_MODE) {
680 if (pending_configure_params_) {
681 SDVLOG(2) << "Found pending configure job";
682 DoConfigurationSyncSessionJob(priority);
683 }
684 } else if (CanRunNudgeJobNow(priority)) {
685 if (nudge_tracker_.IsSyncRequired()) {
686 SDVLOG(2) << "Found pending nudge job";
687 DoNudgeSyncSessionJob(priority);
688 } else if (do_poll_after_credentials_updated_ ||
689 ((base::TimeTicks::Now() - last_poll_reset_) >= GetPollInterval())) {
690 DoPollSyncSessionJob();
691 // Poll timer fires infrequently. Usually by this time access token is
692 // already expired and poll job will fail with auth error. Set flag to
693 // retry poll once ProfileSyncService gets new access token, TryCanaryJob
694 // will be called after access token is retrieved.
695 if (HttpResponse::SYNC_AUTH_ERROR ==
696 session_context_->connection_manager()->server_status()) {
697 do_poll_after_credentials_updated_ = true;
698 }
699 }
700 }
701
702 if (priority == CANARY_PRIORITY) {
703 // If this is canary job then whatever result was don't run poll job till
704 // the next time poll timer fires.
705 do_poll_after_credentials_updated_ = false;
706 }
707
708 if (IsBackingOff() && !pending_wakeup_timer_.IsRunning()) {
709 // If we succeeded, our wait interval would have been cleared. If it hasn't
710 // been cleared, then we should increase our backoff interval and schedule
711 // another retry.
712 TimeDelta length = delay_provider_->GetDelay(wait_interval_->length);
713 wait_interval_.reset(
714 new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, length));
715 SDVLOG(2) << "Sync cycle failed. Will back off for "
716 << wait_interval_->length.InMilliseconds() << "ms.";
717 RestartWaiting();
718 }
719 }
720
PollTimerCallback()721 void SyncSchedulerImpl::PollTimerCallback() {
722 DCHECK(CalledOnValidThread());
723 if (no_scheduling_allowed_) {
724 // The no_scheduling_allowed_ flag is set by a function-scoped AutoReset in
725 // functions that are called only on the sync thread. This function is also
726 // called only on the sync thread, and only when it is posted by an expiring
727 // timer. If we find that no_scheduling_allowed_ is set here, then
728 // something is very wrong. Maybe someone mistakenly called us directly, or
729 // mishandled the book-keeping for no_scheduling_allowed_.
730 NOTREACHED() << "Illegal to schedule job while session in progress.";
731 return;
732 }
733
734 TrySyncSessionJob();
735 }
736
RetryTimerCallback()737 void SyncSchedulerImpl::RetryTimerCallback() {
738 TrySyncSessionJob();
739 }
740
Unthrottle()741 void SyncSchedulerImpl::Unthrottle() {
742 DCHECK(CalledOnValidThread());
743 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
744
745 // We're no longer throttled, so clear the wait interval.
746 wait_interval_.reset();
747 NotifyRetryTime(base::Time());
748 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
749
750 // We treat this as a 'canary' in the sense that it was originally scheduled
751 // to run some time ago, failed, and we now want to retry, versus a job that
752 // was just created (e.g via ScheduleNudgeImpl). The main implication is
753 // that we're careful to update routing info (etc) with such potentially
754 // stale canary jobs.
755 TryCanaryJob();
756 }
757
TypeUnthrottle(base::TimeTicks unthrottle_time)758 void SyncSchedulerImpl::TypeUnthrottle(base::TimeTicks unthrottle_time) {
759 DCHECK(CalledOnValidThread());
760 nudge_tracker_.UpdateTypeThrottlingState(unthrottle_time);
761 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
762
763 if (nudge_tracker_.IsAnyTypeThrottled()) {
764 const base::TimeTicks now = base::TimeTicks::Now();
765 base::TimeDelta time_until_next_unthrottle =
766 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
767 type_unthrottle_timer_.Start(
768 FROM_HERE,
769 time_until_next_unthrottle,
770 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
771 weak_ptr_factory_.GetWeakPtr(),
772 now + time_until_next_unthrottle));
773 }
774
775 // Maybe this is a good time to run a nudge job. Let's try it.
776 if (nudge_tracker_.IsSyncRequired() && CanRunNudgeJobNow(NORMAL_PRIORITY))
777 TrySyncSessionJob();
778 }
779
PerformDelayedNudge()780 void SyncSchedulerImpl::PerformDelayedNudge() {
781 // Circumstances may have changed since we scheduled this delayed nudge.
782 // We must check to see if it's OK to run the job before we do so.
783 if (CanRunNudgeJobNow(NORMAL_PRIORITY))
784 TrySyncSessionJob();
785
786 // We're not responsible for setting up any retries here. The functions that
787 // first put us into a state that prevents successful sync cycles (eg. global
788 // throttling, type throttling, network errors, transient errors) will also
789 // setup the appropriate retry logic (eg. retry after timeout, exponential
790 // backoff, retry when the network changes).
791 }
792
ExponentialBackoffRetry()793 void SyncSchedulerImpl::ExponentialBackoffRetry() {
794 TryCanaryJob();
795 }
796
NotifyRetryTime(base::Time retry_time)797 void SyncSchedulerImpl::NotifyRetryTime(base::Time retry_time) {
798 FOR_EACH_OBSERVER(SyncEngineEventListener,
799 *session_context_->listeners(),
800 OnRetryTimeChanged(retry_time));
801 }
802
NotifyThrottledTypesChanged(ModelTypeSet types)803 void SyncSchedulerImpl::NotifyThrottledTypesChanged(ModelTypeSet types) {
804 FOR_EACH_OBSERVER(SyncEngineEventListener,
805 *session_context_->listeners(),
806 OnThrottledTypesChanged(types));
807 }
808
IsBackingOff() const809 bool SyncSchedulerImpl::IsBackingOff() const {
810 DCHECK(CalledOnValidThread());
811 return wait_interval_.get() && wait_interval_->mode ==
812 WaitInterval::EXPONENTIAL_BACKOFF;
813 }
814
OnThrottled(const base::TimeDelta & throttle_duration)815 void SyncSchedulerImpl::OnThrottled(const base::TimeDelta& throttle_duration) {
816 DCHECK(CalledOnValidThread());
817 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
818 throttle_duration));
819 NotifyRetryTime(base::Time::Now() + wait_interval_->length);
820 NotifyThrottledTypesChanged(ModelTypeSet::All());
821 }
822
OnTypesThrottled(ModelTypeSet types,const base::TimeDelta & throttle_duration)823 void SyncSchedulerImpl::OnTypesThrottled(
824 ModelTypeSet types,
825 const base::TimeDelta& throttle_duration) {
826 base::TimeTicks now = base::TimeTicks::Now();
827
828 nudge_tracker_.SetTypesThrottledUntil(types, throttle_duration, now);
829 base::TimeDelta time_until_next_unthrottle =
830 nudge_tracker_.GetTimeUntilNextUnthrottle(now);
831 type_unthrottle_timer_.Start(
832 FROM_HERE,
833 time_until_next_unthrottle,
834 base::Bind(&SyncSchedulerImpl::TypeUnthrottle,
835 weak_ptr_factory_.GetWeakPtr(),
836 now + time_until_next_unthrottle));
837 NotifyThrottledTypesChanged(nudge_tracker_.GetThrottledTypes());
838 }
839
IsCurrentlyThrottled()840 bool SyncSchedulerImpl::IsCurrentlyThrottled() {
841 DCHECK(CalledOnValidThread());
842 return wait_interval_.get() && wait_interval_->mode ==
843 WaitInterval::THROTTLED;
844 }
845
OnReceivedShortPollIntervalUpdate(const base::TimeDelta & new_interval)846 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
847 const base::TimeDelta& new_interval) {
848 DCHECK(CalledOnValidThread());
849 syncer_short_poll_interval_seconds_ = new_interval;
850 }
851
OnReceivedLongPollIntervalUpdate(const base::TimeDelta & new_interval)852 void SyncSchedulerImpl::OnReceivedLongPollIntervalUpdate(
853 const base::TimeDelta& new_interval) {
854 DCHECK(CalledOnValidThread());
855 syncer_long_poll_interval_seconds_ = new_interval;
856 }
857
OnReceivedSessionsCommitDelay(const base::TimeDelta & new_delay)858 void SyncSchedulerImpl::OnReceivedSessionsCommitDelay(
859 const base::TimeDelta& new_delay) {
860 DCHECK(CalledOnValidThread());
861 sessions_commit_delay_ = new_delay;
862 }
863
OnReceivedClientInvalidationHintBufferSize(int size)864 void SyncSchedulerImpl::OnReceivedClientInvalidationHintBufferSize(int size) {
865 if (size > 0)
866 nudge_tracker_.SetHintBufferSize(size);
867 else
868 NOTREACHED() << "Hint buffer size should be > 0.";
869 }
870
OnSyncProtocolError(const SyncProtocolError & sync_protocol_error)871 void SyncSchedulerImpl::OnSyncProtocolError(
872 const SyncProtocolError& sync_protocol_error) {
873 DCHECK(CalledOnValidThread());
874 if (ShouldRequestEarlyExit(sync_protocol_error)) {
875 SDVLOG(2) << "Sync Scheduler requesting early exit.";
876 Stop();
877 }
878 if (IsActionableError(sync_protocol_error)) {
879 SDVLOG(2) << "OnActionableError";
880 FOR_EACH_OBSERVER(SyncEngineEventListener,
881 *session_context_->listeners(),
882 OnActionableError(sync_protocol_error));
883 }
884 }
885
OnReceivedGuRetryDelay(const base::TimeDelta & delay)886 void SyncSchedulerImpl::OnReceivedGuRetryDelay(const base::TimeDelta& delay) {
887 nudge_tracker_.SetNextRetryTime(TimeTicks::Now() + delay);
888 retry_timer_.Start(FROM_HERE, delay, this,
889 &SyncSchedulerImpl::RetryTimerCallback);
890 }
891
OnReceivedMigrationRequest(ModelTypeSet types)892 void SyncSchedulerImpl::OnReceivedMigrationRequest(ModelTypeSet types) {
893 FOR_EACH_OBSERVER(SyncEngineEventListener,
894 *session_context_->listeners(),
895 OnMigrationRequested(types));
896 }
897
SetNotificationsEnabled(bool notifications_enabled)898 void SyncSchedulerImpl::SetNotificationsEnabled(bool notifications_enabled) {
899 DCHECK(CalledOnValidThread());
900 session_context_->set_notifications_enabled(notifications_enabled);
901 if (notifications_enabled)
902 nudge_tracker_.OnInvalidationsEnabled();
903 else
904 nudge_tracker_.OnInvalidationsDisabled();
905 }
906
GetSessionsCommitDelay() const907 base::TimeDelta SyncSchedulerImpl::GetSessionsCommitDelay() const {
908 DCHECK(CalledOnValidThread());
909 return sessions_commit_delay_;
910 }
911
912 #undef SDVLOG_LOC
913
914 #undef SDVLOG
915
916 #undef SLOG
917
918 #undef ENUM_CASE
919
920 } // namespace syncer
921