• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.customaudience;
18 
19 import android.annotation.NonNull;
20 import android.content.Context;
21 
22 import com.android.adservices.LoggerFactory;
23 import com.android.adservices.concurrency.AdServicesExecutors;
24 import com.android.adservices.data.adselection.AppInstallDao;
25 import com.android.adservices.data.adselection.SharedStorageDatabase;
26 import com.android.adservices.data.customaudience.CustomAudienceDao;
27 import com.android.adservices.data.customaudience.CustomAudienceDatabase;
28 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData;
29 import com.android.adservices.data.enrollment.EnrollmentDao;
30 import com.android.adservices.service.Flags;
31 import com.android.adservices.service.FlagsFactory;
32 import com.android.adservices.service.common.SingletonRunner;
33 import com.android.internal.annotations.VisibleForTesting;
34 
35 import com.google.common.collect.ImmutableList;
36 import com.google.common.collect.Lists;
37 import com.google.common.util.concurrent.ExecutionSequencer;
38 import com.google.common.util.concurrent.FluentFuture;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
41 
42 import java.time.Clock;
43 import java.time.Instant;
44 import java.util.ArrayList;
45 import java.util.List;
46 import java.util.Objects;
47 import java.util.concurrent.TimeUnit;
48 import java.util.function.Supplier;
49 
50 /** Worker instance for updating custom audiences in the background. */
51 public class BackgroundFetchWorker {
52     private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger();
53     public static final String JOB_DESCRIPTION = "FLEDGE background fetch";
54     private static final Object SINGLETON_LOCK = new Object();
55     private static volatile BackgroundFetchWorker sBackgroundFetchWorker;
56 
57     private final CustomAudienceDao mCustomAudienceDao;
58     private final Flags mFlags;
59     private final BackgroundFetchRunner mBackgroundFetchRunner;
60     private final Clock mClock;
61     private final SingletonRunner<Void> mSingletonRunner =
62             new SingletonRunner<>(JOB_DESCRIPTION, this::doRun);
63 
64     @VisibleForTesting
BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock)65     protected BackgroundFetchWorker(
66             @NonNull CustomAudienceDao customAudienceDao,
67             @NonNull Flags flags,
68             @NonNull BackgroundFetchRunner backgroundFetchRunner,
69             @NonNull Clock clock) {
70         Objects.requireNonNull(customAudienceDao);
71         Objects.requireNonNull(flags);
72         Objects.requireNonNull(backgroundFetchRunner);
73         Objects.requireNonNull(clock);
74         mCustomAudienceDao = customAudienceDao;
75         mFlags = flags;
76         mBackgroundFetchRunner = backgroundFetchRunner;
77         mClock = clock;
78     }
79 
80     /**
81      * Gets an instance of a {@link BackgroundFetchWorker}.
82      *
83      * <p>If an instance hasn't been initialized, a new singleton will be created and returned.
84      */
85     @NonNull
getInstance(@onNull Context context)86     public static BackgroundFetchWorker getInstance(@NonNull Context context) {
87         Objects.requireNonNull(context);
88 
89         if (sBackgroundFetchWorker == null) {
90             synchronized (SINGLETON_LOCK) {
91                 if (sBackgroundFetchWorker == null) {
92                     CustomAudienceDao customAudienceDao =
93                             CustomAudienceDatabase.getInstance(context).customAudienceDao();
94                     AppInstallDao appInstallDao =
95                             SharedStorageDatabase.getInstance(context).appInstallDao();
96                     Flags flags = FlagsFactory.getFlags();
97                     sBackgroundFetchWorker =
98                             new BackgroundFetchWorker(
99                                     customAudienceDao,
100                                     flags,
101                                     new BackgroundFetchRunner(
102                                             customAudienceDao,
103                                             appInstallDao,
104                                             context.getPackageManager(),
105                                             EnrollmentDao.getInstance(context),
106                                             flags),
107                                     Clock.systemUTC());
108                 }
109             }
110         }
111 
112         return sBackgroundFetchWorker;
113     }
114 
115     /**
116      * Runs the background fetch job for FLEDGE, including garbage collection and updating custom
117      * audiences.
118      *
119      * @return A future to be used to check when the task has completed.
120      */
runBackgroundFetch()121     public FluentFuture<Void> runBackgroundFetch() {
122         sLogger.d("Starting %s", JOB_DESCRIPTION);
123         return mSingletonRunner.runSingleInstance();
124     }
125 
126     /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */
stopWork()127     public void stopWork() {
128         mSingletonRunner.stopWork();
129     }
130 
doRun(@onNull Supplier<Boolean> shouldStop)131     private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) {
132         Instant jobStartTime = mClock.instant();
133         return cleanupFledgeData(jobStartTime)
134                 .transform(
135                         ignored -> getFetchDataList(shouldStop, jobStartTime),
136                         AdServicesExecutors.getBackgroundExecutor())
137                 .transformAsync(
138                         fetchDataList -> updateData(fetchDataList, shouldStop, jobStartTime),
139                         AdServicesExecutors.getBackgroundExecutor())
140                 .withTimeout(
141                         mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(),
142                         TimeUnit.MILLISECONDS,
143                         AdServicesExecutors.getScheduler());
144     }
145 
updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)146     private ListenableFuture<Void> updateData(
147             @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList,
148             @NonNull Supplier<Boolean> shouldStop,
149             @NonNull Instant jobStartTime) {
150         if (fetchDataList.isEmpty()) {
151             sLogger.d("No custom audiences found to update");
152             return FluentFuture.from(Futures.immediateVoidFuture());
153         }
154 
155         sLogger.d("Updating %d custom audiences", fetchDataList.size());
156         // Divide the gathered CAs among worker threads
157         int numWorkers =
158                 Math.min(
159                         Math.max(1, Runtime.getRuntime().availableProcessors() - 2),
160                         mFlags.getFledgeBackgroundFetchThreadPoolSize());
161         int numCustomAudiencesPerWorker =
162                 (fetchDataList.size() / numWorkers)
163                         + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1);
164 
165         List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>();
166         for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList :
167                 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) {
168             if (shouldStop.get()) {
169                 break;
170             }
171             // Updates in each batch are sequenced
172             ExecutionSequencer sequencer = ExecutionSequencer.create();
173             for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) {
174                 subListFutureUpdates.add(
175                         sequencer.submitAsync(
176                                 () ->
177                                         mBackgroundFetchRunner.updateCustomAudience(
178                                                 jobStartTime, fetchData),
179                                 AdServicesExecutors.getBackgroundExecutor()));
180             }
181         }
182 
183         return FluentFuture.from(Futures.allAsList(subListFutureUpdates))
184                 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor());
185     }
186 
getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)187     private List<DBCustomAudienceBackgroundFetchData> getFetchDataList(
188             @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) {
189         if (shouldStop.get()) {
190             sLogger.d("Stopping " + JOB_DESCRIPTION);
191             return ImmutableList.of();
192         }
193 
194         // Fetch stale/eligible/delinquent custom audiences
195         return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData(
196                 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated());
197     }
198 
cleanupFledgeData(Instant jobStartTime)199     private FluentFuture<?> cleanupFledgeData(Instant jobStartTime) {
200         return FluentFuture.from(
201                 AdServicesExecutors.getBackgroundExecutor()
202                         .submit(
203                                 () -> {
204                                     // Clean up custom audiences first so the actual fetch won't do
205                                     // unnecessary work
206                                     mBackgroundFetchRunner.deleteExpiredCustomAudiences(
207                                             jobStartTime);
208                                     mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences();
209                                     mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences();
210                                     if (mFlags.getFledgeAdSelectionFilteringEnabled()) {
211                                         mBackgroundFetchRunner
212                                                 .deleteDisallowedPackageAppInstallEntries();
213                                     }
214                                 }));
215     }
216 }
217