1 /* 2 * Copyright (C) 2022 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package android.adservices.ondevicepersonalization; 18 19 import android.adservices.ondevicepersonalization.aidl.IDataAccessService; 20 import android.adservices.ondevicepersonalization.aidl.IFederatedComputeCallback; 21 import android.adservices.ondevicepersonalization.aidl.IFederatedComputeService; 22 import android.annotation.CallbackExecutor; 23 import android.annotation.FlaggedApi; 24 import android.annotation.NonNull; 25 import android.annotation.WorkerThread; 26 import android.federatedcompute.common.TrainingOptions; 27 import android.os.OutcomeReceiver; 28 import android.os.RemoteException; 29 30 import com.android.adservices.ondevicepersonalization.flags.Flags; 31 import com.android.ondevicepersonalization.internal.util.LoggerFactory; 32 33 import java.util.concurrent.CountDownLatch; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.TimeUnit; 36 37 /** 38 * Handles scheduling federated compute jobs. See {@link 39 * IsolatedService#getFederatedComputeScheduler}. 40 */ 41 public class FederatedComputeScheduler { 42 private static final String TAG = FederatedComputeScheduler.class.getSimpleName(); 43 44 private static final int FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS = 30; 45 private static final LoggerFactory.Logger sLogger = LoggerFactory.getLogger(); 46 47 private final IFederatedComputeService mFcService; 48 private final IDataAccessService mDataAccessService; 49 50 /** @hide */ FederatedComputeScheduler( IFederatedComputeService binder, IDataAccessService dataService)51 public FederatedComputeScheduler( 52 IFederatedComputeService binder, IDataAccessService dataService) { 53 mFcService = binder; 54 mDataAccessService = dataService; 55 } 56 57 /** 58 * Schedules a federated compute job. In {@link IsolatedService#onRequest}, the app can call 59 * {@link IsolatedService#getFederatedComputeScheduler} to pass the scheduler when constructing 60 * the {@link IsolatedWorker}. 61 * 62 * @param params parameters related to job scheduling. 63 * @param input the configuration of the federated computation. It should be consistent with the 64 * federated compute server setup as described in <a 65 * href="https://developers.google.com/privacy-sandbox/protections/on-device-personalization/federated-compute-server"> 66 * Federated Compute Server documentation. </a>. 67 */ 68 @WorkerThread schedule(@onNull Params params, @NonNull FederatedComputeInput input)69 public void schedule(@NonNull Params params, @NonNull FederatedComputeInput input) { 70 if (mFcService == null) { 71 logApiCallStats( 72 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 73 0, 74 Constants.STATUS_INTERNAL_ERROR); 75 throw new IllegalStateException( 76 "FederatedComputeScheduler not available for this instance."); 77 } 78 final long startTimeMillis = System.currentTimeMillis(); 79 TrainingOptions trainingOptions = 80 new TrainingOptions.Builder() 81 .setPopulationName(input.getPopulationName()) 82 .setTrainingInterval(convertTrainingInterval(params.getTrainingInterval())) 83 .build(); 84 85 CountDownLatch latch = new CountDownLatch(1); 86 final int[] err = {0}; 87 int responseCode = Constants.STATUS_INTERNAL_ERROR; 88 try { 89 mFcService.schedule( 90 trainingOptions, 91 new IFederatedComputeCallback.Stub() { 92 @Override 93 public void onSuccess() { 94 latch.countDown(); 95 } 96 97 @Override 98 public void onFailure(int i) { 99 err[0] = i; 100 latch.countDown(); 101 } 102 }); 103 104 boolean countedDown = 105 latch.await(FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS, TimeUnit.SECONDS); 106 107 if (err[0] != 0) { 108 sLogger.e( 109 TAG + " : Internal failure occurred while scheduling job, error code %d", 110 err[0]); 111 responseCode = err[0]; 112 return; 113 } else if (!countedDown) { 114 sLogger.d(TAG + " : timed out waiting for schedule operation to complete."); 115 responseCode = Constants.STATUS_TIMEOUT; 116 return; 117 } 118 responseCode = Constants.STATUS_SUCCESS; 119 } catch (RemoteException | InterruptedException e) { 120 responseCode = Constants.STATUS_REMOTE_EXCEPTION; 121 sLogger.e(TAG + ": Failed to schedule federated compute job", e); 122 throw new IllegalStateException(e); 123 } finally { 124 logApiCallStats( 125 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 126 System.currentTimeMillis() - startTimeMillis, 127 responseCode); 128 } 129 } 130 131 /** 132 * Schedules a federated compute job. In {@link IsolatedService#onRequest}, the app can call 133 * {@link IsolatedService#getFederatedComputeScheduler} to pass the scheduler when constructing 134 * the {@link IsolatedWorker}. 135 * 136 * @param federatedComputeScheduleRequest input parameters related to job scheduling. 137 * @param executor the {@link Executor} on which to invoke the callback. 138 * @param outcomeReceiver This either returns a {@link FederatedComputeScheduleResponse} on 139 * success, or {@link Exception} on failure. The exception type is {@link 140 * OnDevicePersonalizationException} with error code {@link 141 * OnDevicePersonalizationException#ERROR_INVALID_TRAINING_MANIFEST} if the manifest is 142 * missing the federated compute server URL or {@link 143 * OnDevicePersonalizationException#ERROR_SCHEDULE_TRAINING_FAILED} when scheduling fails 144 * for other reasons. 145 */ 146 @WorkerThread 147 @FlaggedApi(Flags.FLAG_FCP_SCHEDULE_WITH_OUTCOME_RECEIVER_ENABLED) schedule( @onNull FederatedComputeScheduleRequest federatedComputeScheduleRequest, @NonNull @CallbackExecutor Executor executor, @NonNull OutcomeReceiver<FederatedComputeScheduleResponse, Exception> outcomeReceiver)148 public void schedule( 149 @NonNull FederatedComputeScheduleRequest federatedComputeScheduleRequest, 150 @NonNull @CallbackExecutor Executor executor, 151 @NonNull OutcomeReceiver<FederatedComputeScheduleResponse, Exception> outcomeReceiver) { 152 if (mFcService == null) { 153 logApiCallStats( 154 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 155 0, 156 Constants.STATUS_INTERNAL_ERROR); 157 executor.execute( 158 () -> { 159 outcomeReceiver.onError(new IllegalStateException( 160 "FederatedComputeScheduler not available for this instance.")); 161 }); 162 } 163 164 final long startTimeMillis = System.currentTimeMillis(); 165 TrainingOptions trainingOptions = 166 new TrainingOptions.Builder() 167 .setPopulationName(federatedComputeScheduleRequest.getPopulationName()) 168 .setTrainingInterval( 169 convertTrainingInterval( 170 federatedComputeScheduleRequest 171 .getParams() 172 .getTrainingInterval())) 173 .build(); 174 try { 175 mFcService.schedule( 176 trainingOptions, 177 new IFederatedComputeCallback.Stub() { 178 @Override 179 public void onSuccess() { 180 logApiCallStats( 181 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 182 System.currentTimeMillis() - startTimeMillis, 183 Constants.STATUS_SUCCESS); 184 executor.execute( 185 () -> { 186 outcomeReceiver.onResult( 187 new FederatedComputeScheduleResponse( 188 federatedComputeScheduleRequest)); 189 }); 190 } 191 192 @Override 193 public void onFailure(int errorCode) { 194 logApiCallStats( 195 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 196 System.currentTimeMillis() - startTimeMillis, 197 errorCode); 198 executor.execute( 199 () -> { 200 outcomeReceiver.onError( 201 new OnDevicePersonalizationException( 202 translateErrorCode(errorCode))); 203 }); 204 } 205 }); 206 } catch (RemoteException e) { 207 sLogger.e(TAG + ": Failed to schedule federated compute job", e); 208 logApiCallStats( 209 Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE, 210 System.currentTimeMillis() - startTimeMillis, 211 Constants.STATUS_REMOTE_EXCEPTION); 212 executor.execute( 213 () -> { 214 outcomeReceiver.onError(e); 215 }); 216 } 217 } 218 219 /** 220 * Translate the failed error code from the {@link IFederatedComputeService} to appropriate API 221 * surface error code. 222 */ translateErrorCode(int i)223 private static int translateErrorCode(int i) { 224 // Returns invalid/missing manifest or general error code to caller. The general error code 225 // includes personalization disable and all other errors populated from FCP service. 226 return i == Constants.STATUS_FCP_MANIFEST_INVALID 227 ? OnDevicePersonalizationException.ERROR_INVALID_TRAINING_MANIFEST 228 : OnDevicePersonalizationException.ERROR_SCHEDULE_TRAINING_FAILED; 229 } 230 231 /** 232 * Cancels a federated compute job with input training params. In {@link 233 * IsolatedService#onRequest}, the app can call {@link 234 * IsolatedService#getFederatedComputeScheduler} to pass scheduler when constructing the {@link 235 * IsolatedWorker}. 236 * 237 * @param input the configuration of the federated compute. It should be consistent with the 238 * federated compute server setup. 239 */ 240 @WorkerThread cancel(@onNull FederatedComputeInput input)241 public void cancel(@NonNull FederatedComputeInput input) { 242 final long startTimeMillis = System.currentTimeMillis(); 243 int responseCode = Constants.STATUS_INTERNAL_ERROR; 244 if (mFcService == null) { 245 logApiCallStats( 246 Constants.API_NAME_FEDERATED_COMPUTE_CANCEL, 247 System.currentTimeMillis() - startTimeMillis, 248 responseCode); 249 throw new IllegalStateException( 250 "FederatedComputeScheduler not available for this instance."); 251 } 252 CountDownLatch latch = new CountDownLatch(1); 253 final int[] err = {0}; 254 try { 255 mFcService.cancel( 256 input.getPopulationName(), 257 new IFederatedComputeCallback.Stub() { 258 @Override 259 public void onSuccess() { 260 latch.countDown(); 261 } 262 263 @Override 264 public void onFailure(int errorCode) { 265 err[0] = errorCode; 266 latch.countDown(); 267 } 268 }); 269 boolean countedDown = 270 latch.await(FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS, TimeUnit.SECONDS); 271 if (err[0] != 0) { 272 sLogger.e("Internal failure occurred while cancelling job, error code %d", err[0]); 273 responseCode = Constants.STATUS_INTERNAL_ERROR; 274 // Fail silently for now. TODO(b/346827691): update schedule/cancel API to return 275 // error status to caller. 276 return; 277 } else if (!countedDown) { 278 sLogger.d(TAG + " : timed out waiting for cancel operation to complete."); 279 responseCode = Constants.STATUS_INTERNAL_ERROR; 280 return; 281 } 282 responseCode = Constants.STATUS_SUCCESS; 283 } catch (RemoteException | InterruptedException e) { 284 sLogger.e(TAG + ": Failed to cancel federated compute job", e); 285 throw new IllegalStateException(e); 286 } finally { 287 logApiCallStats( 288 Constants.API_NAME_FEDERATED_COMPUTE_CANCEL, 289 System.currentTimeMillis() - startTimeMillis, 290 responseCode); 291 } 292 } 293 convertTrainingInterval( TrainingInterval interval)294 private static android.federatedcompute.common.TrainingInterval convertTrainingInterval( 295 TrainingInterval interval) { 296 return new android.federatedcompute.common.TrainingInterval.Builder() 297 .setMinimumIntervalMillis(interval.getMinimumInterval().toMillis()) 298 .setSchedulingMode(convertSchedulingMode(interval)) 299 .build(); 300 } 301 302 private static @android.federatedcompute.common.TrainingInterval.SchedulingMode int convertSchedulingMode(TrainingInterval interval)303 convertSchedulingMode(TrainingInterval interval) { 304 switch (interval.getSchedulingMode()) { 305 case TrainingInterval.SCHEDULING_MODE_ONE_TIME: 306 return android.federatedcompute.common.TrainingInterval.SCHEDULING_MODE_ONE_TIME; 307 case TrainingInterval.SCHEDULING_MODE_RECURRENT: 308 return android.federatedcompute.common.TrainingInterval.SCHEDULING_MODE_RECURRENT; 309 default: 310 throw new IllegalStateException( 311 "Unsupported scheduling mode " + interval.getSchedulingMode()); 312 } 313 } 314 315 /** Helper method to log call stats based on response code. */ logApiCallStats(int apiName, long duration, int responseCode)316 private void logApiCallStats(int apiName, long duration, int responseCode) { 317 try { 318 mDataAccessService.logApiCallStats(apiName, duration, responseCode); 319 } catch (Exception e) { 320 sLogger.d(e, TAG + ": failed to log metrics"); 321 } 322 } 323 324 /** The parameters related to job scheduling. */ 325 public static class Params { 326 /** 327 * If training interval is scheduled for recurrent tasks, the earliest time this task could 328 * start is after the minimum training interval expires. E.g. If the task is set to run 329 * maximum once per day, the first run of this task will be one day after this task is 330 * scheduled. When a one time job is scheduled, the earliest next runtime is calculated 331 * based on federated compute default interval. 332 */ 333 @NonNull private final TrainingInterval mTrainingInterval; 334 Params(@onNull TrainingInterval trainingInterval)335 public Params(@NonNull TrainingInterval trainingInterval) { 336 mTrainingInterval = trainingInterval; 337 } 338 339 @NonNull getTrainingInterval()340 public TrainingInterval getTrainingInterval() { 341 return mTrainingInterval; 342 } 343 } 344 } 345