• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.customaudience;
18 
19 import static android.adservices.common.AdServicesStatusUtils.STATUS_SUCCESS;
20 
21 import static com.android.adservices.service.stats.AdServicesLoggerUtil.FIELD_UNSET;
22 
23 import android.annotation.NonNull;
24 import android.content.Context;
25 import android.content.Intent;
26 
27 import com.android.adservices.LoggerFactory;
28 import com.android.adservices.concurrency.AdServicesExecutors;
29 import com.android.adservices.data.adselection.AppInstallDao;
30 import com.android.adservices.data.adselection.SharedStorageDatabase;
31 import com.android.adservices.data.customaudience.CustomAudienceDao;
32 import com.android.adservices.data.customaudience.CustomAudienceDatabase;
33 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData;
34 import com.android.adservices.data.enrollment.EnrollmentDao;
35 import com.android.adservices.service.DebugFlags;
36 import com.android.adservices.service.Flags;
37 import com.android.adservices.service.FlagsFactory;
38 import com.android.adservices.service.common.SingletonRunner;
39 import com.android.adservices.service.stats.AdServicesLoggerUtil;
40 import com.android.adservices.service.stats.BackgroundFetchExecutionLogger;
41 import com.android.adservices.service.stats.CustomAudienceLoggerFactory;
42 import com.android.adservices.shared.common.ApplicationContextSingleton;
43 import com.android.internal.annotations.VisibleForTesting;
44 
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.collect.Lists;
47 import com.google.common.util.concurrent.ExecutionSequencer;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.FutureCallback;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import com.google.errorprone.annotations.concurrent.GuardedBy;
53 
54 import java.time.Clock;
55 import java.time.Instant;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Objects;
59 import java.util.concurrent.TimeUnit;
60 import java.util.function.Supplier;
61 
62 /** Worker instance for updating custom audiences in the background. */
63 public final class BackgroundFetchWorker {
64     private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger();
65     public static final String JOB_DESCRIPTION = "FLEDGE background fetch";
66     private static final Object SINGLETON_LOCK = new Object();
67     private static final String ACTION_BACKGROUND_FETCH_JOB_FINISHED =
68             "ACTION_BACKGROUND_FETCH_JOB_FINISHED";
69 
70     @GuardedBy("SINGLETON_LOCK")
71     private static volatile BackgroundFetchWorker sBackgroundFetchWorker;
72 
73     private final CustomAudienceDao mCustomAudienceDao;
74     private final Flags mFlags;
75     private final BackgroundFetchRunner mBackgroundFetchRunner;
76     private final Clock mClock;
77     private final CustomAudienceLoggerFactory mCustomAudienceLoggerFactory;
78     private final SingletonRunner<Void> mSingletonRunner =
79             new SingletonRunner<>(JOB_DESCRIPTION, this::doRun);
80 
81     @VisibleForTesting
BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock, @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory)82     protected BackgroundFetchWorker(
83             @NonNull CustomAudienceDao customAudienceDao,
84             @NonNull Flags flags,
85             @NonNull BackgroundFetchRunner backgroundFetchRunner,
86             @NonNull Clock clock,
87             @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory) {
88         Objects.requireNonNull(customAudienceDao);
89         Objects.requireNonNull(flags);
90         Objects.requireNonNull(backgroundFetchRunner);
91         Objects.requireNonNull(clock);
92         Objects.requireNonNull(customAudienceLoggerFactory);
93         mCustomAudienceDao = customAudienceDao;
94         mFlags = flags;
95         mBackgroundFetchRunner = backgroundFetchRunner;
96         mClock = clock;
97         mCustomAudienceLoggerFactory = customAudienceLoggerFactory;
98     }
99 
100     /**
101      * Gets an instance of a {@link BackgroundFetchWorker}.
102      *
103      * <p>If an instance hasn't been initialized, a new singleton will be created and returned.
104      */
getInstance()105     public static BackgroundFetchWorker getInstance() {
106         if (sBackgroundFetchWorker == null) {
107             synchronized (SINGLETON_LOCK) {
108                 if (sBackgroundFetchWorker == null) {
109                     Context context = ApplicationContextSingleton.get();
110                     CustomAudienceDao customAudienceDao =
111                             CustomAudienceDatabase.getInstance().customAudienceDao();
112                     AppInstallDao appInstallDao =
113                             SharedStorageDatabase.getInstance().appInstallDao();
114                     CustomAudienceLoggerFactory customAudienceLoggerFactory =
115                             CustomAudienceLoggerFactory.getInstance();
116                     Flags flags = FlagsFactory.getFlags();
117                     sBackgroundFetchWorker =
118                             new BackgroundFetchWorker(
119                                     customAudienceDao,
120                                     flags,
121                                     new BackgroundFetchRunner(
122                                             customAudienceDao,
123                                             appInstallDao,
124                                             context.getPackageManager(),
125                                             EnrollmentDao.getInstance(),
126                                             flags,
127                                             customAudienceLoggerFactory),
128                                     Clock.systemUTC(),
129                                     customAudienceLoggerFactory);
130                 }
131             }
132         }
133 
134         return sBackgroundFetchWorker;
135     }
136 
137     /**
138      * Runs the background fetch job for FLEDGE, including garbage collection and updating custom
139      * audiences.
140      *
141      * @return A future to be used to check when the task has completed.
142      */
runBackgroundFetch()143     public FluentFuture<Void> runBackgroundFetch() {
144         sLogger.d("Starting %s", JOB_DESCRIPTION);
145         return mSingletonRunner.runSingleInstance();
146     }
147 
148     /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */
stopWork()149     public void stopWork() {
150         mSingletonRunner.stopWork();
151     }
152 
doRun(@onNull Supplier<Boolean> shouldStop)153     private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) {
154         Instant jobStartTime = mClock.instant();
155         BackgroundFetchExecutionLogger backgroundFetchExecutionLogger =
156                 mCustomAudienceLoggerFactory.getBackgroundFetchExecutionLogger();
157         FluentFuture<Void> run =
158                 cleanupFledgeData(jobStartTime, backgroundFetchExecutionLogger)
159                         .transform(
160                                 ignored -> getFetchDataList(shouldStop, jobStartTime),
161                                 AdServicesExecutors.getBackgroundExecutor())
162                         .transformAsync(
163                                 fetchDataList ->
164                                         updateData(
165                                                 fetchDataList,
166                                                 shouldStop,
167                                                 jobStartTime,
168                                                 backgroundFetchExecutionLogger),
169                                 AdServicesExecutors.getBackgroundExecutor())
170                         .withTimeout(
171                                 mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(),
172                                 TimeUnit.MILLISECONDS,
173                                 AdServicesExecutors.getScheduler());
174 
175         run.addCallback(
176                 getCloseBackgroundFetchExecutionLoggerCallback(backgroundFetchExecutionLogger),
177                 AdServicesExecutors.getBackgroundExecutor());
178 
179         return run;
180     }
181 
sendBroadcastIntentIfEnabled()182     private void sendBroadcastIntentIfEnabled() {
183         if (DebugFlags.getInstance().getFledgeBackgroundFetchCompleteBroadcastEnabled()) {
184             Context context = ApplicationContextSingleton.get();
185             sLogger.d(
186                     "Sending a broadcast intent with intent action: %s",
187                     ACTION_BACKGROUND_FETCH_JOB_FINISHED);
188             context.sendBroadcast(new Intent(ACTION_BACKGROUND_FETCH_JOB_FINISHED));
189         }
190     }
191 
updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime, @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)192     private ListenableFuture<Void> updateData(
193             @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList,
194             @NonNull Supplier<Boolean> shouldStop,
195             @NonNull Instant jobStartTime,
196             @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
197         if (fetchDataList.isEmpty()) {
198             sLogger.d("No custom audiences found to update");
199             backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(0);
200             return FluentFuture.from(Futures.immediateVoidFuture());
201         }
202 
203         sLogger.d("Updating %d custom audiences", fetchDataList.size());
204         backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(fetchDataList.size());
205         // Divide the gathered CAs among worker threads
206         int numWorkers =
207                 Math.min(
208                         Math.max(1, Runtime.getRuntime().availableProcessors() - 2),
209                         mFlags.getFledgeBackgroundFetchThreadPoolSize());
210         int numCustomAudiencesPerWorker =
211                 (fetchDataList.size() / numWorkers)
212                         + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1);
213 
214         List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>();
215         for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList :
216                 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) {
217             if (shouldStop.get()) {
218                 break;
219             }
220             // Updates in each batch are sequenced
221             ExecutionSequencer sequencer = ExecutionSequencer.create();
222             for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) {
223                 subListFutureUpdates.add(
224                         sequencer.submitAsync(
225                                 () ->
226                                         mBackgroundFetchRunner.updateCustomAudience(
227                                                 jobStartTime, fetchData),
228                                 AdServicesExecutors.getBackgroundExecutor()));
229             }
230         }
231 
232         return FluentFuture.from(Futures.allAsList(subListFutureUpdates))
233                 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor());
234     }
235 
getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)236     private List<DBCustomAudienceBackgroundFetchData> getFetchDataList(
237             @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) {
238         if (shouldStop.get()) {
239             sLogger.d("Stopping " + JOB_DESCRIPTION);
240             return ImmutableList.of();
241         }
242 
243         // Fetch stale/eligible/delinquent custom audiences
244         return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData(
245                 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated());
246     }
247 
cleanupFledgeData( Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)248     private FluentFuture<?> cleanupFledgeData(
249             Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
250         return FluentFuture.from(
251                 AdServicesExecutors.getBackgroundExecutor()
252                         .submit(
253                                 () -> {
254                                     // Start background fetch execution logger.
255                                     backgroundFetchExecutionLogger.start();
256                                     backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(
257                                             FIELD_UNSET);
258                                     // Clean up custom audiences first so the actual fetch won't do
259                                     // unnecessary work
260                                     mBackgroundFetchRunner.deleteExpiredCustomAudiences(
261                                             jobStartTime);
262                                     mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences();
263                                     mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences();
264                                     if (mFlags.getFledgeAppInstallFilteringEnabled()) {
265                                         mBackgroundFetchRunner
266                                                 .deleteDisallowedPackageAppInstallEntries();
267                                     }
268                                 }));
269     }
270 
getCloseBackgroundFetchExecutionLoggerCallback( BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)271     private FutureCallback<Void> getCloseBackgroundFetchExecutionLoggerCallback(
272             BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
273         return new FutureCallback<>() {
274             @Override
275             public void onSuccess(Void result) {
276                 closeBackgroundFetchExecutionLogger(
277                         backgroundFetchExecutionLogger,
278                         backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(),
279                         STATUS_SUCCESS);
280                 sendBroadcastIntentIfEnabled();
281             }
282 
283             @Override
284             public void onFailure(Throwable t) {
285                 sLogger.d(t, "Error in Custom Audience Background Fetch");
286                 int resultCode = AdServicesLoggerUtil.getResultCodeFromException(t);
287                 closeBackgroundFetchExecutionLogger(
288                         backgroundFetchExecutionLogger,
289                         backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(),
290                         resultCode);
291                 sendBroadcastIntentIfEnabled();
292             }
293         };
294     }
295 
296     private void closeBackgroundFetchExecutionLogger(
297             BackgroundFetchExecutionLogger backgroundFetchExecutionLogger,
298             int numOfEligibleToUpdateCAs,
299             int resultCode) {
300         try {
301             backgroundFetchExecutionLogger.close(numOfEligibleToUpdateCAs, resultCode);
302         } catch (Exception e) {
303             sLogger.d(
304                     "Error when closing backgroundFetchExecutionLogger, "
305                             + "skipping metrics logging: %s",
306                     e.getMessage());
307         }
308     }
309 }
310