1 /* 2 * Copyright (C) 2024 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.measurement.reporting; 18 19 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.AGGREGATE_REPORTING; 20 import static com.android.adservices.service.measurement.util.JobLockHolder.Type.EVENT_REPORTING; 21 import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON; 22 import static com.android.adservices.spe.AdServicesJobInfo.MEASUREMENT_REPORTING_JOB; 23 24 import android.app.job.JobInfo; 25 import android.app.job.JobParameters; 26 import android.app.job.JobScheduler; 27 import android.app.job.JobService; 28 import android.content.ComponentName; 29 import android.content.Context; 30 31 import com.android.adservices.LogUtil; 32 import com.android.adservices.LoggerFactory; 33 import com.android.adservices.concurrency.AdServicesExecutors; 34 import com.android.adservices.data.measurement.DatastoreManager; 35 import com.android.adservices.data.measurement.DatastoreManagerFactory; 36 import com.android.adservices.service.Flags; 37 import com.android.adservices.service.FlagsFactory; 38 import com.android.adservices.service.common.compat.ServiceCompatUtils; 39 import com.android.adservices.service.measurement.KeyValueData; 40 import com.android.adservices.service.measurement.aggregation.AggregateEncryptionKeyManager; 41 import com.android.adservices.service.measurement.util.JobLockHolder; 42 import com.android.adservices.service.stats.AdServicesLoggerImpl; 43 import com.android.adservices.spe.AdServicesJobServiceLogger; 44 import com.android.internal.annotations.VisibleForTesting; 45 46 import com.google.android.libraries.mobiledatadownload.internal.AndroidTimeSource; 47 import com.google.common.util.concurrent.ListeningExecutorService; 48 49 import java.util.Optional; 50 import java.util.concurrent.Future; 51 52 /** 53 * Service for sending event and aggregate reports. Reporting logic contained in {@link 54 * EventReportingJobHandler} and {@link AggregateReportingJobHandler}. 55 * 56 * <p>Bug(b/342687912): This will eventually replace {@link EventReportingJobService} and {@link 57 * AggregateReportingJobService}. 58 */ 59 // TODO(b/311183933): Remove passed in Context from static method. 60 @SuppressWarnings("AvoidStaticContext") 61 public final class ReportingJobService extends JobService { 62 private static final ListeningExecutorService sBlockingExecutor = 63 AdServicesExecutors.getBlockingExecutor(); 64 private Future mExecutorFuture; 65 private static final String JOB_NEXT_EXECUTION_TIME = "job_next_execution_time"; 66 private static final String JOB_LAST_EXECUTION_TIME = "job_last_execution_time"; 67 68 @Override onStartJob(JobParameters params)69 public boolean onStartJob(JobParameters params) { 70 // Always ensure that the first thing this job does is check if it should be running, and 71 // cancel itself if it's not supposed to be. 72 if (ServiceCompatUtils.shouldDisableExtServicesJobOnTPlus(this)) { 73 LogUtil.d( 74 "Disabling %s job because it's running in" + " ExtServices on T+", 75 this.getClass().getSimpleName()); 76 return skipAndCancelBackgroundJob(params, /* skipReason= */ 0, /* doRecord= */ false); 77 } 78 79 AdServicesJobServiceLogger.getInstance() 80 .recordOnStartJob(MEASUREMENT_REPORTING_JOB.getJobId()); 81 82 if (!FlagsFactory.getFlags().getMeasurementReportingJobServiceEnabled()) { 83 LoggerFactory.getMeasurementLogger() 84 .e("%s is disabled", this.getClass().getSimpleName()); 85 return skipAndCancelBackgroundJob( 86 params, 87 AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON, 88 /* doRecord= */ true); 89 } 90 91 LoggerFactory.getMeasurementLogger().d("%s.onStartJob", this.getClass().getSimpleName()); 92 mExecutorFuture = 93 sBlockingExecutor.submit( 94 () -> { 95 try { 96 saveExecutionStartTime(); 97 processPendingAggregateReports(); 98 processPendingEventReports(); 99 100 AdServicesJobServiceLogger.getInstance() 101 .recordJobFinished( 102 MEASUREMENT_REPORTING_JOB.getJobId(), 103 /* isSuccessful= */ true, 104 /* shouldRetry= */ false); 105 jobFinished(params, /* wantsReschedule= */ false); 106 } finally { 107 scheduleIfNeeded( 108 getApplicationContext(), /* forceSchedule= */ false); 109 } 110 }); 111 return true; 112 } 113 114 @Override onStopJob(JobParameters params)115 public boolean onStopJob(JobParameters params) { 116 LoggerFactory.getMeasurementLogger().d("%s.onStopJob", this.getClass().getSimpleName()); 117 boolean shouldRetry = true; 118 if (mExecutorFuture != null) { 119 shouldRetry = mExecutorFuture.cancel(/* mayInterruptIfRunning */ true); 120 } 121 AdServicesJobServiceLogger.getInstance() 122 .recordOnStopJob(params, MEASUREMENT_REPORTING_JOB.getJobId(), shouldRetry); 123 return shouldRetry; 124 } 125 126 /** 127 * Schedule execution of this job service based on pending reports in the database, either 128 * aggregate or event. 129 * 130 * <p>If there are no pending reports, this service will not be scheduled. 131 * 132 * <p>This job will be scheduled to the latest report within the batching window. The batching 133 * window is the window of time between the next earliest report and the window length specified 134 * in the flags. 135 * 136 * <p>Job scheduling will also be throttled by a minimum execution window specified in the 137 * flags. 138 * 139 * @param context application context 140 * @param forceSchedule true if the job is to be scheduled at the next pending report, 141 * disregarding the minimum execution window. If there is no pending report, this job will 142 * not be scheduled. 143 */ scheduleIfNeeded(Context context, boolean forceSchedule)144 public static void scheduleIfNeeded(Context context, boolean forceSchedule) { 145 Flags flags = FlagsFactory.getFlags(); 146 if (!flags.getMeasurementReportingJobServiceEnabled()) { 147 LoggerFactory.getMeasurementLogger() 148 .e("ReportingJobService is disabled, skip scheduling"); 149 return; 150 } 151 152 final JobScheduler jobScheduler = context.getSystemService(JobScheduler.class); 153 if (jobScheduler == null) { 154 LoggerFactory.getMeasurementLogger().e("JobScheduler not found"); 155 return; 156 } 157 158 Optional<Long> latestReportTimeInBatchOpt = getLastReportTimeInBatch(context, flags); 159 if (latestReportTimeInBatchOpt.isEmpty()) { 160 LoggerFactory.getMeasurementLogger() 161 .d("ReportingJobService found no pending reports. Aborting scheduling."); 162 return; 163 } 164 165 long latestReportTimeInBatch = latestReportTimeInBatchOpt.get(); 166 long lastExecution = getLastExecution(context); 167 Long nextScheduledExecution = getNextScheduledExecution(context); 168 long minExecutionWindowEnd = 169 lastExecution + flags.getMeasurementReportingJobServiceMinExecutionWindowMillis(); 170 171 final JobInfo scheduledJob = 172 jobScheduler.getPendingJob(MEASUREMENT_REPORTING_JOB.getJobId()); 173 174 long nextExecutionTime = 175 getNextExecutionTime(forceSchedule, latestReportTimeInBatch, minExecutionWindowEnd); 176 JobInfo jobInfo = buildJobInfo(context, flags, nextExecutionTime); 177 if (forceSchedule 178 || !isNextReportScheduled( 179 scheduledJob, nextScheduledExecution, latestReportTimeInBatch)) { 180 jobScheduler.schedule(jobInfo); 181 saveNextExecution(context, latestReportTimeInBatch); 182 LoggerFactory.getMeasurementLogger().d("Scheduled ReportingJobService"); 183 } 184 } 185 getNextExecutionTime( boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd)186 private static long getNextExecutionTime( 187 boolean forceSchedule, long latestReportTimeInBatch, long minExecutionWindowEnd) { 188 return forceSchedule 189 ? latestReportTimeInBatch 190 : Math.max(minExecutionWindowEnd, latestReportTimeInBatch); 191 } 192 saveNextExecution(Context context, Long latestReportTimeInBatch)193 private static void saveNextExecution(Context context, Long latestReportTimeInBatch) { 194 DatastoreManager datastoreManager = DatastoreManagerFactory.getDatastoreManager(); 195 datastoreManager.runInTransaction(getSaveNextExecutionConsumer(latestReportTimeInBatch)); 196 } 197 getSaveNextExecutionConsumer( Long latestReportTimeInBatch)198 private static DatastoreManager.ThrowingCheckedConsumer getSaveNextExecutionConsumer( 199 Long latestReportTimeInBatch) { 200 return measurementDao -> { 201 KeyValueData nextScheduledExecution = 202 measurementDao.getKeyValueData( 203 JOB_NEXT_EXECUTION_TIME, KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME); 204 nextScheduledExecution.setReportingJobNextExecutionTime(latestReportTimeInBatch); 205 measurementDao.insertOrUpdateKeyValueData(nextScheduledExecution); 206 }; 207 } 208 isNextReportScheduled( JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch)209 private static boolean isNextReportScheduled( 210 JobInfo scheduledJob, Long nextScheduledExecution, long latestReportTimeInBatch) { 211 return scheduledJob != null 212 && nextScheduledExecution != null 213 && nextScheduledExecution == latestReportTimeInBatch; 214 } 215 getNextScheduledExecution(Context context)216 private static Long getNextScheduledExecution(Context context) { 217 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(); 218 219 KeyValueData kvData = 220 dataStoreManager 221 .runInTransactionWithResult( 222 measurementDao -> 223 measurementDao.getKeyValueData( 224 JOB_NEXT_EXECUTION_TIME, 225 KeyValueData.DataType.JOB_NEXT_EXECUTION_TIME)) 226 .orElseThrow(); 227 228 return kvData.getReportingJobNextExecutionTime(); 229 } 230 getLastReportTimeInBatch(Context context, Flags flags)231 private static Optional<Long> getLastReportTimeInBatch(Context context, Flags flags) { 232 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(); 233 234 return dataStoreManager.runInTransactionWithResult( 235 measurementDao -> 236 measurementDao.getLatestReportTimeInBatchWindow( 237 flags.getMeasurementReportingJobServiceBatchWindowMillis())); 238 } 239 saveExecutionStartTime()240 private void saveExecutionStartTime() { 241 DatastoreManager datastoreManager = DatastoreManagerFactory.getDatastoreManager(); 242 datastoreManager.runInTransaction(getSaveExecutionTimeConsumer()); 243 } 244 getSaveExecutionTimeConsumer()245 private DatastoreManager.ThrowingCheckedConsumer getSaveExecutionTimeConsumer() { 246 return measurementDao -> { 247 KeyValueData lastExecution = 248 measurementDao.getKeyValueData( 249 JOB_LAST_EXECUTION_TIME, KeyValueData.DataType.JOB_LAST_EXECUTION_TIME); 250 251 lastExecution.setReportingJobLastExecutionTime(System.currentTimeMillis()); 252 measurementDao.insertOrUpdateKeyValueData(lastExecution); 253 }; 254 } 255 256 private static long getLastExecution(Context context) { 257 DatastoreManager dataStoreManager = DatastoreManagerFactory.getDatastoreManager(); 258 259 KeyValueData lastExecution = 260 dataStoreManager 261 .runInTransactionWithResult( 262 measurementDao -> 263 measurementDao.getKeyValueData( 264 JOB_LAST_EXECUTION_TIME, 265 KeyValueData.DataType.JOB_LAST_EXECUTION_TIME)) 266 .orElseThrow(); 267 268 return lastExecution.getReportingJobLastExecutionTime() != null 269 ? lastExecution.getReportingJobLastExecutionTime() 270 : Long.MIN_VALUE; 271 } 272 273 private boolean skipAndCancelBackgroundJob( 274 final JobParameters params, int skipReason, boolean doRecord) { 275 final JobScheduler jobScheduler = this.getSystemService(JobScheduler.class); 276 if (jobScheduler != null) { 277 jobScheduler.cancel(MEASUREMENT_REPORTING_JOB.getJobId()); 278 saveNextExecution(getApplicationContext(), null); 279 } 280 281 if (doRecord) { 282 AdServicesJobServiceLogger.getInstance() 283 .recordJobSkipped(MEASUREMENT_REPORTING_JOB.getJobId(), skipReason); 284 } 285 286 // Tell the JobScheduler that the job has completed and does not need to be rescheduled. 287 jobFinished(params, false); 288 289 // Returning false means that this job has completed its work. 290 return false; 291 } 292 293 private static JobInfo buildJobInfo(Context context, Flags flags, long nextExecutionTime) { 294 JobInfo.Builder builder = 295 new JobInfo.Builder( 296 MEASUREMENT_REPORTING_JOB.getJobId(), 297 new ComponentName(context, ReportingJobService.class)) 298 .setRequiresBatteryNotLow( 299 flags.getMeasurementReportingJobRequiredBatteryNotLow()) 300 .setRequiredNetworkType( 301 flags.getMeasurementReportingJobRequiredNetworkType()) 302 .setPersisted(flags.getMeasurementReportingJobPersisted()); 303 // nextExecutionTime could potentially be in the past, i.e. for Aggregate Reports with 304 // trigger context ids. Using such a timestamp would result in a negative minimum latency. 305 if (nextExecutionTime > System.currentTimeMillis()) { 306 builder.setMinimumLatency(nextExecutionTime - System.currentTimeMillis()); 307 } 308 309 return builder.build(); 310 } 311 312 @VisibleForTesting 313 void processPendingAggregateReports() { 314 JobLockHolder.getInstance(AGGREGATE_REPORTING) 315 .runWithLock( 316 "ReportingJobService", 317 () -> { 318 long maxAggregateReportUploadRetryWindowMs = 319 FlagsFactory.getFlags() 320 .getMeasurementMaxAggregateReportUploadRetryWindowMs(); 321 DatastoreManager datastoreManager = 322 DatastoreManagerFactory.getDatastoreManager(); 323 new AggregateReportingJobHandler( 324 datastoreManager, 325 new AggregateEncryptionKeyManager( 326 datastoreManager, getApplicationContext()), 327 FlagsFactory.getFlags(), 328 AdServicesLoggerImpl.getInstance(), 329 ReportingStatus.ReportType.AGGREGATE, 330 ReportingStatus.UploadMethod.REGULAR, 331 getApplicationContext(), 332 new AndroidTimeSource()) 333 .performScheduledPendingReportsInWindow( 334 System.currentTimeMillis() 335 - maxAggregateReportUploadRetryWindowMs, 336 System.currentTimeMillis()); 337 }); 338 } 339 340 @VisibleForTesting 341 void processPendingEventReports() { 342 JobLockHolder.getInstance(EVENT_REPORTING) 343 .runWithLock( 344 "ReportingJobService", 345 () -> { 346 long maxEventReportUploadRetryWindowMs = 347 FlagsFactory.getFlags() 348 .getMeasurementMaxEventReportUploadRetryWindowMs(); 349 new EventReportingJobHandler( 350 DatastoreManagerFactory.getDatastoreManager(), 351 FlagsFactory.getFlags(), 352 AdServicesLoggerImpl.getInstance(), 353 ReportingStatus.ReportType.EVENT, 354 ReportingStatus.UploadMethod.REGULAR, 355 getApplicationContext(), 356 new AndroidTimeSource()) 357 .performScheduledPendingReportsInWindow( 358 System.currentTimeMillis() 359 - maxEventReportUploadRetryWindowMs, 360 System.currentTimeMillis()); 361 }); 362 } 363 364 @VisibleForTesting 365 static void schedule(JobScheduler jobScheduler, JobInfo jobInfo) { 366 jobScheduler.schedule(jobInfo); 367 } 368 369 @VisibleForTesting 370 Future getFutureForTesting() { 371 return mExecutorFuture; 372 } 373 } 374