1 /*
<lambda>null2 * Copyright 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package androidx.work.impl
17
18 import android.annotation.SuppressLint
19 import android.content.Context
20 import androidx.annotation.RestrictTo
21 import androidx.annotation.VisibleForTesting
22 import androidx.work.Clock
23 import androidx.work.Configuration
24 import androidx.work.Data
25 import androidx.work.DirectExecutor
26 import androidx.work.ListenableWorker
27 import androidx.work.ListenableWorker.Result.Failure
28 import androidx.work.Logger
29 import androidx.work.WorkInfo
30 import androidx.work.WorkerExceptionInfo
31 import androidx.work.WorkerParameters
32 import androidx.work.impl.WorkerWrapper.Resolution.ResetWorkerStatus
33 import androidx.work.impl.foreground.ForegroundProcessor
34 import androidx.work.impl.model.DependencyDao
35 import androidx.work.impl.model.WorkGenerationalId
36 import androidx.work.impl.model.WorkSpec
37 import androidx.work.impl.model.WorkSpecDao
38 import androidx.work.impl.model.generationalId
39 import androidx.work.impl.utils.WorkForegroundUpdater
40 import androidx.work.impl.utils.WorkProgressUpdater
41 import androidx.work.impl.utils.safeAccept
42 import androidx.work.impl.utils.taskexecutor.TaskExecutor
43 import androidx.work.impl.utils.workForeground
44 import androidx.work.launchFuture
45 import androidx.work.logd
46 import androidx.work.loge
47 import androidx.work.logi
48 import com.google.common.util.concurrent.ListenableFuture
49 import java.util.UUID
50 import java.util.concurrent.Callable
51 import java.util.concurrent.CancellationException
52 import java.util.concurrent.ExecutionException
53 import java.util.concurrent.Future
54 import kotlin.collections.removeLast as removeLastKt
55 import kotlin.coroutines.coroutineContext
56 import kotlin.coroutines.resumeWithException
57 import kotlinx.coroutines.CancellableContinuation
58 import kotlinx.coroutines.Job
59 import kotlinx.coroutines.asCoroutineDispatcher
60 import kotlinx.coroutines.suspendCancellableCoroutine
61 import kotlinx.coroutines.withContext
62
63 /**
64 * A runnable that looks up the [WorkSpec] from the database for a given id, instantiates its
65 * Worker, and then calls it.
66 */
67 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
68 class WorkerWrapper internal constructor(builder: Builder) {
69 val workSpec: WorkSpec = builder.workSpec
70 private val appContext: Context = builder.appContext
71 private val workSpecId: String = workSpec.id
72 private val runtimeExtras: WorkerParameters.RuntimeExtras = builder.runtimeExtras
73
74 private val builderWorker: ListenableWorker? = builder.worker
75 private val workTaskExecutor: TaskExecutor = builder.workTaskExecutor
76
77 private val configuration: Configuration = builder.configuration
78 private val clock: Clock = configuration.clock
79 private val foregroundProcessor: ForegroundProcessor = builder.foregroundProcessor
80 private val workDatabase: WorkDatabase = builder.workDatabase
81 private val workSpecDao: WorkSpecDao = workDatabase.workSpecDao()
82 private val dependencyDao: DependencyDao = workDatabase.dependencyDao()
83 private val tags: List<String> = builder.tags
84 private val workDescription: String = createWorkDescription(tags)
85
86 private val workerJob = Job()
87
88 val workGenerationalId: WorkGenerationalId
89 get() = workSpec.generationalId()
90
91 fun launch(): ListenableFuture<Boolean> =
92 launchFuture(workTaskExecutor.taskCoroutineDispatcher + Job()) {
93 val resolution: Resolution =
94 try {
95 // we're wrapping runWorker in separate job, so we can always run post
96 // processing
97 // without a fear of being cancelled.
98 withContext(workerJob) { runWorker() }
99 } catch (workerStoppedException: WorkerStoppedException) {
100 ResetWorkerStatus(workerStoppedException.reason)
101 } catch (e: CancellationException) {
102 // means that worker was self-cancelled, which we treat as failure
103 Resolution.Failed()
104 } catch (throwable: Throwable) {
105 loge(TAG, throwable) { "Unexpected error in WorkerWrapper" }
106 Resolution.Failed()
107 }
108 workDatabase.runInTransaction(
109 Callable {
110 when (resolution) {
111 is Resolution.Finished -> onWorkFinished(resolution.result)
112 is Resolution.Failed -> {
113 setFailed(resolution.result)
114 false
115 }
116 is ResetWorkerStatus -> resetWorkerStatus(resolution.reason)
117 }
118 }
119 )
120 }
121
122 private sealed class Resolution {
123 class ResetWorkerStatus(val reason: Int = WorkInfo.STOP_REASON_NOT_STOPPED) : Resolution()
124
125 class Failed(val result: ListenableWorker.Result = Failure()) : Resolution()
126
127 class Finished(val result: ListenableWorker.Result) : Resolution()
128 }
129
130 private suspend fun runWorker(): Resolution {
131 val isTracingEnabled = configuration.tracer.isEnabled()
132 val traceTag = workSpec.traceTag
133 if (isTracingEnabled && traceTag != null) {
134 configuration.tracer.beginAsyncSection(
135 traceTag,
136 // Use hashCode() instead of a generational id given we want to allow concurrent
137 // execution of Workers with the same name. Additionally `generation` is already
138 // a part of the WorkSpec's hashCode.
139 workSpec.hashCode()
140 )
141 }
142 // Needed for nested transactions, such as when we're in a dependent work request when
143 // using a SynchronousExecutor.
144 val shouldExit =
145 workDatabase.runInTransaction(
146 Callable {
147 // Do a quick check to make sure we don't need to bail out in case this work is
148 // already
149 // running, finished, or is blocked.
150 if (workSpec.state !== WorkInfo.State.ENQUEUED) {
151 logd(TAG) {
152 "${workSpec.workerClassName} is not in ENQUEUED state. Nothing more to do"
153 }
154 return@Callable true
155 }
156
157 // Case 1:
158 // Ensure that Workers that are backed off are only executed when they are
159 // supposed to.
160 // GreedyScheduler can schedule WorkSpecs that have already been backed off
161 // because
162 // it is holding on to snapshots of WorkSpecs. So WorkerWrapper needs to
163 // determine
164 // if the ListenableWorker is actually eligible to execute at this point in
165 // time.
166
167 // Case 2:
168 // On API 23, we double scheduler Workers because JobScheduler prefers batching.
169 // So is the Work is periodic, we only need to execute it once per interval.
170 // Also potential bugs in the platform may cause a Job to run more than once.
171 if (workSpec.isPeriodic || workSpec.isBackedOff) {
172 val now = clock.currentTimeMillis()
173 if (now < workSpec.calculateNextRunTime()) {
174 Logger.get()
175 .debug(
176 TAG,
177 "Delaying execution for ${workSpec.workerClassName} because it is " +
178 "being executed before schedule.",
179 )
180
181 // For AlarmManager implementation we need to reschedule this kind of
182 // Work.
183 // This is not a problem for JobScheduler because we will only
184 // reschedule
185 // work if JobScheduler is unaware of a jobId.
186 return@Callable true
187 }
188 }
189 return@Callable false
190 }
191 )
192
193 if (shouldExit) return ResetWorkerStatus()
194
195 // Merge inputs. This can be potentially expensive code, so this should not be done inside
196 // a database transaction.
197 val input: Data =
198 if (workSpec.isPeriodic) {
199 workSpec.input
200 } else {
201 val inputMergerFactory = configuration.inputMergerFactory
202 val inputMergerClassName = workSpec.inputMergerClassName
203 val inputMerger =
204 inputMergerFactory.createInputMergerWithDefaultFallback(inputMergerClassName)
205 if (inputMerger == null) {
206 loge(TAG) { "Could not create Input Merger ${workSpec.inputMergerClassName}" }
207 return Resolution.Failed()
208 }
209 val inputs =
210 listOf(workSpec.input) + workSpecDao.getInputsFromPrerequisites(workSpecId)
211 inputMerger.merge(inputs)
212 }
213 val params =
214 WorkerParameters(
215 UUID.fromString(workSpecId),
216 input,
217 tags,
218 runtimeExtras,
219 workSpec.runAttemptCount,
220 workSpec.generation,
221 configuration.executor,
222 configuration.workerCoroutineContext,
223 workTaskExecutor,
224 configuration.workerFactory,
225 WorkProgressUpdater(workDatabase, workTaskExecutor),
226 WorkForegroundUpdater(workDatabase, foregroundProcessor, workTaskExecutor)
227 )
228
229 // Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
230 // in test mode.
231 val worker =
232 builderWorker
233 ?: try {
234 configuration.workerFactory.createWorkerWithDefaultFallback(
235 appContext,
236 workSpec.workerClassName,
237 params
238 )
239 } catch (e: Throwable) {
240 loge(TAG) { "Could not create Worker ${workSpec.workerClassName}" }
241
242 configuration.workerInitializationExceptionHandler?.safeAccept(
243 WorkerExceptionInfo(workSpec.workerClassName, params, e),
244 TAG
245 )
246 return Resolution.Failed()
247 }
248 worker.setUsed()
249 // we specifically use coroutineContext[Job] instead of workerJob
250 // because it will be complete once withContext finishes.
251 // This way if worker has successfully finished and then
252 // interrupt() is called, then it is ignored, because
253 // job is already completed.
254 val job = coroutineContext[Job]!!
255
256 // worker stopping is complicated process.
257 // Historical behavior that we are trying to preserve is that
258 // worker.onStopped is always called in case of stoppage since the worker is instantiated,
259 // no matter if other methods such as startWork or getForegroundInfoAsync were called.
260 //
261 // Another important behavior is that worker should be marked as stopped before
262 // calling .cancel() on the future returned from the startWork(). So the listeners of this
263 // future could check what was the stop reason via `getStopReason()`, including listeners
264 // that were added with the direct executor.
265 // worker.stop() could be safely called multiple times, (only first one is effective),
266 // and we rely on this property.
267 // The completion listener below is for the cases when
268 // 1. getForegroundInfoAsync / startWork weren't called yet at all
269 // 2. when WorkerWrapper received stop signal when getForegroundInfoAsync() completed
270 // and startWork() hasn't been called yet.
271 // 3. startWork's future was completed, but job was cancelled before we actually received
272 // a notification about future's completion. (it is the natural race between stop signal
273 // and future completion, that we can't avoid. In this case worker will be decided as
274 // stopped and re-enqueued for another attempt)
275 job.invokeOnCompletion {
276 if (it is WorkerStoppedException) {
277 worker.stop(it.reason)
278 }
279 if (isTracingEnabled && traceTag != null) {
280 configuration.tracer.endAsyncSection(traceTag, workSpec.hashCode())
281 }
282 }
283
284 // Try to set the work to the running state. Note that this may fail because another thread
285 // may have modified the DB since we checked last at the top of this function.
286 if (!trySetRunning()) {
287 return ResetWorkerStatus()
288 }
289
290 if (job.isCancelled) {
291 // doesn't matter job is cancelled anyway
292 return ResetWorkerStatus()
293 }
294
295 val foregroundUpdater = params.foregroundUpdater
296 val mainDispatcher = workTaskExecutor.getMainThreadExecutor().asCoroutineDispatcher()
297 try {
298 val result =
299 withContext(mainDispatcher) {
300 workForeground(
301 appContext,
302 workSpec,
303 worker,
304 foregroundUpdater,
305 workTaskExecutor
306 )
307 logd(TAG) { "Starting work for ${workSpec.workerClassName}" }
308 // *important* we can't pass future around suspension points
309 // because we will lose cancellation, so we have to await
310 // right here on the main thread.
311 worker.startWork().awaitWithin(worker)
312 }
313 return Resolution.Finished(result)
314 } catch (cancellation: CancellationException) {
315 logi(TAG, cancellation) { "$workDescription was cancelled" }
316 throw cancellation
317 } catch (throwable: Throwable) {
318 loge(TAG, throwable) { "$workDescription failed because it threw an exception/error" }
319 configuration.workerExecutionExceptionHandler?.safeAccept(
320 WorkerExceptionInfo(workSpec.workerClassName, params, throwable),
321 TAG
322 )
323 return Resolution.Failed()
324 }
325 }
326
327 private fun onWorkFinished(result: ListenableWorker.Result): Boolean {
328 val state = workSpecDao.getState(workSpecId)
329 workDatabase.workProgressDao().delete(workSpecId)
330 return if (state == null) {
331 // state can be null here with a REPLACE on beginUniqueWork().
332 // Treat it as a failure, and rescheduleAndResolve() will
333 // turn into a no-op. We still need to notify potential observers
334 // holding on to wake locks on our behalf.
335 false
336 } else if (state === WorkInfo.State.RUNNING) {
337 handleResult(result)
338 } else if (!state.isFinished) {
339 // counting this is stopped with unknown reason
340 reschedule(WorkInfo.STOP_REASON_UNKNOWN)
341 } else {
342 false
343 }
344 }
345
346 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
347 fun interrupt(stopReason: Int) {
348 workerJob.cancel(WorkerStoppedException(stopReason))
349 }
350
351 private fun resetWorkerStatus(stopReason: Int): Boolean {
352 return if (workSpec.backOffOnSystemInterruptions == true) {
353 logd(TAG) { "Worker ${workSpec.workerClassName} was interrupted. Backing off." }
354 // Treat it as a reschedule. This ensures that we update the last enqueued time
355 // which in turn ensures that we correctly calculate the next runtime for the
356 // given Worker.
357 reschedule(stopReason)
358 true
359 } else {
360 val state = workSpecDao.getState(workSpecId)
361 return if (state != null && !state.isFinished) {
362 logd(TAG) {
363 "Status for $workSpecId is $state; not doing any work and " +
364 "rescheduling for later execution"
365 }
366 // Set state to ENQUEUED again.
367 // Reset scheduled state so it's picked up by background schedulers again.
368 // We want to preserve time when work was enqueued so just explicitly set enqueued
369 // instead using markEnqueuedState. Similarly, don't change any override time.
370 workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
371 workSpecDao.setStopReason(workSpecId, stopReason)
372 workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
373 true
374 } else {
375 logd(TAG) { "Status for $workSpecId is $state ; not doing any work" }
376 false
377 }
378 }
379 }
380
381 private fun handleResult(result: ListenableWorker.Result?): Boolean {
382 return if (result is ListenableWorker.Result.Success) {
383 logi(TAG) { "Worker result SUCCESS for $workDescription" }
384 if (workSpec.isPeriodic) {
385 resetPeriodic()
386 } else {
387 setSucceeded(result)
388 }
389 } else if (result is ListenableWorker.Result.Retry) {
390 logi(TAG) { "Worker result RETRY for $workDescription" }
391 reschedule(WorkInfo.STOP_REASON_NOT_STOPPED)
392 } else {
393 logi(TAG) { "Worker result FAILURE for $workDescription" }
394 if (workSpec.isPeriodic) {
395 resetPeriodic()
396 } else {
397 // we have here either failure or null
398 setFailed(result ?: Failure())
399 }
400 }
401 }
402
403 private fun trySetRunning(): Boolean =
404 workDatabase.runInTransaction(
405 Callable {
406 val currentState = workSpecDao.getState(workSpecId)
407 if (currentState === WorkInfo.State.ENQUEUED) {
408 workSpecDao.setState(WorkInfo.State.RUNNING, workSpecId)
409 workSpecDao.incrementWorkSpecRunAttemptCount(workSpecId)
410 workSpecDao.setStopReason(workSpecId, WorkInfo.STOP_REASON_NOT_STOPPED)
411 true
412 } else false
413 }
414 )
415
416 @VisibleForTesting
417 fun setFailed(result: ListenableWorker.Result): Boolean {
418 iterativelyFailWorkAndDependents(workSpecId)
419 val failure = result as Failure
420 // Update Data as necessary.
421 val output = failure.outputData
422 workSpecDao.resetWorkSpecNextScheduleTimeOverride(
423 workSpecId,
424 workSpec.nextScheduleTimeOverrideGeneration
425 )
426 workSpecDao.setOutput(workSpecId, output)
427 return false
428 }
429
430 private fun iterativelyFailWorkAndDependents(workSpecId: String) {
431 val idsToProcess = mutableListOf(workSpecId)
432 while (idsToProcess.isNotEmpty()) {
433 val id = idsToProcess.removeLastKt()
434 // Don't fail already cancelled work.
435 if (workSpecDao.getState(id) !== WorkInfo.State.CANCELLED) {
436 workSpecDao.setState(WorkInfo.State.FAILED, id)
437 }
438 idsToProcess.addAll(dependencyDao.getDependentWorkIds(id))
439 }
440 }
441
442 private fun reschedule(stopReason: Int): Boolean {
443 workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
444 workSpecDao.setLastEnqueueTime(workSpecId, clock.currentTimeMillis())
445 workSpecDao.resetWorkSpecNextScheduleTimeOverride(
446 workSpecId,
447 workSpec.nextScheduleTimeOverrideGeneration
448 )
449 workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
450 workSpecDao.setStopReason(workSpecId, stopReason)
451 return true
452 }
453
454 private fun resetPeriodic(): Boolean {
455 // The system clock may have been changed such that the lastEnqueueTime was in the past.
456 // Therefore we always use the current time to determine the next run time of a Worker.
457 // This way, the Schedulers will correctly schedule the next instance of the
458 // PeriodicWork in the future. This happens in calculateNextRunTime() in WorkSpec.
459 workSpecDao.setLastEnqueueTime(workSpecId, clock.currentTimeMillis())
460 workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
461 workSpecDao.resetWorkSpecRunAttemptCount(workSpecId)
462 workSpecDao.resetWorkSpecNextScheduleTimeOverride(
463 workSpecId,
464 workSpec.nextScheduleTimeOverrideGeneration
465 )
466 workSpecDao.incrementPeriodCount(workSpecId)
467 workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
468 return false
469 }
470
471 private fun setSucceeded(result: ListenableWorker.Result): Boolean {
472 workSpecDao.setState(WorkInfo.State.SUCCEEDED, workSpecId)
473 val success = result as ListenableWorker.Result.Success
474 // Update Data as necessary.
475 val output = success.outputData
476 workSpecDao.setOutput(workSpecId, output)
477
478 // Unblock Dependencies and set Period Start Time
479 val currentTimeMillis = clock.currentTimeMillis()
480 val dependentWorkIds = dependencyDao.getDependentWorkIds(workSpecId)
481 for (dependentWorkId in dependentWorkIds) {
482 if (
483 workSpecDao.getState(dependentWorkId) === WorkInfo.State.BLOCKED &&
484 dependencyDao.hasCompletedAllPrerequisites(dependentWorkId)
485 ) {
486 logi(TAG) { "Setting status to enqueued for $dependentWorkId" }
487 workSpecDao.setState(WorkInfo.State.ENQUEUED, dependentWorkId)
488 workSpecDao.setLastEnqueueTime(dependentWorkId, currentTimeMillis)
489 }
490 }
491 return false
492 }
493
494 private fun createWorkDescription(tags: List<String>) =
495 "Work [ id=$workSpecId, tags={ ${tags.joinToString(",")} } ]"
496
497 /** Builder class for [WorkerWrapper] */
498 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
499 class Builder
500 @SuppressLint("LambdaLast")
501 constructor(
502 context: Context,
503 val configuration: Configuration,
504 val workTaskExecutor: TaskExecutor,
505 val foregroundProcessor: ForegroundProcessor,
506 val workDatabase: WorkDatabase,
507 val workSpec: WorkSpec,
508 val tags: List<String>
509 ) {
510 val appContext: Context = context.applicationContext
511 var worker: ListenableWorker? = null
512 var runtimeExtras = WorkerParameters.RuntimeExtras()
513
514 /**
515 * @param runtimeExtras The [WorkerParameters.RuntimeExtras] for the worker; if this is
516 * `null`, it will be ignored and the default value will be retained.
517 * @return The instance of [Builder] for chaining.
518 */
519 fun withRuntimeExtras(runtimeExtras: WorkerParameters.RuntimeExtras?): Builder {
520 if (runtimeExtras != null) {
521 this.runtimeExtras = runtimeExtras
522 }
523 return this
524 }
525
526 /**
527 * @param worker The instance of [ListenableWorker] to be executed by [WorkerWrapper].
528 * Useful in the context of testing.
529 * @return The instance of [Builder] for chaining.
530 */
531 @VisibleForTesting
532 fun withWorker(worker: ListenableWorker): Builder {
533 this.worker = worker
534 return this
535 }
536
537 /** @return The instance of [WorkerWrapper]. */
538 fun build(): WorkerWrapper {
539 return WorkerWrapper(this)
540 }
541 }
542 }
543
544 private val TAG = Logger.tagWithPrefix("WorkerWrapper")
545
546 // copy of await() function but with specific cancellation propagation.
547 // it is needed that we specifically want to call .stop() on worker itself before
548 // calling cancel() of the future.
549 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
awaitWithinnull550 suspend fun <T> ListenableFuture<T>.awaitWithin(worker: ListenableWorker): T {
551 try {
552 if (isDone) return getUninterruptibly(this)
553 } catch (e: ExecutionException) {
554 // ExecutionException is the only kind of exception that can be thrown from a gotten
555 // Future, other than CancellationException. Cancellation is propagated upward so that
556 // the coroutine running this suspend function may process it.
557 // Any other Exception showing up here indicates a very fundamental bug in a
558 // Future implementation.
559 throw e.nonNullCause()
560 }
561
562 return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
563 addListener(ToContinuation(this, cont), DirectExecutor.INSTANCE)
564 cont.invokeOnCancellation {
565 if (it is WorkerStoppedException) {
566 worker.stop(it.reason)
567 }
568 cancel(false)
569 }
570 }
571 }
572
573 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
574 class WorkerStoppedException(val reason: Int) : CancellationException()
575
576 private class ToContinuation<T>(
577 val futureToObserve: ListenableFuture<T>,
578 val continuation: CancellableContinuation<T>
579 ) : Runnable {
runnull580 override fun run() {
581 if (futureToObserve.isCancelled) {
582 continuation.cancel()
583 } else {
584 try {
585 continuation.resumeWith(Result.success(getUninterruptibly(futureToObserve)))
586 } catch (e: ExecutionException) {
587 // ExecutionException is the only kind of exception that can be thrown from a gotten
588 // Future. Anything else showing up here indicates a very fundamental bug in a
589 // Future implementation.
590 continuation.resumeWithException(e.nonNullCause())
591 }
592 }
593 }
594 }
595
getUninterruptiblynull596 private fun <V> getUninterruptibly(future: Future<V>): V {
597 var interrupted = false
598 try {
599 while (true) {
600 try {
601 return future.get()
602 } catch (e: InterruptedException) {
603 interrupted = true
604 }
605 }
606 } finally {
607 if (interrupted) {
608 Thread.currentThread().interrupt()
609 }
610 }
611 }
612
ExecutionExceptionnull613 private fun ExecutionException.nonNullCause(): Throwable {
614 return this.cause!!
615 }
616