1 /*
<lambda>null2  * Copyright 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package androidx.work.impl
17 
18 import android.annotation.SuppressLint
19 import android.content.Context
20 import androidx.annotation.RestrictTo
21 import androidx.annotation.VisibleForTesting
22 import androidx.work.Clock
23 import androidx.work.Configuration
24 import androidx.work.Data
25 import androidx.work.DirectExecutor
26 import androidx.work.ListenableWorker
27 import androidx.work.ListenableWorker.Result.Failure
28 import androidx.work.Logger
29 import androidx.work.WorkInfo
30 import androidx.work.WorkerExceptionInfo
31 import androidx.work.WorkerParameters
32 import androidx.work.impl.WorkerWrapper.Resolution.ResetWorkerStatus
33 import androidx.work.impl.foreground.ForegroundProcessor
34 import androidx.work.impl.model.DependencyDao
35 import androidx.work.impl.model.WorkGenerationalId
36 import androidx.work.impl.model.WorkSpec
37 import androidx.work.impl.model.WorkSpecDao
38 import androidx.work.impl.model.generationalId
39 import androidx.work.impl.utils.WorkForegroundUpdater
40 import androidx.work.impl.utils.WorkProgressUpdater
41 import androidx.work.impl.utils.safeAccept
42 import androidx.work.impl.utils.taskexecutor.TaskExecutor
43 import androidx.work.impl.utils.workForeground
44 import androidx.work.launchFuture
45 import androidx.work.logd
46 import androidx.work.loge
47 import androidx.work.logi
48 import com.google.common.util.concurrent.ListenableFuture
49 import java.util.UUID
50 import java.util.concurrent.Callable
51 import java.util.concurrent.CancellationException
52 import java.util.concurrent.ExecutionException
53 import java.util.concurrent.Future
54 import kotlin.collections.removeLast as removeLastKt
55 import kotlin.coroutines.coroutineContext
56 import kotlin.coroutines.resumeWithException
57 import kotlinx.coroutines.CancellableContinuation
58 import kotlinx.coroutines.Job
59 import kotlinx.coroutines.asCoroutineDispatcher
60 import kotlinx.coroutines.suspendCancellableCoroutine
61 import kotlinx.coroutines.withContext
62 
63 /**
64  * A runnable that looks up the [WorkSpec] from the database for a given id, instantiates its
65  * Worker, and then calls it.
66  */
67 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
68 class WorkerWrapper internal constructor(builder: Builder) {
    // The work to execute, exactly as handed over by the Builder.
    val workSpec: WorkSpec = builder.workSpec
    private val appContext: Context = builder.appContext
    private val workSpecId: String = workSpec.id
    private val runtimeExtras: WorkerParameters.RuntimeExtras = builder.runtimeExtras

    // Optional pre-built worker from the Builder; when it is null, runWorker() creates the worker
    // through configuration.workerFactory instead.
    private val builderWorker: ListenableWorker? = builder.worker
    private val workTaskExecutor: TaskExecutor = builder.workTaskExecutor

    private val configuration: Configuration = builder.configuration
    private val clock: Clock = configuration.clock
    private val foregroundProcessor: ForegroundProcessor = builder.foregroundProcessor
    private val workDatabase: WorkDatabase = builder.workDatabase
    private val workSpecDao: WorkSpecDao = workDatabase.workSpecDao()
    private val dependencyDao: DependencyDao = workDatabase.dependencyDao()
    private val tags: List<String> = builder.tags
    // Human-readable "id + tags" label used in log messages.
    private val workDescription: String = createWorkDescription(tags)

    // Job that runWorker() executes under (see launch()); interrupt() cancels it.
    private val workerJob = Job()

    /** Generational id computed from [workSpec] on every access. */
    val workGenerationalId: WorkGenerationalId
        get() = workSpec.generationalId()
90 
91     fun launch(): ListenableFuture<Boolean> =
92         launchFuture(workTaskExecutor.taskCoroutineDispatcher + Job()) {
93             val resolution: Resolution =
94                 try {
95                     // we're wrapping runWorker in separate job, so we can always run post
96                     // processing
97                     // without a fear of being cancelled.
98                     withContext(workerJob) { runWorker() }
99                 } catch (workerStoppedException: WorkerStoppedException) {
100                     ResetWorkerStatus(workerStoppedException.reason)
101                 } catch (e: CancellationException) {
102                     // means that worker was self-cancelled, which we treat as failure
103                     Resolution.Failed()
104                 } catch (throwable: Throwable) {
105                     loge(TAG, throwable) { "Unexpected error in WorkerWrapper" }
106                     Resolution.Failed()
107                 }
108             workDatabase.runInTransaction(
109                 Callable {
110                     when (resolution) {
111                         is Resolution.Finished -> onWorkFinished(resolution.result)
112                         is Resolution.Failed -> {
113                             setFailed(resolution.result)
114                             false
115                         }
116                         is ResetWorkerStatus -> resetWorkerStatus(resolution.reason)
117                     }
118                 }
119             )
120         }
121 
122     private sealed class Resolution {
123         class ResetWorkerStatus(val reason: Int = WorkInfo.STOP_REASON_NOT_STOPPED) : Resolution()
124 
125         class Failed(val result: ListenableWorker.Result = Failure()) : Resolution()
126 
127         class Finished(val result: ListenableWorker.Result) : Resolution()
128     }
129 
130     private suspend fun runWorker(): Resolution {
131         val isTracingEnabled = configuration.tracer.isEnabled()
132         val traceTag = workSpec.traceTag
133         if (isTracingEnabled && traceTag != null) {
134             configuration.tracer.beginAsyncSection(
135                 traceTag,
136                 // Use hashCode() instead of a generational id given we want to allow concurrent
137                 // execution of Workers with the same name. Additionally `generation` is already
138                 // a part of the WorkSpec's hashCode.
139                 workSpec.hashCode()
140             )
141         }
142         // Needed for nested transactions, such as when we're in a dependent work request when
143         // using a SynchronousExecutor.
144         val shouldExit =
145             workDatabase.runInTransaction(
146                 Callable {
147                     // Do a quick check to make sure we don't need to bail out in case this work is
148                     // already
149                     // running, finished, or is blocked.
150                     if (workSpec.state !== WorkInfo.State.ENQUEUED) {
151                         logd(TAG) {
152                             "${workSpec.workerClassName} is not in ENQUEUED state. Nothing more to do"
153                         }
154                         return@Callable true
155                     }
156 
157                     // Case 1:
158                     // Ensure that Workers that are backed off are only executed when they are
159                     // supposed to.
160                     // GreedyScheduler can schedule WorkSpecs that have already been backed off
161                     // because
162                     // it is holding on to snapshots of WorkSpecs. So WorkerWrapper needs to
163                     // determine
164                     // if the ListenableWorker is actually eligible to execute at this point in
165                     // time.
166 
167                     // Case 2:
168                     // On API 23, we double scheduler Workers because JobScheduler prefers batching.
169                     // So is the Work is periodic, we only need to execute it once per interval.
170                     // Also potential bugs in the platform may cause a Job to run more than once.
171                     if (workSpec.isPeriodic || workSpec.isBackedOff) {
172                         val now = clock.currentTimeMillis()
173                         if (now < workSpec.calculateNextRunTime()) {
174                             Logger.get()
175                                 .debug(
176                                     TAG,
177                                     "Delaying execution for ${workSpec.workerClassName} because it is " +
178                                         "being executed before schedule.",
179                                 )
180 
181                             // For AlarmManager implementation we need to reschedule this kind  of
182                             // Work.
183                             // This is not a problem for JobScheduler because we will only
184                             // reschedule
185                             // work if JobScheduler is unaware of a jobId.
186                             return@Callable true
187                         }
188                     }
189                     return@Callable false
190                 }
191             )
192 
193         if (shouldExit) return ResetWorkerStatus()
194 
195         // Merge inputs.  This can be potentially expensive code, so this should not be done inside
196         // a database transaction.
197         val input: Data =
198             if (workSpec.isPeriodic) {
199                 workSpec.input
200             } else {
201                 val inputMergerFactory = configuration.inputMergerFactory
202                 val inputMergerClassName = workSpec.inputMergerClassName
203                 val inputMerger =
204                     inputMergerFactory.createInputMergerWithDefaultFallback(inputMergerClassName)
205                 if (inputMerger == null) {
206                     loge(TAG) { "Could not create Input Merger ${workSpec.inputMergerClassName}" }
207                     return Resolution.Failed()
208                 }
209                 val inputs =
210                     listOf(workSpec.input) + workSpecDao.getInputsFromPrerequisites(workSpecId)
211                 inputMerger.merge(inputs)
212             }
213         val params =
214             WorkerParameters(
215                 UUID.fromString(workSpecId),
216                 input,
217                 tags,
218                 runtimeExtras,
219                 workSpec.runAttemptCount,
220                 workSpec.generation,
221                 configuration.executor,
222                 configuration.workerCoroutineContext,
223                 workTaskExecutor,
224                 configuration.workerFactory,
225                 WorkProgressUpdater(workDatabase, workTaskExecutor),
226                 WorkForegroundUpdater(workDatabase, foregroundProcessor, workTaskExecutor)
227             )
228 
229         // Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
230         // in test mode.
231         val worker =
232             builderWorker
233                 ?: try {
234                     configuration.workerFactory.createWorkerWithDefaultFallback(
235                         appContext,
236                         workSpec.workerClassName,
237                         params
238                     )
239                 } catch (e: Throwable) {
240                     loge(TAG) { "Could not create Worker ${workSpec.workerClassName}" }
241 
242                     configuration.workerInitializationExceptionHandler?.safeAccept(
243                         WorkerExceptionInfo(workSpec.workerClassName, params, e),
244                         TAG
245                     )
246                     return Resolution.Failed()
247                 }
248         worker.setUsed()
249         // we specifically use coroutineContext[Job] instead of workerJob
250         // because it will be complete once withContext finishes.
251         // This way if worker has successfully finished and then
252         // interrupt() is called, then it is ignored, because
253         // job is already completed.
254         val job = coroutineContext[Job]!!
255 
256         // worker stopping is complicated process.
257         // Historical behavior that we are trying to preserve is that
258         // worker.onStopped is always called in case of stoppage since the worker is instantiated,
259         // no matter if other methods such as startWork or getForegroundInfoAsync were called.
260         //
261         // Another important behavior is that worker should be marked as stopped before
262         // calling .cancel() on the future returned from the startWork(). So the listeners of this
263         // future could check what was the stop reason via `getStopReason()`, including listeners
264         // that were added with the direct executor.
265         // worker.stop() could be safely called multiple times, (only first one is effective),
266         // and we rely on this property.
267         // The completion listener below is for the cases when
268         // 1. getForegroundInfoAsync / startWork weren't called yet at all
269         // 2. when WorkerWrapper received stop signal when getForegroundInfoAsync() completed
270         //    and startWork() hasn't been called yet.
271         // 3. startWork's future was completed, but job was cancelled before we actually received
272         //    a notification about future's completion. (it is the natural race between stop signal
273         //    and future completion, that we can't avoid. In this case worker will be decided as
274         //    stopped and re-enqueued for another attempt)
275         job.invokeOnCompletion {
276             if (it is WorkerStoppedException) {
277                 worker.stop(it.reason)
278             }
279             if (isTracingEnabled && traceTag != null) {
280                 configuration.tracer.endAsyncSection(traceTag, workSpec.hashCode())
281             }
282         }
283 
284         // Try to set the work to the running state.  Note that this may fail because another thread
285         // may have modified the DB since we checked last at the top of this function.
286         if (!trySetRunning()) {
287             return ResetWorkerStatus()
288         }
289 
290         if (job.isCancelled) {
291             // doesn't matter job is cancelled anyway
292             return ResetWorkerStatus()
293         }
294 
295         val foregroundUpdater = params.foregroundUpdater
296         val mainDispatcher = workTaskExecutor.getMainThreadExecutor().asCoroutineDispatcher()
297         try {
298             val result =
299                 withContext(mainDispatcher) {
300                     workForeground(
301                         appContext,
302                         workSpec,
303                         worker,
304                         foregroundUpdater,
305                         workTaskExecutor
306                     )
307                     logd(TAG) { "Starting work for ${workSpec.workerClassName}" }
308                     // *important* we can't pass future around suspension points
309                     // because we will lose cancellation, so we have to await
310                     // right here on the main thread.
311                     worker.startWork().awaitWithin(worker)
312                 }
313             return Resolution.Finished(result)
314         } catch (cancellation: CancellationException) {
315             logi(TAG, cancellation) { "$workDescription was cancelled" }
316             throw cancellation
317         } catch (throwable: Throwable) {
318             loge(TAG, throwable) { "$workDescription failed because it threw an exception/error" }
319             configuration.workerExecutionExceptionHandler?.safeAccept(
320                 WorkerExceptionInfo(workSpec.workerClassName, params, throwable),
321                 TAG
322             )
323             return Resolution.Failed()
324         }
325     }
326 
327     private fun onWorkFinished(result: ListenableWorker.Result): Boolean {
328         val state = workSpecDao.getState(workSpecId)
329         workDatabase.workProgressDao().delete(workSpecId)
330         return if (state == null) {
331             // state can be null here with a REPLACE on beginUniqueWork().
332             // Treat it as a failure, and rescheduleAndResolve() will
333             // turn into a no-op. We still need to notify potential observers
334             // holding on to wake locks on our behalf.
335             false
336         } else if (state === WorkInfo.State.RUNNING) {
337             handleResult(result)
338         } else if (!state.isFinished) {
339             // counting this is stopped with unknown reason
340             reschedule(WorkInfo.STOP_REASON_UNKNOWN)
341         } else {
342             false
343         }
344     }
345 
346     @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
347     fun interrupt(stopReason: Int) {
348         workerJob.cancel(WorkerStoppedException(stopReason))
349     }
350 
351     private fun resetWorkerStatus(stopReason: Int): Boolean {
352         return if (workSpec.backOffOnSystemInterruptions == true) {
353             logd(TAG) { "Worker ${workSpec.workerClassName} was interrupted. Backing off." }
354             // Treat it as a reschedule. This ensures that we update the last enqueued time
355             // which in turn ensures that we correctly calculate the next runtime for the
356             // given Worker.
357             reschedule(stopReason)
358             true
359         } else {
360             val state = workSpecDao.getState(workSpecId)
361             return if (state != null && !state.isFinished) {
362                 logd(TAG) {
363                     "Status for $workSpecId is $state; not doing any work and " +
364                         "rescheduling for later execution"
365                 }
366                 // Set state to ENQUEUED again.
367                 // Reset scheduled state so it's picked up by background schedulers again.
368                 // We want to preserve time when work was enqueued so just explicitly set enqueued
369                 // instead using markEnqueuedState. Similarly, don't change any override time.
370                 workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
371                 workSpecDao.setStopReason(workSpecId, stopReason)
372                 workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
373                 true
374             } else {
375                 logd(TAG) { "Status for $workSpecId is $state ; not doing any work" }
376                 false
377             }
378         }
379     }
380 
381     private fun handleResult(result: ListenableWorker.Result?): Boolean {
382         return if (result is ListenableWorker.Result.Success) {
383             logi(TAG) { "Worker result SUCCESS for $workDescription" }
384             if (workSpec.isPeriodic) {
385                 resetPeriodic()
386             } else {
387                 setSucceeded(result)
388             }
389         } else if (result is ListenableWorker.Result.Retry) {
390             logi(TAG) { "Worker result RETRY for $workDescription" }
391             reschedule(WorkInfo.STOP_REASON_NOT_STOPPED)
392         } else {
393             logi(TAG) { "Worker result FAILURE for $workDescription" }
394             if (workSpec.isPeriodic) {
395                 resetPeriodic()
396             } else {
397                 // we have here either failure or null
398                 setFailed(result ?: Failure())
399             }
400         }
401     }
402 
403     private fun trySetRunning(): Boolean =
404         workDatabase.runInTransaction(
405             Callable {
406                 val currentState = workSpecDao.getState(workSpecId)
407                 if (currentState === WorkInfo.State.ENQUEUED) {
408                     workSpecDao.setState(WorkInfo.State.RUNNING, workSpecId)
409                     workSpecDao.incrementWorkSpecRunAttemptCount(workSpecId)
410                     workSpecDao.setStopReason(workSpecId, WorkInfo.STOP_REASON_NOT_STOPPED)
411                     true
412                 } else false
413             }
414         )
415 
416     @VisibleForTesting
417     fun setFailed(result: ListenableWorker.Result): Boolean {
418         iterativelyFailWorkAndDependents(workSpecId)
419         val failure = result as Failure
420         // Update Data as necessary.
421         val output = failure.outputData
422         workSpecDao.resetWorkSpecNextScheduleTimeOverride(
423             workSpecId,
424             workSpec.nextScheduleTimeOverrideGeneration
425         )
426         workSpecDao.setOutput(workSpecId, output)
427         return false
428     }
429 
430     private fun iterativelyFailWorkAndDependents(workSpecId: String) {
431         val idsToProcess = mutableListOf(workSpecId)
432         while (idsToProcess.isNotEmpty()) {
433             val id = idsToProcess.removeLastKt()
434             // Don't fail already cancelled work.
435             if (workSpecDao.getState(id) !== WorkInfo.State.CANCELLED) {
436                 workSpecDao.setState(WorkInfo.State.FAILED, id)
437             }
438             idsToProcess.addAll(dependencyDao.getDependentWorkIds(id))
439         }
440     }
441 
442     private fun reschedule(stopReason: Int): Boolean {
443         workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
444         workSpecDao.setLastEnqueueTime(workSpecId, clock.currentTimeMillis())
445         workSpecDao.resetWorkSpecNextScheduleTimeOverride(
446             workSpecId,
447             workSpec.nextScheduleTimeOverrideGeneration
448         )
449         workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
450         workSpecDao.setStopReason(workSpecId, stopReason)
451         return true
452     }
453 
454     private fun resetPeriodic(): Boolean {
455         // The system clock may have been changed such that the lastEnqueueTime was in the past.
456         // Therefore we always use the current time to determine the next run time of a Worker.
457         // This way, the Schedulers will correctly schedule the next instance of the
458         // PeriodicWork in the future. This happens in calculateNextRunTime() in WorkSpec.
459         workSpecDao.setLastEnqueueTime(workSpecId, clock.currentTimeMillis())
460         workSpecDao.setState(WorkInfo.State.ENQUEUED, workSpecId)
461         workSpecDao.resetWorkSpecRunAttemptCount(workSpecId)
462         workSpecDao.resetWorkSpecNextScheduleTimeOverride(
463             workSpecId,
464             workSpec.nextScheduleTimeOverrideGeneration
465         )
466         workSpecDao.incrementPeriodCount(workSpecId)
467         workSpecDao.markWorkSpecScheduled(workSpecId, WorkSpec.SCHEDULE_NOT_REQUESTED_YET)
468         return false
469     }
470 
471     private fun setSucceeded(result: ListenableWorker.Result): Boolean {
472         workSpecDao.setState(WorkInfo.State.SUCCEEDED, workSpecId)
473         val success = result as ListenableWorker.Result.Success
474         // Update Data as necessary.
475         val output = success.outputData
476         workSpecDao.setOutput(workSpecId, output)
477 
478         // Unblock Dependencies and set Period Start Time
479         val currentTimeMillis = clock.currentTimeMillis()
480         val dependentWorkIds = dependencyDao.getDependentWorkIds(workSpecId)
481         for (dependentWorkId in dependentWorkIds) {
482             if (
483                 workSpecDao.getState(dependentWorkId) === WorkInfo.State.BLOCKED &&
484                     dependencyDao.hasCompletedAllPrerequisites(dependentWorkId)
485             ) {
486                 logi(TAG) { "Setting status to enqueued for $dependentWorkId" }
487                 workSpecDao.setState(WorkInfo.State.ENQUEUED, dependentWorkId)
488                 workSpecDao.setLastEnqueueTime(dependentWorkId, currentTimeMillis)
489             }
490         }
491         return false
492     }
493 
494     private fun createWorkDescription(tags: List<String>) =
495         "Work [ id=$workSpecId, tags={ ${tags.joinToString(",")} } ]"
496 
497     /** Builder class for [WorkerWrapper] */
498     @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
499     class Builder
500     @SuppressLint("LambdaLast")
501     constructor(
502         context: Context,
503         val configuration: Configuration,
504         val workTaskExecutor: TaskExecutor,
505         val foregroundProcessor: ForegroundProcessor,
506         val workDatabase: WorkDatabase,
507         val workSpec: WorkSpec,
508         val tags: List<String>
509     ) {
510         val appContext: Context = context.applicationContext
511         var worker: ListenableWorker? = null
512         var runtimeExtras = WorkerParameters.RuntimeExtras()
513 
514         /**
515          * @param runtimeExtras The [WorkerParameters.RuntimeExtras] for the worker; if this is
516          *   `null`, it will be ignored and the default value will be retained.
517          * @return The instance of [Builder] for chaining.
518          */
519         fun withRuntimeExtras(runtimeExtras: WorkerParameters.RuntimeExtras?): Builder {
520             if (runtimeExtras != null) {
521                 this.runtimeExtras = runtimeExtras
522             }
523             return this
524         }
525 
526         /**
527          * @param worker The instance of [ListenableWorker] to be executed by [WorkerWrapper].
528          *   Useful in the context of testing.
529          * @return The instance of [Builder] for chaining.
530          */
531         @VisibleForTesting
532         fun withWorker(worker: ListenableWorker): Builder {
533             this.worker = worker
534             return this
535         }
536 
537         /** @return The instance of [WorkerWrapper]. */
538         fun build(): WorkerWrapper {
539             return WorkerWrapper(this)
540         }
541     }
542 }
543 
// Log tag used by the log statements in this file.
private val TAG = Logger.tagWithPrefix("WorkerWrapper")
545 
/**
 * Awaits this future, like the regular `await()` for futures, but with worker-specific
 * cancellation propagation: when the awaiting coroutine is cancelled with a
 * [WorkerStoppedException], [worker] is stopped first and only then is the future cancelled.
 *
 * @param worker the worker that produced this future.
 * @return the future's value.
 * @throws Throwable the cause of the future's [ExecutionException] when it completed
 *   exceptionally.
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
suspend fun <T> ListenableFuture<T>.awaitWithin(worker: ListenableWorker): T {
    val future = this

    // Fast path: the future is already complete, no need to suspend.
    if (future.isDone) {
        try {
            return getUninterruptibly(future)
        } catch (failed: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future, other than CancellationException. Cancellation is propagated upward so
            // that the coroutine running this suspend function may process it.
            // Any other Exception showing up here indicates a very fundamental bug in a
            // Future implementation.
            throw failed.nonNullCause()
        }
    }

    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
        future.addListener(ToContinuation(future, continuation), DirectExecutor.INSTANCE)
        continuation.invokeOnCancellation { cause ->
            // Stop the worker before cancelling its future.
            if (cause is WorkerStoppedException) {
                worker.stop(cause.reason)
            }
            future.cancel(false)
        }
    }
}
572 
/**
 * [CancellationException] that carries a stop [reason]. [WorkerWrapper.interrupt] cancels the
 * worker job with it, and the cancellation handlers in this file pass [reason] on to
 * `worker.stop()`.
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
class WorkerStoppedException(val reason: Int) : CancellationException()
575 
/**
 * Future listener that forwards the outcome of [futureToObserve] to [continuation]: cancellation
 * cancels the continuation, a value resumes it, and an [ExecutionException] resumes it with the
 * exception's cause.
 */
