/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.core.impl;

import android.os.SystemClock;

import androidx.annotation.GuardedBy;
import androidx.camera.core.impl.utils.executor.CameraXExecutors;
import androidx.concurrent.futures.CallbackToFutureAdapter;
import androidx.core.util.Preconditions;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;

import com.google.common.util.concurrent.ListenableFuture;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * An {@link Observable} whose current state is held in a {@link LiveData}.
 *
 * <p>This class is able to report errors as well as values, but it comes with caveats. First,
 * every update originates from the main thread before it is handed to the observer's executor.
 * Second, values and errors may be elided: if a newer value/error is posted before observers
 * have been updated, the older posted value or error may never be delivered. Do not use this
 * class when observers must receive every single update.
 *
 * @param <T> The data type used for
 *            {@link Observable.Observer#onNewData(Object)}.
 */
public final class LiveDataObservable<T> implements Observable<T> {

    @SuppressWarnings("WeakerAccess") /* synthetic accessor */
    final MutableLiveData<Result<T>> mLiveData = new MutableLiveData<>();

    /** Maps each registered observer to the adapter currently attached on its behalf. */
    @GuardedBy("mObservers")
    private final Map<Observer<? super T>, LiveDataObserverAdapter<T>> mObservers = new HashMap<>();

    /**
     * Posts a value that becomes the current value of this Observable.
     */
    public void postValue(@Nullable T value) {
        Result<T> success = Result.fromValue(value);
        mLiveData.postValue(success);
    }

    /**
     * Posts an error that becomes the current error state of this Observable.
     */
    public void postError(@NonNull Throwable error) {
        Result<T> failure = Result.fromError(error);
        mLiveData.postValue(failure);
    }

    /**
     * Returns the backing {@link LiveData} that stores and updates {@link Result Results}.
     */
    public @NonNull LiveData<Result<T>> getLiveData() {
        return mLiveData;
    }

    /**
     * Returns a future that is completed, on the main-thread executor, from the result currently
     * held by the backing {@link LiveData}.
     *
     * <p>The future completes with the value of a successful result, fails with the error of a
     * failed result, and fails with an {@link IllegalStateException} when no result is available
     * yet.
     */
    @Override
    @SuppressWarnings("ObjectToString")
    public @NonNull ListenableFuture<T> fetchData() {
        return CallbackToFutureAdapter.getFuture(completer -> {
            CameraXExecutors.mainThreadExecutor().execute(
                    () -> completeFromCurrentResult(completer));

            // Tag handed back to CallbackToFutureAdapter; built after the task is posted.
            return LiveDataObservable.this + " [fetch@" + SystemClock.uptimeMillis() + "]";
        });
    }

    /** Completes {@code completer} according to the result currently stored in the LiveData. */
    private void completeFromCurrentResult(CallbackToFutureAdapter.Completer<T> completer) {
        Result<T> current = mLiveData.getValue();

        if (current == null) {
            completer.setException(new IllegalStateException(
                    "Observable has not yet been initialized with a value."));
            return;
        }

        if (!current.completedSuccessfully()) {
            Throwable failure = Preconditions.checkNotNull(current.getError());
            completer.setException(failure);
            return;
        }

        completer.set(current.getValue());
    }

    /**
     * Registers {@code observer} so that its callbacks are run on {@code executor}.
     *
     * <p>If the observer is already registered, the adapter previously created for it is disabled
     * and detached, and a new adapter bound to the given executor takes its place.
     */
    @Override
    public void addObserver(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
        synchronized (mObservers) {
            final LiveDataObserverAdapter<T> stale = mObservers.get(observer);
            if (stale != null) {
                stale.disable();
            }

            final LiveDataObserverAdapter<T> fresh =
                    new LiveDataObserverAdapter<>(executor, observer);
            mObservers.put(observer, fresh);

            // NOTE(review): the main-thread task is posted while the lock is still held,
            // presumably so that attach/detach tasks are queued in the same order as the map
            // updates - confirm before moving this outside the synchronized block.
            swapAdaptersOnMainThread(stale, fresh);
        }
    }

    /**
     * Posts a main-thread task that detaches {@code stale} (when present) and then attaches
     * {@code fresh} to the backing LiveData.
     */
    private void swapAdaptersOnMainThread(final @Nullable LiveDataObserverAdapter<T> stale,
            final @NonNull LiveDataObserverAdapter<T> fresh) {
        CameraXExecutors.mainThreadExecutor().execute(() -> {
            if (stale != null) {
                mLiveData.removeObserver(stale);
            }
            mLiveData.observeForever(fresh);
        });
    }

    /**
     * Unregisters {@code observer}. Does nothing when the observer is not registered.
     *
     * <p>The adapter is disabled immediately and detached from the backing LiveData by a task
     * posted to the main-thread executor.
     */
    @Override
    public void removeObserver(@NonNull Observer<? super T> observer) {
        synchronized (mObservers) {
            final LiveDataObserverAdapter<T> detached = mObservers.remove(observer);
            if (detached == null) {
                // Nothing was registered for this observer.
                return;
            }

            detached.disable();
            CameraXExecutors.mainThreadExecutor().execute(
                    () -> mLiveData.removeObserver(detached));
        }
    }

    /**
     * A wrapper that makes error reporting possible.
     *
     * <p>A Result holds either a value or an error, never both.
     *
     * @param <T> The data type used for
     *            {@link Observable.Observer#onNewData(Object)}.
     */
    public static final class Result<T> {
        private final @Nullable T mValue;
        private final @Nullable Throwable mError;

        private Result(@Nullable T value, @Nullable Throwable error) {
            mValue = value;
            mError = error;
        }

        /**
         * Builds a successful result wrapping {@code value}.
         */
        static <T> Result<T> fromValue(@Nullable T value) {
            return new Result<>(value, null);
        }

        /**
         * Builds a failed result wrapping {@code error}, which must not be {@code null}.
         */
        static <T> Result<T> fromError(@NonNull Throwable error) {
            Throwable checked = Preconditions.checkNotNull(error);
            return new Result<>(null, checked);
        }

        /**
         * Tells whether this result holds a value rather than an error.
         *
         * <p>A successful result is one that has no error.
         */
        public boolean completedSuccessfully() {
            return mError == null;
        }

        /**
         * Returns the value held by this result.
         *
         * @throws IllegalStateException if the result holds an error instead of a value.
         */
        public @Nullable T getValue() {
            if (completedSuccessfully()) {
                return mValue;
            }

            throw new IllegalStateException(
                    "Result contains an error. Does not contain a value.");
        }

        /**
         * Returns the error held by this result, or {@code null} when the result holds a value.
         */
        public @Nullable Throwable getError() {
            return mError;
        }

        @Override
        public @NonNull String toString() {
            final String payload;
            if (completedSuccessfully()) {
                payload = "Value: " + mValue;
            } else {
                payload = "Error: " + mError;
            }
            return "[Result: <" + payload + ">]";
        }
    }

    /**
     * Bridges a LiveData observer to an {@link Observable.Observer}, forwarding each result to
     * the wrapped observer on the supplied executor until the adapter is disabled.
     */
    private static final class LiveDataObserverAdapter<T> implements
            androidx.lifecycle.Observer<Result<T>> {

        final AtomicBoolean mActive = new AtomicBoolean(true);
        final Observer<? super T> mObserver;
        final Executor mExecutor;

        LiveDataObserverAdapter(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
            mExecutor = executor;
            mObserver = observer;
        }

        /** Stops any further result from being forwarded to the wrapped observer. */
        void disable() {
            mActive.set(false);
        }

        @Override
        public void onChanged(final @NonNull Result<T> result) {
            mExecutor.execute(() -> forward(result));
        }

        /** Runs on the executor: hands {@code result} to the observer unless disabled. */
        private void forward(@NonNull Result<T> result) {
            if (!mActive.get()) {
                // Observer has been disabled.
                return;
            }

            if (!result.completedSuccessfully()) {
                Throwable failure = Preconditions.checkNotNull(result.getError());
                mObserver.onError(failure);
                return;
            }

            mObserver.onNewData(result.getValue());
        }
    }
}