1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.camera.core.impl;
18 
19 import android.os.SystemClock;
20 
21 import androidx.annotation.GuardedBy;
22 import androidx.camera.core.impl.utils.executor.CameraXExecutors;
23 import androidx.concurrent.futures.CallbackToFutureAdapter;
24 import androidx.core.util.Preconditions;
25 import androidx.lifecycle.LiveData;
26 import androidx.lifecycle.MutableLiveData;
27 
28 import com.google.common.util.concurrent.ListenableFuture;
29 
30 import org.jspecify.annotations.NonNull;
31 import org.jspecify.annotations.Nullable;
32 
33 import java.util.HashMap;
34 import java.util.Map;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 
38 /**
39  * An observable implemented using {@link LiveData}.
40  *
41  * <p>While this class can provide error reporting, it is prone to other issues. First, all updates
42  * will originate from the main thread before being sent to the observer's executor. Second, there
43  * exists the possibility of error and value elision. This means that some posted values and some
44  * posted errors may be ignored if a newer error/value is posted before the observers can be
45  * updated. If it is important for observers to receive all updates, then this class should not be
46  * used.
47  *
48  * @param <T> The data type used for
49  *            {@link Observable.Observer#onNewData(Object)}.
50  */
51 public final class LiveDataObservable<T> implements Observable<T> {
52 
53 
54     @SuppressWarnings("WeakerAccess") /* synthetic accessor */
55     final MutableLiveData<Result<T>> mLiveData = new MutableLiveData<>();
56     @GuardedBy("mObservers")
57     private final Map<Observer<? super T>, LiveDataObserverAdapter<T>> mObservers = new HashMap<>();
58 
59     /**
60      * Posts a new value to be used as the current value of this Observable.
61      */
postValue(@ullable T value)62     public void postValue(@Nullable T value) {
63         mLiveData.postValue(Result.fromValue(value));
64     }
65 
66     /**
67      * Posts a new error to be used as the current error state of this Observable.
68      */
postError(@onNull Throwable error)69     public void postError(@NonNull Throwable error) {
70         mLiveData.postValue(Result.fromError(error));
71     }
72 
73     /**
74      * Returns the underlying {@link LiveData} used to store and update {@link Result Results}.
75      */
getLiveData()76     public @NonNull LiveData<Result<T>> getLiveData() {
77         return mLiveData;
78     }
79 
80     @Override
81     @SuppressWarnings("ObjectToString")
fetchData()82     public @NonNull ListenableFuture<T> fetchData() {
83         return CallbackToFutureAdapter.getFuture(completer -> {
84             CameraXExecutors.mainThreadExecutor().execute(() -> {
85                 Result<T> result = mLiveData.getValue();
86                 if (result == null) {
87                     completer.setException(new IllegalStateException(
88                             "Observable has not yet been initialized with a value."));
89                 } else if (result.completedSuccessfully()) {
90                     completer.set(result.getValue());
91                 } else {
92                     Preconditions.checkNotNull(result.getError());
93                     completer.setException(result.getError());
94                 }
95             });
96 
97             return LiveDataObservable.this + " [fetch@" + SystemClock.uptimeMillis() + "]";
98         });
99     }
100 
101     @Override
102     public void addObserver(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
103         synchronized (mObservers) {
104             final LiveDataObserverAdapter<T> oldAdapter = mObservers.get(observer);
105             if (oldAdapter != null) {
106                 oldAdapter.disable();
107             }
108 
109             final LiveDataObserverAdapter<T> newAdapter = new LiveDataObserverAdapter<>(executor,
110                     observer);
111             mObservers.put(observer, newAdapter);
112 
113             CameraXExecutors.mainThreadExecutor().execute(() -> {
114                 if (oldAdapter != null) {
115                     mLiveData.removeObserver(oldAdapter);
116                 }
117                 mLiveData.observeForever(newAdapter);
118             });
119         }
120     }
121 
122     @Override
123     public void removeObserver(@NonNull Observer<? super T> observer) {
124         synchronized (mObservers) {
125             LiveDataObserverAdapter<T> adapter = mObservers.remove(observer);
126 
127             if (adapter != null) {
128                 adapter.disable();
129                 CameraXExecutors.mainThreadExecutor().execute(
130                         () -> mLiveData.removeObserver(adapter));
131             }
132         }
133     }
134 
135     /**
136      * A wrapper class that allows error reporting.
137      *
138      * A Result can contain either a value or an error, but not both.
139      *
140      * @param <T> The data type used for
141      *            {@link Observable.Observer#onNewData(Object)}.
142      */
143     public static final class Result<T> {
144         private final @Nullable T mValue;
145         private final @Nullable Throwable mError;
146 
147         private Result(@Nullable T value, @Nullable Throwable error) {
148             mValue = value;
149             mError = error;
150         }
151 
152         /**
153          * Creates a successful result that contains a value.
154          */
155         static <T> Result<T> fromValue(@Nullable T value) {
156             return new Result<>(value, null);
157         }
158 
159         /**
160          * Creates a failed result that contains an error.
161          */
162         static <T> Result<T> fromError(@NonNull Throwable error) {
163             return new Result<>(null, Preconditions.checkNotNull(error));
164         }
165 
166         /**
167          * Returns whether this result contains a value or an error.
168          *
169          * <p>A successful result will contain a value.
170          */
171         public boolean completedSuccessfully() {
172             return mError == null;
173         }
174 
175         /**
176          * Returns the value contained within this result.
177          *
178          * @throws IllegalStateException if the result contains an error rather than a value.
179          */
180         public @Nullable T getValue() {
181             if (!completedSuccessfully()) {
182                 throw new IllegalStateException(
183                         "Result contains an error. Does not contain a value.");
184             }
185 
186             return mValue;
187         }
188 
189         /**
190          * Returns the error contained within this result, or {@code null} if the result contains
191          * a value.
192          */
193         public @Nullable Throwable getError() {
194             return mError;
195         }
196 
197         @Override
198         public @NonNull String toString() {
199             return "[Result: <" + (completedSuccessfully() ? "Value: " + mValue :
200                     "Error: " + mError) + ">]";
201         }
202     }
203 
204     private static final class LiveDataObserverAdapter<T> implements
205             androidx.lifecycle.Observer<Result<T>> {
206 
207         final AtomicBoolean mActive = new AtomicBoolean(true);
208         final Observer<? super T> mObserver;
209         final Executor mExecutor;
210 
211         LiveDataObserverAdapter(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
212             mExecutor = executor;
213             mObserver = observer;
214         }
215 
216         void disable() {
217             mActive.set(false);
218         }
219 
220         @Override
221         public void onChanged(final @NonNull Result<T> result) {
222             mExecutor.execute(() -> {
223                 if (!mActive.get()) {
224                     // Observer has been disabled.
225                     return;
226                 }
227 
228                 if (result.completedSuccessfully()) {
229                     mObserver.onNewData(result.getValue());
230                 } else {
231                     Preconditions.checkNotNull(result.getError());
232                     mObserver.onError(result.getError());
233                 }
234             });
235         }
236     }
237 }
238