• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package android.adservices.ondevicepersonalization;
18 
19 import android.adservices.ondevicepersonalization.aidl.IDataAccessService;
20 import android.adservices.ondevicepersonalization.aidl.IFederatedComputeCallback;
21 import android.adservices.ondevicepersonalization.aidl.IFederatedComputeService;
22 import android.annotation.CallbackExecutor;
23 import android.annotation.FlaggedApi;
24 import android.annotation.NonNull;
25 import android.annotation.WorkerThread;
26 import android.federatedcompute.common.TrainingOptions;
27 import android.os.OutcomeReceiver;
28 import android.os.RemoteException;
29 
30 import com.android.adservices.ondevicepersonalization.flags.Flags;
31 import com.android.ondevicepersonalization.internal.util.LoggerFactory;
32 
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 
37 /**
38  * Handles scheduling federated compute jobs. See {@link
39  * IsolatedService#getFederatedComputeScheduler}.
40  */
41 public class FederatedComputeScheduler {
42     private static final String TAG = FederatedComputeScheduler.class.getSimpleName();
43 
44     private static final int FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS = 30;
45     private static final LoggerFactory.Logger sLogger = LoggerFactory.getLogger();
46 
47     private final IFederatedComputeService mFcService;
48     private final IDataAccessService mDataAccessService;
49 
50     /** @hide */
FederatedComputeScheduler( IFederatedComputeService binder, IDataAccessService dataService)51     public FederatedComputeScheduler(
52             IFederatedComputeService binder, IDataAccessService dataService) {
53         mFcService = binder;
54         mDataAccessService = dataService;
55     }
56 
57     /**
58      * Schedules a federated compute job. In {@link IsolatedService#onRequest}, the app can call
59      * {@link IsolatedService#getFederatedComputeScheduler} to pass the scheduler when constructing
60      * the {@link IsolatedWorker}.
61      *
62      * @param params parameters related to job scheduling.
63      * @param input the configuration of the federated computation. It should be consistent with the
64      *     federated compute server setup as described in <a
65      *     href="https://developers.google.com/privacy-sandbox/protections/on-device-personalization/federated-compute-server">
66      *     Federated Compute Server documentation. </a>.
67      */
68     @WorkerThread
schedule(@onNull Params params, @NonNull FederatedComputeInput input)69     public void schedule(@NonNull Params params, @NonNull FederatedComputeInput input) {
70         if (mFcService == null) {
71             logApiCallStats(
72                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
73                     0,
74                     Constants.STATUS_INTERNAL_ERROR);
75             throw new IllegalStateException(
76                     "FederatedComputeScheduler not available for this instance.");
77         }
78         final long startTimeMillis = System.currentTimeMillis();
79         TrainingOptions trainingOptions =
80                 new TrainingOptions.Builder()
81                         .setPopulationName(input.getPopulationName())
82                         .setTrainingInterval(convertTrainingInterval(params.getTrainingInterval()))
83                         .build();
84 
85         CountDownLatch latch = new CountDownLatch(1);
86         final int[] err = {0};
87         int responseCode = Constants.STATUS_INTERNAL_ERROR;
88         try {
89             mFcService.schedule(
90                     trainingOptions,
91                     new IFederatedComputeCallback.Stub() {
92                         @Override
93                         public void onSuccess() {
94                             latch.countDown();
95                         }
96 
97                         @Override
98                         public void onFailure(int i) {
99                             err[0] = i;
100                             latch.countDown();
101                         }
102                     });
103 
104             boolean countedDown =
105                     latch.await(FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
106 
107             if (err[0] != 0) {
108                 sLogger.e(
109                         TAG + " : Internal failure occurred while scheduling job, error code %d",
110                         err[0]);
111                 responseCode = err[0];
112                 return;
113             } else if (!countedDown) {
114                 sLogger.d(TAG + " : timed out waiting for schedule operation to complete.");
115                 responseCode = Constants.STATUS_TIMEOUT;
116                 return;
117             }
118             responseCode = Constants.STATUS_SUCCESS;
119         } catch (RemoteException | InterruptedException e) {
120             responseCode = Constants.STATUS_REMOTE_EXCEPTION;
121             sLogger.e(TAG + ": Failed to schedule federated compute job", e);
122             throw new IllegalStateException(e);
123         } finally {
124             logApiCallStats(
125                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
126                     System.currentTimeMillis() - startTimeMillis,
127                     responseCode);
128         }
129     }
130 
131     /**
132      * Schedules a federated compute job. In {@link IsolatedService#onRequest}, the app can call
133      * {@link IsolatedService#getFederatedComputeScheduler} to pass the scheduler when constructing
134      * the {@link IsolatedWorker}.
135      *
136      * @param federatedComputeScheduleRequest input parameters related to job scheduling.
137      * @param executor the {@link Executor} on which to invoke the callback.
138      * @param outcomeReceiver This either returns a {@link FederatedComputeScheduleResponse} on
139      *     success, or {@link Exception} on failure. The exception type is {@link
140      *     OnDevicePersonalizationException} with error code {@link
141      *     OnDevicePersonalizationException#ERROR_INVALID_TRAINING_MANIFEST} if the manifest is
142      *     missing the federated compute server URL or {@link
143      *     OnDevicePersonalizationException#ERROR_SCHEDULE_TRAINING_FAILED} when scheduling fails
144      *     for other reasons.
145      */
146     @WorkerThread
147     @FlaggedApi(Flags.FLAG_FCP_SCHEDULE_WITH_OUTCOME_RECEIVER_ENABLED)
schedule( @onNull FederatedComputeScheduleRequest federatedComputeScheduleRequest, @NonNull @CallbackExecutor Executor executor, @NonNull OutcomeReceiver<FederatedComputeScheduleResponse, Exception> outcomeReceiver)148     public void schedule(
149             @NonNull FederatedComputeScheduleRequest federatedComputeScheduleRequest,
150             @NonNull @CallbackExecutor Executor executor,
151             @NonNull OutcomeReceiver<FederatedComputeScheduleResponse, Exception> outcomeReceiver) {
152         if (mFcService == null) {
153             logApiCallStats(
154                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
155                     0,
156                     Constants.STATUS_INTERNAL_ERROR);
157             executor.execute(
158                     () -> {
159                         outcomeReceiver.onError(new IllegalStateException(
160                                 "FederatedComputeScheduler not available for this instance."));
161                     });
162         }
163 
164         final long startTimeMillis = System.currentTimeMillis();
165         TrainingOptions trainingOptions =
166                 new TrainingOptions.Builder()
167                         .setPopulationName(federatedComputeScheduleRequest.getPopulationName())
168                         .setTrainingInterval(
169                                 convertTrainingInterval(
170                                         federatedComputeScheduleRequest
171                                                 .getParams()
172                                                 .getTrainingInterval()))
173                         .build();
174         try {
175             mFcService.schedule(
176                     trainingOptions,
177                     new IFederatedComputeCallback.Stub() {
178                         @Override
179                         public void onSuccess() {
180                             logApiCallStats(
181                                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
182                                     System.currentTimeMillis() - startTimeMillis,
183                                     Constants.STATUS_SUCCESS);
184                             executor.execute(
185                                     () -> {
186                                         outcomeReceiver.onResult(
187                                                 new FederatedComputeScheduleResponse(
188                                                         federatedComputeScheduleRequest));
189                                     });
190                         }
191 
192                         @Override
193                         public void onFailure(int errorCode) {
194                             logApiCallStats(
195                                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
196                                     System.currentTimeMillis() - startTimeMillis,
197                                     errorCode);
198                             executor.execute(
199                                     () -> {
200                                         outcomeReceiver.onError(
201                                                 new OnDevicePersonalizationException(
202                                                         translateErrorCode(errorCode)));
203                                     });
204                         }
205                     });
206         } catch (RemoteException e) {
207             sLogger.e(TAG + ": Failed to schedule federated compute job", e);
208             logApiCallStats(
209                     Constants.API_NAME_FEDERATED_COMPUTE_SCHEDULE,
210                     System.currentTimeMillis() - startTimeMillis,
211                     Constants.STATUS_REMOTE_EXCEPTION);
212             executor.execute(
213                     () -> {
214                         outcomeReceiver.onError(e);
215                     });
216         }
217     }
218 
219     /**
220      * Translate the failed error code from the {@link IFederatedComputeService} to appropriate API
221      * surface error code.
222      */
translateErrorCode(int i)223     private static int translateErrorCode(int i) {
224         // Returns invalid/missing manifest or general error code to caller. The general error code
225         // includes personalization disable and all other errors populated from FCP service.
226         return i == Constants.STATUS_FCP_MANIFEST_INVALID
227                 ? OnDevicePersonalizationException.ERROR_INVALID_TRAINING_MANIFEST
228                 : OnDevicePersonalizationException.ERROR_SCHEDULE_TRAINING_FAILED;
229     }
230 
231     /**
232      * Cancels a federated compute job with input training params. In {@link
233      * IsolatedService#onRequest}, the app can call {@link
234      * IsolatedService#getFederatedComputeScheduler} to pass scheduler when constructing the {@link
235      * IsolatedWorker}.
236      *
237      * @param input the configuration of the federated compute. It should be consistent with the
238      *     federated compute server setup.
239      */
240     @WorkerThread
cancel(@onNull FederatedComputeInput input)241     public void cancel(@NonNull FederatedComputeInput input) {
242         final long startTimeMillis = System.currentTimeMillis();
243         int responseCode = Constants.STATUS_INTERNAL_ERROR;
244         if (mFcService == null) {
245             logApiCallStats(
246                     Constants.API_NAME_FEDERATED_COMPUTE_CANCEL,
247                     System.currentTimeMillis() - startTimeMillis,
248                     responseCode);
249             throw new IllegalStateException(
250                     "FederatedComputeScheduler not available for this instance.");
251         }
252         CountDownLatch latch = new CountDownLatch(1);
253         final int[] err = {0};
254         try {
255             mFcService.cancel(
256                     input.getPopulationName(),
257                     new IFederatedComputeCallback.Stub() {
258                         @Override
259                         public void onSuccess() {
260                             latch.countDown();
261                         }
262 
263                         @Override
264                         public void onFailure(int errorCode) {
265                             err[0] = errorCode;
266                             latch.countDown();
267                         }
268                     });
269             boolean countedDown =
270                     latch.await(FEDERATED_COMPUTE_SCHEDULE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
271             if (err[0] != 0) {
272                 sLogger.e("Internal failure occurred while cancelling job, error code %d", err[0]);
273                 responseCode = Constants.STATUS_INTERNAL_ERROR;
274                 // Fail silently for now. TODO(b/346827691): update schedule/cancel API to return
275                 // error status to caller.
276                 return;
277             } else if (!countedDown) {
278                 sLogger.d(TAG + " : timed out waiting for cancel operation to complete.");
279                 responseCode = Constants.STATUS_INTERNAL_ERROR;
280                 return;
281             }
282             responseCode = Constants.STATUS_SUCCESS;
283         } catch (RemoteException | InterruptedException e) {
284             sLogger.e(TAG + ": Failed to cancel federated compute job", e);
285             throw new IllegalStateException(e);
286         } finally {
287             logApiCallStats(
288                     Constants.API_NAME_FEDERATED_COMPUTE_CANCEL,
289                     System.currentTimeMillis() - startTimeMillis,
290                     responseCode);
291         }
292     }
293 
convertTrainingInterval( TrainingInterval interval)294     private static android.federatedcompute.common.TrainingInterval convertTrainingInterval(
295             TrainingInterval interval) {
296         return new android.federatedcompute.common.TrainingInterval.Builder()
297                 .setMinimumIntervalMillis(interval.getMinimumInterval().toMillis())
298                 .setSchedulingMode(convertSchedulingMode(interval))
299                 .build();
300     }
301 
302     private static @android.federatedcompute.common.TrainingInterval.SchedulingMode int
convertSchedulingMode(TrainingInterval interval)303             convertSchedulingMode(TrainingInterval interval) {
304         switch (interval.getSchedulingMode()) {
305             case TrainingInterval.SCHEDULING_MODE_ONE_TIME:
306                 return android.federatedcompute.common.TrainingInterval.SCHEDULING_MODE_ONE_TIME;
307             case TrainingInterval.SCHEDULING_MODE_RECURRENT:
308                 return android.federatedcompute.common.TrainingInterval.SCHEDULING_MODE_RECURRENT;
309             default:
310                 throw new IllegalStateException(
311                         "Unsupported scheduling mode " + interval.getSchedulingMode());
312         }
313     }
314 
315     /** Helper method to log call stats based on response code. */
logApiCallStats(int apiName, long duration, int responseCode)316     private void logApiCallStats(int apiName, long duration, int responseCode) {
317         try {
318             mDataAccessService.logApiCallStats(apiName, duration, responseCode);
319         } catch (Exception e) {
320             sLogger.d(e, TAG + ": failed to log metrics");
321         }
322     }
323 
324     /** The parameters related to job scheduling. */
325     public static class Params {
326         /**
327          * If training interval is scheduled for recurrent tasks, the earliest time this task could
328          * start is after the minimum training interval expires. E.g. If the task is set to run
329          * maximum once per day, the first run of this task will be one day after this task is
330          * scheduled. When a one time job is scheduled, the earliest next runtime is calculated
331          * based on federated compute default interval.
332          */
333         @NonNull private final TrainingInterval mTrainingInterval;
334 
Params(@onNull TrainingInterval trainingInterval)335         public Params(@NonNull TrainingInterval trainingInterval) {
336             mTrainingInterval = trainingInterval;
337         }
338 
339         @NonNull
getTrainingInterval()340         public TrainingInterval getTrainingInterval() {
341             return mTrainingInterval;
342         }
343     }
344 }
345