/* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.server.appsearch.contactsindexer; import static com.android.server.appsearch.indexer.IndexerMaintenanceConfig.CONTACTS_INDEXER; import android.annotation.NonNull; import android.annotation.Nullable; import android.app.appsearch.AppSearchEnvironmentFactory; import android.app.appsearch.AppSearchResult; import android.app.appsearch.util.LogUtil; import android.content.ContentResolver; import android.content.Context; import android.database.ContentObserver; import android.net.Uri; import android.os.CancellationSignal; import android.provider.ContactsContract; import android.util.Log; import android.util.Slog; import com.android.internal.annotations.GuardedBy; import com.android.internal.annotations.VisibleForTesting; import com.android.server.appsearch.indexer.IndexerMaintenanceService; import com.android.server.appsearch.stats.AppSearchStatsLog; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** * Contacts Indexer for a single user. * *
It reads the updated/newly-inserted/deleted contacts from CP2, and sync the changes into * AppSearch. * *
This class is thread safe. * * @hide */ public final class ContactsIndexerUserInstance { private static final String TAG = "ContactsIndexerUserInst"; private final Context mContext; private final File mDataDir; private final ContactsIndexerSettings mSettings; private final ContactsObserver mContactsObserver; // Those two booleans below are used for batching/throttling the contact change // notification so we won't schedule too many delta updates. private final Object mDeltaUpdateLock = new Object(); // Whether a delta update has been scheduled or run. Now we only allow one delta update being // run at a time. @GuardedBy("mDeltaUpdateLock") private boolean mDeltaUpdateScheduled = false; // Whether we are receiving notifications from CP2. @GuardedBy("mDeltaUpdateLock") private boolean mCp2ChangePending = false; private final AppSearchHelper mAppSearchHelper; private final ContactsIndexerImpl mContactsIndexerImpl; private final ContactsIndexerConfig mContactsIndexerConfig; /** * Single threaded executor to make sure there is only one active sync for this {@link * ContactsIndexerUserInstance}. Background tasks should be scheduled using {@link * #executeOnSingleThreadedExecutor(Runnable)} which ensures that they are not executed if the * executor is shutdown during {@link #shutdown()}. * *
Note that this executor is used as both work and callback executors by {@link * #mAppSearchHelper} which is fine because AppSearch should be able to handle exceptions thrown * by them. */ private final ExecutorService mSingleThreadedExecutor; /** * Constructs and initializes a {@link ContactsIndexerUserInstance}. * *
Heavy operations such as connecting to AppSearch are performed asynchronously.
*
* @param contactsDir data directory for ContactsIndexer.
*/
@NonNull
public static ContactsIndexerUserInstance createInstance(
@NonNull Context userContext,
@NonNull File contactsDir,
@NonNull ContactsIndexerConfig contactsIndexerConfig) {
Objects.requireNonNull(userContext);
Objects.requireNonNull(contactsDir);
Objects.requireNonNull(contactsIndexerConfig);
ExecutorService singleThreadedExecutor =
AppSearchEnvironmentFactory.getEnvironmentInstance().createSingleThreadExecutor();
return createInstance(
userContext, contactsDir, contactsIndexerConfig, singleThreadedExecutor);
}
@VisibleForTesting
@NonNull
/*package*/ static ContactsIndexerUserInstance createInstance(
@NonNull Context context,
@NonNull File contactsDir,
@NonNull ContactsIndexerConfig contactsIndexerConfig,
@NonNull ExecutorService executorService) {
Objects.requireNonNull(context);
Objects.requireNonNull(contactsDir);
Objects.requireNonNull(contactsIndexerConfig);
Objects.requireNonNull(executorService);
AppSearchHelper appSearchHelper =
AppSearchHelper.createAppSearchHelper(
context, executorService, contactsIndexerConfig);
ContactsIndexerUserInstance indexer =
new ContactsIndexerUserInstance(
context,
contactsDir,
appSearchHelper,
contactsIndexerConfig,
executorService);
indexer.loadSettingsAsync();
return indexer;
}
/**
* Constructs a {@link ContactsIndexerUserInstance}.
*
* @param context Context object passed from {@link ContactsIndexerManagerService}
* @param dataDir data directory for storing contacts indexer state.
* @param contactsIndexerConfig configuration for the Contacts Indexer.
* @param singleThreadedExecutor an {@link ExecutorService} with at most one thread to ensure
* the thread safety of this class.
*/
private ContactsIndexerUserInstance(
@NonNull Context context,
@NonNull File dataDir,
@NonNull AppSearchHelper appSearchHelper,
@NonNull ContactsIndexerConfig contactsIndexerConfig,
@NonNull ExecutorService singleThreadedExecutor) {
mContext = Objects.requireNonNull(context);
mDataDir = Objects.requireNonNull(dataDir);
mContactsIndexerConfig = Objects.requireNonNull(contactsIndexerConfig);
mSettings = new ContactsIndexerSettings(mDataDir);
mAppSearchHelper = Objects.requireNonNull(appSearchHelper);
mSingleThreadedExecutor = Objects.requireNonNull(singleThreadedExecutor);
mContactsObserver = new ContactsObserver();
mContactsIndexerImpl = new ContactsIndexerImpl(context, appSearchHelper);
}
public void startAsync() {
if (LogUtil.DEBUG) {
Log.d(TAG, "Registering ContactsObserver for " + mContext.getUser());
}
mContext.getContentResolver()
.registerContentObserver(
ContactsContract.Contacts.CONTENT_URI,
/* notifyForDescendants= */ true,
mContactsObserver);
executeOnSingleThreadedExecutor(
() -> {
mAppSearchHelper
.isDataLikelyWipedDuringInitAsync()
.thenCompose(
isDataLikelyWipedDuringInit -> {
if (isDataLikelyWipedDuringInit) {
mSettings.reset();
// Persist the settings right away just in case there is
// a crash later.
// In this case, the full update still need to be run
// during the next
// boot to reindex the data.
persistSettings();
}
doCp2SyncFirstRun();
// This value won't be used anymore, so return null here.
return CompletableFuture.completedFuture(null);
})
.exceptionally(e -> Log.w(TAG, "Got exception in startAsync", e));
});
}
public void shutdown() throws InterruptedException {
if (LogUtil.DEBUG) {
Log.d(TAG, "Unregistering ContactsObserver for " + mContext.getUser());
}
mContext.getContentResolver().unregisterContentObserver(mContactsObserver);
IndexerMaintenanceService.cancelUpdateJobIfScheduled(
mContext, mContext.getUser(), CONTACTS_INDEXER);
synchronized (mSingleThreadedExecutor) {
mSingleThreadedExecutor.shutdown();
}
boolean unused = mSingleThreadedExecutor.awaitTermination(30L, TimeUnit.SECONDS);
}
private class ContactsObserver extends ContentObserver {
public ContactsObserver() {
super(/* handler= */ null);
}
@Override
public void onChange(boolean selfChange, @NonNull Collection This handles the scenario where this contacts indexer instance has been started for the
* current device user for the first time or the background full update job is not scheduled.
* The full-update job which syncs all CP2 contacts is scheduled to run when the device is idle
* and its battery is not low. It can take several minutes or hours for these constraints to be
* met. Additionally, the delta-update job which runs on each CP2 change notification is
* designed to sync only the changed contacts because the user might be actively using the
* device at that time.
*
* Schedules a one-off full update job to sync all CP2 contacts when the device is idle. Also
* syncs a configurable number of CP2 contacts into the AppSearch Person corpus so that it's
* nominally functional.
*/
private void doCp2SyncFirstRun() {
// If this is not the first run of contacts indexer (lastFullUpdateTimestampMillis is not 0)
// for the given user and a full update job is scheduled, this means that contacts indexer
// has been running recently and contacts should be up to date. The initial sync can be
// skipped in this case.
// If the job is not scheduled but lastFullUpdateTimestampMillis is not 0, the contacts
// indexer was disabled before. We need to reschedule the job and run a limited delta update
// to bring latest contact change in AppSearch right away, after it is re-enabled.
if (mSettings.getLastFullUpdateTimestampMillis() != 0
&& IndexerMaintenanceService.isUpdateJobScheduled(
mContext, mContext.getUser(), CONTACTS_INDEXER)) {
return;
}
IndexerMaintenanceService.scheduleUpdateJob(
mContext,
mContext.getUser(),
CONTACTS_INDEXER,
/* periodic= */ false,
/* intervalMillis= */ -1);
// TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
// ContactsUpdateStats so that it can be checked and logged here, instead of the
// placeholder exceptionally() block that only logs to the console.
doDeltaUpdateAsync(
mContactsIndexerConfig.getContactsFirstRunIndexingLimit(),
new ContactsUpdateStats())
.exceptionally(
t -> {
if (LogUtil.DEBUG) {
Log.d(
TAG,
"Failed to bootstrap Person corpus with CP2 contacts",
t);
}
return null;
});
}
/**
* Performs a full sync of CP2 contacts to AppSearch builtin:Person corpus.
*
* @param signal Used to indicate if the full update task should be cancelled.
*/
public void doFullUpdateAsync(@Nullable CancellationSignal signal) {
executeOnSingleThreadedExecutor(
() -> {
ContactsUpdateStats updateStats = new ContactsUpdateStats();
doFullUpdateInternalAsync(signal, updateStats);
IndexerMaintenanceService.scheduleUpdateJob(
mContext,
mContext.getUser(),
CONTACTS_INDEXER,
/* periodic= */ true,
mContactsIndexerConfig.getContactsFullUpdateIntervalMillis());
});
}
/** Dumps the internal state of this {@link ContactsIndexerUserInstance}. */
public void dump(@NonNull PrintWriter pw, boolean verbose) {
// Those timestamps are not protected by any lock since in ContactsIndexerUserInstance
// we only have one thread to handle all the updates. It is possible we might run into
// race condition if there is an update running while those numbers are being printed.
// This is acceptable though for debug purpose, so still no lock here.
pw.println(
"last_full_update_timestamp_millis: "
+ mSettings.getLastFullUpdateTimestampMillis());
pw.println(
"last_delta_update_timestamp_millis: "
+ mSettings.getLastDeltaUpdateTimestampMillis());
pw.println(
"last_contact_update_timestamp_millis: "
+ mSettings.getLastContactUpdateTimestampMillis());
pw.println(
"last_contact_delete_timestamp_millis: "
+ mSettings.getLastContactDeleteTimestampMillis());
}
@VisibleForTesting
CompletableFuture {@link #mDeltaUpdateScheduled} is being used to avoid scheduling any update BEFORE an
* active update finishes.
*
* {@link #mSingleThreadedExecutor} is being used to make sure there is one and only one
* delta update can be scheduled and run.
*/
@VisibleForTesting
/*package*/ void handleDeltaUpdate() {
if (!ContentResolver.getCurrentSyncs().isEmpty()) {
// TODO(b/221905367): make sure that the delta update is scheduled as soon
// as the current sync is completed.
if (LogUtil.DEBUG) {
Log.v(TAG, "Deferring delta updates until the current sync is complete");
}
return;
}
synchronized (mDeltaUpdateLock) {
// Record that a CP2 change notification has been received, and will be handled
// by the next delta update task.
mCp2ChangePending = true;
scheduleDeltaUpdateLocked();
}
}
/**
* Schedule a delta update. No new delta update can be scheduled if there is one delta update
* already scheduled or currently being run.
*
* ATTENTION!!! This function needs to be light weight since it is being called by CP2 with a
* lock.
*/
@GuardedBy("mDeltaUpdateLock")
private void scheduleDeltaUpdateLocked() {
if (mDeltaUpdateScheduled) {
return;
}
mDeltaUpdateScheduled = true;
executeOnSingleThreadedExecutor(
() -> {
ContactsUpdateStats updateStats = new ContactsUpdateStats();
// TODO(b/226489369): apply instant indexing limit on CP2 changes also?
// TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
// ContactsUpdateStats so that it can be checked and logged here, instead of
// the
// placeholder exceptionally() block that only logs to the console.
doDeltaUpdateAsync(
mContactsIndexerConfig.getContactsDeltaUpdateLimit(),
updateStats)
.exceptionally(
t -> {
if (LogUtil.DEBUG) {
Log.d(TAG, "Failed to index CP2 change", t);
}
return null;
});
});
}
/**
* Does the delta update. It also resets {@link
* ContactsIndexerUserInstance#mDeltaUpdateScheduled} to false.
*/
@VisibleForTesting
/*package*/ CompletableFuture It doesn't throw here. If it fails to load file, ContactsIndexer would always use the
* timestamps persisted in the memory.
*/
private void loadSettingsAsync() {
executeOnSingleThreadedExecutor(
() -> {
boolean unused = mDataDir.mkdirs();
try {
mSettings.load();
} catch (IOException e) {
// Ignore file not found errors (bootstrap case)
if (!(e instanceof FileNotFoundException)) {
Log.w(TAG, "Failed to load settings from disk", e);
}
}
});
}
private void persistSettings() {
try {
mSettings.persist();
} catch (IOException e) {
Log.w(TAG, "Failed to save settings to disk", e);
}
}
/**
* Executes the given command on {@link #mSingleThreadedExecutor} if it is still alive.
*
* If the {@link #mSingleThreadedExecutor} has been shutdown, this method doesn't execute the
* given command, and returns silently. Specifically, it does not throw {@link
* java.util.concurrent.RejectedExecutionException}.
*
* @param command the runnable task
*/
private void executeOnSingleThreadedExecutor(Runnable command) {
synchronized (mSingleThreadedExecutor) {
if (mSingleThreadedExecutor.isShutdown()) {
Log.w(TAG, "Executor is shutdown, not executing task");
return;
}
mSingleThreadedExecutor.execute(
() -> {
try {
command.run();
} catch (RuntimeException e) {
Slog.wtf(
TAG,
"ContactsIndexerUserInstance"
+ ".executeOnSingleThreadedExecutor() failed ",
e);
}
});
}
}
}