1 /* 2 * Copyright (C) 2022 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.customaudience; 18 19 import android.annotation.NonNull; 20 import android.content.Context; 21 22 import com.android.adservices.LoggerFactory; 23 import com.android.adservices.concurrency.AdServicesExecutors; 24 import com.android.adservices.data.adselection.AppInstallDao; 25 import com.android.adservices.data.adselection.SharedStorageDatabase; 26 import com.android.adservices.data.customaudience.CustomAudienceDao; 27 import com.android.adservices.data.customaudience.CustomAudienceDatabase; 28 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData; 29 import com.android.adservices.data.enrollment.EnrollmentDao; 30 import com.android.adservices.service.Flags; 31 import com.android.adservices.service.FlagsFactory; 32 import com.android.adservices.service.common.SingletonRunner; 33 import com.android.internal.annotations.VisibleForTesting; 34 35 import com.google.common.collect.ImmutableList; 36 import com.google.common.collect.Lists; 37 import com.google.common.util.concurrent.ExecutionSequencer; 38 import com.google.common.util.concurrent.FluentFuture; 39 import com.google.common.util.concurrent.Futures; 40 import com.google.common.util.concurrent.ListenableFuture; 41 42 import java.time.Clock; 43 import java.time.Instant; 44 import java.util.ArrayList; 45 import java.util.List; 46 import java.util.Objects; 47 import java.util.concurrent.TimeUnit; 48 import java.util.function.Supplier; 49 50 /** Worker instance for updating custom audiences in the background. */ 51 public class BackgroundFetchWorker { 52 private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger(); 53 public static final String JOB_DESCRIPTION = "FLEDGE background fetch"; 54 private static final Object SINGLETON_LOCK = new Object(); 55 private static volatile BackgroundFetchWorker sBackgroundFetchWorker; 56 57 private final CustomAudienceDao mCustomAudienceDao; 58 private final Flags mFlags; 59 private final BackgroundFetchRunner mBackgroundFetchRunner; 60 private final Clock mClock; 61 private final SingletonRunner<Void> mSingletonRunner = 62 new SingletonRunner<>(JOB_DESCRIPTION, this::doRun); 63 64 @VisibleForTesting BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock)65 protected BackgroundFetchWorker( 66 @NonNull CustomAudienceDao customAudienceDao, 67 @NonNull Flags flags, 68 @NonNull BackgroundFetchRunner backgroundFetchRunner, 69 @NonNull Clock clock) { 70 Objects.requireNonNull(customAudienceDao); 71 Objects.requireNonNull(flags); 72 Objects.requireNonNull(backgroundFetchRunner); 73 Objects.requireNonNull(clock); 74 mCustomAudienceDao = customAudienceDao; 75 mFlags = flags; 76 mBackgroundFetchRunner = backgroundFetchRunner; 77 mClock = clock; 78 } 79 80 /** 81 * Gets an instance of a {@link BackgroundFetchWorker}. 82 * 83 * <p>If an instance hasn't been initialized, a new singleton will be created and returned. 84 */ 85 @NonNull getInstance(@onNull Context context)86 public static BackgroundFetchWorker getInstance(@NonNull Context context) { 87 Objects.requireNonNull(context); 88 89 if (sBackgroundFetchWorker == null) { 90 synchronized (SINGLETON_LOCK) { 91 if (sBackgroundFetchWorker == null) { 92 CustomAudienceDao customAudienceDao = 93 CustomAudienceDatabase.getInstance(context).customAudienceDao(); 94 AppInstallDao appInstallDao = 95 SharedStorageDatabase.getInstance(context).appInstallDao(); 96 Flags flags = FlagsFactory.getFlags(); 97 sBackgroundFetchWorker = 98 new BackgroundFetchWorker( 99 customAudienceDao, 100 flags, 101 new BackgroundFetchRunner( 102 customAudienceDao, 103 appInstallDao, 104 context.getPackageManager(), 105 EnrollmentDao.getInstance(context), 106 flags), 107 Clock.systemUTC()); 108 } 109 } 110 } 111 112 return sBackgroundFetchWorker; 113 } 114 115 /** 116 * Runs the background fetch job for FLEDGE, including garbage collection and updating custom 117 * audiences. 118 * 119 * @return A future to be used to check when the task has completed. 120 */ runBackgroundFetch()121 public FluentFuture<Void> runBackgroundFetch() { 122 sLogger.d("Starting %s", JOB_DESCRIPTION); 123 return mSingletonRunner.runSingleInstance(); 124 } 125 126 /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */ stopWork()127 public void stopWork() { 128 mSingletonRunner.stopWork(); 129 } 130 doRun(@onNull Supplier<Boolean> shouldStop)131 private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) { 132 Instant jobStartTime = mClock.instant(); 133 return cleanupFledgeData(jobStartTime) 134 .transform( 135 ignored -> getFetchDataList(shouldStop, jobStartTime), 136 AdServicesExecutors.getBackgroundExecutor()) 137 .transformAsync( 138 fetchDataList -> updateData(fetchDataList, shouldStop, jobStartTime), 139 AdServicesExecutors.getBackgroundExecutor()) 140 .withTimeout( 141 mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(), 142 TimeUnit.MILLISECONDS, 143 AdServicesExecutors.getScheduler()); 144 } 145 updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)146 private ListenableFuture<Void> updateData( 147 @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, 148 @NonNull Supplier<Boolean> shouldStop, 149 @NonNull Instant jobStartTime) { 150 if (fetchDataList.isEmpty()) { 151 sLogger.d("No custom audiences found to update"); 152 return FluentFuture.from(Futures.immediateVoidFuture()); 153 } 154 155 sLogger.d("Updating %d custom audiences", fetchDataList.size()); 156 // Divide the gathered CAs among worker threads 157 int numWorkers = 158 Math.min( 159 Math.max(1, Runtime.getRuntime().availableProcessors() - 2), 160 mFlags.getFledgeBackgroundFetchThreadPoolSize()); 161 int numCustomAudiencesPerWorker = 162 (fetchDataList.size() / numWorkers) 163 + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1); 164 165 List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>(); 166 for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList : 167 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) { 168 if (shouldStop.get()) { 169 break; 170 } 171 // Updates in each batch are sequenced 172 ExecutionSequencer sequencer = ExecutionSequencer.create(); 173 for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) { 174 subListFutureUpdates.add( 175 sequencer.submitAsync( 176 () -> 177 mBackgroundFetchRunner.updateCustomAudience( 178 jobStartTime, fetchData), 179 AdServicesExecutors.getBackgroundExecutor())); 180 } 181 } 182 183 return FluentFuture.from(Futures.allAsList(subListFutureUpdates)) 184 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor()); 185 } 186 getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)187 private List<DBCustomAudienceBackgroundFetchData> getFetchDataList( 188 @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) { 189 if (shouldStop.get()) { 190 sLogger.d("Stopping " + JOB_DESCRIPTION); 191 return ImmutableList.of(); 192 } 193 194 // Fetch stale/eligible/delinquent custom audiences 195 return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData( 196 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated()); 197 } 198 cleanupFledgeData(Instant jobStartTime)199 private FluentFuture<?> cleanupFledgeData(Instant jobStartTime) { 200 return FluentFuture.from( 201 AdServicesExecutors.getBackgroundExecutor() 202 .submit( 203 () -> { 204 // Clean up custom audiences first so the actual fetch won't do 205 // unnecessary work 206 mBackgroundFetchRunner.deleteExpiredCustomAudiences( 207 jobStartTime); 208 mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences(); 209 mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences(); 210 if (mFlags.getFledgeAdSelectionFilteringEnabled()) { 211 mBackgroundFetchRunner 212 .deleteDisallowedPackageAppInstallEntries(); 213 } 214 })); 215 } 216 } 217