1 /* 2 * Copyright (C) 2022 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.customaudience; 18 19 import static android.adservices.common.AdServicesStatusUtils.STATUS_SUCCESS; 20 21 import static com.android.adservices.service.stats.AdServicesLoggerUtil.FIELD_UNSET; 22 23 import android.annotation.NonNull; 24 import android.content.Context; 25 import android.content.Intent; 26 27 import com.android.adservices.LoggerFactory; 28 import com.android.adservices.concurrency.AdServicesExecutors; 29 import com.android.adservices.data.adselection.AppInstallDao; 30 import com.android.adservices.data.adselection.SharedStorageDatabase; 31 import com.android.adservices.data.customaudience.CustomAudienceDao; 32 import com.android.adservices.data.customaudience.CustomAudienceDatabase; 33 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData; 34 import com.android.adservices.data.enrollment.EnrollmentDao; 35 import com.android.adservices.service.DebugFlags; 36 import com.android.adservices.service.Flags; 37 import com.android.adservices.service.FlagsFactory; 38 import com.android.adservices.service.common.SingletonRunner; 39 import com.android.adservices.service.stats.AdServicesLoggerUtil; 40 import com.android.adservices.service.stats.BackgroundFetchExecutionLogger; 41 import com.android.adservices.service.stats.CustomAudienceLoggerFactory; 42 import com.android.adservices.shared.common.ApplicationContextSingleton; 43 import com.android.internal.annotations.VisibleForTesting; 44 45 import com.google.common.collect.ImmutableList; 46 import com.google.common.collect.Lists; 47 import com.google.common.util.concurrent.ExecutionSequencer; 48 import com.google.common.util.concurrent.FluentFuture; 49 import com.google.common.util.concurrent.FutureCallback; 50 import com.google.common.util.concurrent.Futures; 51 import com.google.common.util.concurrent.ListenableFuture; 52 import com.google.errorprone.annotations.concurrent.GuardedBy; 53 54 import java.time.Clock; 55 import java.time.Instant; 56 import java.util.ArrayList; 57 import java.util.List; 58 import java.util.Objects; 59 import java.util.concurrent.TimeUnit; 60 import java.util.function.Supplier; 61 62 /** Worker instance for updating custom audiences in the background. */ 63 public final class BackgroundFetchWorker { 64 private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger(); 65 public static final String JOB_DESCRIPTION = "FLEDGE background fetch"; 66 private static final Object SINGLETON_LOCK = new Object(); 67 private static final String ACTION_BACKGROUND_FETCH_JOB_FINISHED = 68 "ACTION_BACKGROUND_FETCH_JOB_FINISHED"; 69 70 @GuardedBy("SINGLETON_LOCK") 71 private static volatile BackgroundFetchWorker sBackgroundFetchWorker; 72 73 private final CustomAudienceDao mCustomAudienceDao; 74 private final Flags mFlags; 75 private final BackgroundFetchRunner mBackgroundFetchRunner; 76 private final Clock mClock; 77 private final CustomAudienceLoggerFactory mCustomAudienceLoggerFactory; 78 private final SingletonRunner<Void> mSingletonRunner = 79 new SingletonRunner<>(JOB_DESCRIPTION, this::doRun); 80 81 @VisibleForTesting BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock, @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory)82 protected BackgroundFetchWorker( 83 @NonNull CustomAudienceDao customAudienceDao, 84 @NonNull Flags flags, 85 @NonNull BackgroundFetchRunner backgroundFetchRunner, 86 @NonNull Clock clock, 87 @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory) { 88 Objects.requireNonNull(customAudienceDao); 89 Objects.requireNonNull(flags); 90 Objects.requireNonNull(backgroundFetchRunner); 91 Objects.requireNonNull(clock); 92 Objects.requireNonNull(customAudienceLoggerFactory); 93 mCustomAudienceDao = customAudienceDao; 94 mFlags = flags; 95 mBackgroundFetchRunner = backgroundFetchRunner; 96 mClock = clock; 97 mCustomAudienceLoggerFactory = customAudienceLoggerFactory; 98 } 99 100 /** 101 * Gets an instance of a {@link BackgroundFetchWorker}. 102 * 103 * <p>If an instance hasn't been initialized, a new singleton will be created and returned. 104 */ getInstance()105 public static BackgroundFetchWorker getInstance() { 106 if (sBackgroundFetchWorker == null) { 107 synchronized (SINGLETON_LOCK) { 108 if (sBackgroundFetchWorker == null) { 109 Context context = ApplicationContextSingleton.get(); 110 CustomAudienceDao customAudienceDao = 111 CustomAudienceDatabase.getInstance().customAudienceDao(); 112 AppInstallDao appInstallDao = 113 SharedStorageDatabase.getInstance().appInstallDao(); 114 CustomAudienceLoggerFactory customAudienceLoggerFactory = 115 CustomAudienceLoggerFactory.getInstance(); 116 Flags flags = FlagsFactory.getFlags(); 117 sBackgroundFetchWorker = 118 new BackgroundFetchWorker( 119 customAudienceDao, 120 flags, 121 new BackgroundFetchRunner( 122 customAudienceDao, 123 appInstallDao, 124 context.getPackageManager(), 125 EnrollmentDao.getInstance(), 126 flags, 127 customAudienceLoggerFactory), 128 Clock.systemUTC(), 129 customAudienceLoggerFactory); 130 } 131 } 132 } 133 134 return sBackgroundFetchWorker; 135 } 136 137 /** 138 * Runs the background fetch job for FLEDGE, including garbage collection and updating custom 139 * audiences. 140 * 141 * @return A future to be used to check when the task has completed. 142 */ runBackgroundFetch()143 public FluentFuture<Void> runBackgroundFetch() { 144 sLogger.d("Starting %s", JOB_DESCRIPTION); 145 return mSingletonRunner.runSingleInstance(); 146 } 147 148 /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */ stopWork()149 public void stopWork() { 150 mSingletonRunner.stopWork(); 151 } 152 doRun(@onNull Supplier<Boolean> shouldStop)153 private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) { 154 Instant jobStartTime = mClock.instant(); 155 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger = 156 mCustomAudienceLoggerFactory.getBackgroundFetchExecutionLogger(); 157 FluentFuture<Void> run = 158 cleanupFledgeData(jobStartTime, backgroundFetchExecutionLogger) 159 .transform( 160 ignored -> getFetchDataList(shouldStop, jobStartTime), 161 AdServicesExecutors.getBackgroundExecutor()) 162 .transformAsync( 163 fetchDataList -> 164 updateData( 165 fetchDataList, 166 shouldStop, 167 jobStartTime, 168 backgroundFetchExecutionLogger), 169 AdServicesExecutors.getBackgroundExecutor()) 170 .withTimeout( 171 mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(), 172 TimeUnit.MILLISECONDS, 173 AdServicesExecutors.getScheduler()); 174 175 run.addCallback( 176 getCloseBackgroundFetchExecutionLoggerCallback(backgroundFetchExecutionLogger), 177 AdServicesExecutors.getBackgroundExecutor()); 178 179 return run; 180 } 181 sendBroadcastIntentIfEnabled()182 private void sendBroadcastIntentIfEnabled() { 183 if (DebugFlags.getInstance().getFledgeBackgroundFetchCompleteBroadcastEnabled()) { 184 Context context = ApplicationContextSingleton.get(); 185 sLogger.d( 186 "Sending a broadcast intent with intent action: %s", 187 ACTION_BACKGROUND_FETCH_JOB_FINISHED); 188 context.sendBroadcast(new Intent(ACTION_BACKGROUND_FETCH_JOB_FINISHED)); 189 } 190 } 191 updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime, @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)192 private ListenableFuture<Void> updateData( 193 @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, 194 @NonNull Supplier<Boolean> shouldStop, 195 @NonNull Instant jobStartTime, 196 @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 197 if (fetchDataList.isEmpty()) { 198 sLogger.d("No custom audiences found to update"); 199 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(0); 200 return FluentFuture.from(Futures.immediateVoidFuture()); 201 } 202 203 sLogger.d("Updating %d custom audiences", fetchDataList.size()); 204 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(fetchDataList.size()); 205 // Divide the gathered CAs among worker threads 206 int numWorkers = 207 Math.min( 208 Math.max(1, Runtime.getRuntime().availableProcessors() - 2), 209 mFlags.getFledgeBackgroundFetchThreadPoolSize()); 210 int numCustomAudiencesPerWorker = 211 (fetchDataList.size() / numWorkers) 212 + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1); 213 214 List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>(); 215 for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList : 216 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) { 217 if (shouldStop.get()) { 218 break; 219 } 220 // Updates in each batch are sequenced 221 ExecutionSequencer sequencer = ExecutionSequencer.create(); 222 for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) { 223 subListFutureUpdates.add( 224 sequencer.submitAsync( 225 () -> 226 mBackgroundFetchRunner.updateCustomAudience( 227 jobStartTime, fetchData), 228 AdServicesExecutors.getBackgroundExecutor())); 229 } 230 } 231 232 return FluentFuture.from(Futures.allAsList(subListFutureUpdates)) 233 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor()); 234 } 235 getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)236 private List<DBCustomAudienceBackgroundFetchData> getFetchDataList( 237 @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) { 238 if (shouldStop.get()) { 239 sLogger.d("Stopping " + JOB_DESCRIPTION); 240 return ImmutableList.of(); 241 } 242 243 // Fetch stale/eligible/delinquent custom audiences 244 return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData( 245 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated()); 246 } 247 cleanupFledgeData( Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)248 private FluentFuture<?> cleanupFledgeData( 249 Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 250 return FluentFuture.from( 251 AdServicesExecutors.getBackgroundExecutor() 252 .submit( 253 () -> { 254 // Start background fetch execution logger. 255 backgroundFetchExecutionLogger.start(); 256 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs( 257 FIELD_UNSET); 258 // Clean up custom audiences first so the actual fetch won't do 259 // unnecessary work 260 mBackgroundFetchRunner.deleteExpiredCustomAudiences( 261 jobStartTime); 262 mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences(); 263 mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences(); 264 if (mFlags.getFledgeAppInstallFilteringEnabled()) { 265 mBackgroundFetchRunner 266 .deleteDisallowedPackageAppInstallEntries(); 267 } 268 })); 269 } 270 getCloseBackgroundFetchExecutionLoggerCallback( BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)271 private FutureCallback<Void> getCloseBackgroundFetchExecutionLoggerCallback( 272 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 273 return new FutureCallback<>() { 274 @Override 275 public void onSuccess(Void result) { 276 closeBackgroundFetchExecutionLogger( 277 backgroundFetchExecutionLogger, 278 backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(), 279 STATUS_SUCCESS); 280 sendBroadcastIntentIfEnabled(); 281 } 282 283 @Override 284 public void onFailure(Throwable t) { 285 sLogger.d(t, "Error in Custom Audience Background Fetch"); 286 int resultCode = AdServicesLoggerUtil.getResultCodeFromException(t); 287 closeBackgroundFetchExecutionLogger( 288 backgroundFetchExecutionLogger, 289 backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(), 290 resultCode); 291 sendBroadcastIntentIfEnabled(); 292 } 293 }; 294 } 295 296 private void closeBackgroundFetchExecutionLogger( 297 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger, 298 int numOfEligibleToUpdateCAs, 299 int resultCode) { 300 try { 301 backgroundFetchExecutionLogger.close(numOfEligibleToUpdateCAs, resultCode); 302 } catch (Exception e) { 303 sLogger.d( 304 "Error when closing backgroundFetchExecutionLogger, " 305 + "skipping metrics logging: %s", 306 e.getMessage()); 307 } 308 } 309 } 310