private class ToContinuation<T>(
    val futureToObserve: ListenableFuture<T>,
    val continuation: CancellableContinuation<T>
) : Runnable {
    override fun run() {
        if (futureToObserve.isCancelled) {
            continuation.cancel()
            return
        }
        try {
            val value = getUninterruptibly(futureToObserve)
            continuation.resumeWith(Result.success(value))
        } catch (failure: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future. Anything else showing up here indicates a very fundamental bug in a
            // Future implementation.
            continuation.resumeWithException(failure.nonNullCause())
        }
    }
}
595 
/**
 * Calls [Future.get] and retries whenever the calling thread is interrupted while waiting.
 * If any interruption was swallowed, the thread's interrupt flag is set again before returning
 * or throwing.
 *
 * @return the future's value.
 */
private fun <V> getUninterruptibly(future: Future<V>): V {
    var sawInterrupt = false
    try {
        while (true) {
            try {
                return future.get()
            } catch (ignored: InterruptedException) {
                // Remember the interruption and keep waiting.
                sawInterrupt = true
            }
        }
    } finally {
        if (sawInterrupt) Thread.currentThread().interrupt()
    }
}
612 
/** Returns this exception's cause; throws a [NullPointerException] if there is none. */
private fun ExecutionException.nonNullCause(): Throwable = cause!!
616