/*
* Copyright (C) 2016 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.android.modules.utils;
import android.annotation.NonNull;
import android.annotation.Nullable;
import android.os.Handler;
import android.os.Parcel;
import android.os.Parcelable;
import android.os.RemoteException;
import android.os.SystemClock;
import android.util.Log;
import com.android.internal.annotations.GuardedBy;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Generic interface for receiving a callback result from someone.
* Allow the server end to synchronously wait on the response from the client.
* This enables an RPC like system but with the ability to timeout and discard late results.
*
*
NOTE: Use the static {@link #get} method to retrieve an available instance of this class.
* If no instances are available, a new one is created.
*/
public final class SynchronousResultReceiver implements Parcelable {
private static final String TAG = "SynchronousResultReceiver";
private final boolean mLocal;
private boolean mIsCompleted;
private final static Object sLock = new Object();
private final static int QUEUE_THRESHOLD = 4;
@GuardedBy("sLock")
private CompletableFuture> mFuture = new CompletableFuture<>();
@GuardedBy("sLock")
private static final ConcurrentLinkedQueue sAvailableReceivers
= new ConcurrentLinkedQueue<>();
public static SynchronousResultReceiver get() {
synchronized(sLock) {
if (sAvailableReceivers.isEmpty()) {
return new SynchronousResultReceiver();
}
SynchronousResultReceiver receiver = sAvailableReceivers.poll();
receiver.resetLocked();
return receiver;
}
}
private SynchronousResultReceiver() {
mLocal = true;
mIsCompleted = false;
}
@GuardedBy("sLock")
private void releaseLocked() {
mFuture = null;
if (sAvailableReceivers.size() < QUEUE_THRESHOLD) {
sAvailableReceivers.add(this);
}
}
@GuardedBy("sLock")
private void resetLocked() {
mFuture = new CompletableFuture<>();
mIsCompleted = false;
}
private CompletableFuture> getFuture() {
synchronized (sLock) {
return mFuture;
}
}
public static class Result implements Parcelable {
private final @Nullable T mObject;
private final RuntimeException mException;
public Result(RuntimeException exception) {
mObject = null;
mException = exception;
}
public Result(@Nullable T object) {
mObject = object;
mException = null;
}
/**
* Return the stored value
* May throw a {@link RuntimeException} thrown from the client
*/
public T getValue(T defaultValue) {
if (mException != null) {
throw mException;
}
if (mObject == null) {
return defaultValue;
}
return mObject;
}
public int describeContents() {
return 0;
}
public void writeToParcel(@NonNull Parcel out, int flags) {
out.writeValue(mObject);
out.writeValue(mException);
}
private Result(Parcel in) {
mObject = (T)in.readValue(null);
mException= (RuntimeException)in.readValue(null);
}
public static final @NonNull Parcelable.Creator> CREATOR =
new Parcelable.Creator>() {
public Result createFromParcel(Parcel in) {
return new Result(in);
}
public Result[] newArray(int size) {
return new Result[size];
}
};
}
private void complete(Result result) {
if (mIsCompleted) {
throw new IllegalStateException("Receiver has already been completed");
}
mIsCompleted = true;
if (mLocal) {
getFuture().complete(result);
} else {
final ISynchronousResultReceiver rr;
synchronized (this) {
rr = mReceiver;
}
if (rr != null) {
try {
rr.send(result);
} catch (RemoteException e) {
Log.w(TAG, "Failed to complete future");
}
}
}
}
/**
* Deliver a result to this receiver.
*
* @param resultData Additional data provided by you.
*/
public void send(@Nullable T resultData) {
complete(new Result<>(resultData));
}
/**
* Deliver an {@link Exception} to this receiver
*
* @param e exception to be sent
*/
public void propagateException(@NonNull RuntimeException e) {
Objects.requireNonNull(e, "RuntimeException cannot be null");
complete(new Result<>(e));
}
/**
* Blocks waiting for the result from the remote client.
*
* If it is interrupted before completion of the duration, wait again with remaining time until
* the deadline.
*
* @param timeout The duration to wait before sending a {@link TimeoutException}
* @return the Result
* @throws TimeoutException if the timeout in milliseconds expired.
*/
public @NonNull Result awaitResultNoInterrupt(@NonNull Duration timeout)
throws TimeoutException {
Objects.requireNonNull(timeout, "Null timeout is not allowed");
final long startWaitNanoTime = SystemClock.elapsedRealtimeNanos();
Duration remainingTime = timeout;
while (!remainingTime.isNegative()) {
try {
Result result = getFuture().get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
synchronized (sLock) {
releaseLocked();
return result;
}
} catch (ExecutionException e) {
// This will NEVER happen.
throw new AssertionError("Error receiving response", e);
} catch (InterruptedException e) {
// The thread was interrupted, try and get the value again, this time
// with the remaining time until the deadline.
remainingTime = timeout.minus(
Duration.ofNanos(SystemClock.elapsedRealtimeNanos() - startWaitNanoTime));
}
}
synchronized (sLock) {
releaseLocked();
}
throw new TimeoutException();
}
ISynchronousResultReceiver mReceiver = null;
private final class MyResultReceiver extends ISynchronousResultReceiver.Stub {
public void send(@SuppressWarnings("rawtypes") @NonNull Result result) {
@SuppressWarnings("unchecked") Result res = (Result) result;
CompletableFuture> future;
future = getFuture();
if (future != null) {
future.complete(res);
}
}
}
public int describeContents() {
return 0;
}
public void writeToParcel(@NonNull Parcel out, int flags) {
synchronized (this) {
if (mReceiver == null) {
mReceiver = new MyResultReceiver();
}
out.writeStrongBinder(mReceiver.asBinder());
}
}
private SynchronousResultReceiver(Parcel in) {
mLocal = false;
mIsCompleted = false;
mReceiver = ISynchronousResultReceiver.Stub.asInterface(in.readStrongBinder());
}
public static final @NonNull Parcelable.Creator> CREATOR =
new Parcelable.Creator>() {
public SynchronousResultReceiver> createFromParcel(Parcel in) {
return new SynchronousResultReceiver(in);
}
public SynchronousResultReceiver>[] newArray(int size) {
return new SynchronousResultReceiver[size];
}
};
}