• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package android.nearby.fastpair.provider.simulator.testing;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 
21 import android.util.Log;
22 
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.ListeningExecutorService;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.protobuf.ByteString;
29 
30 import java.io.IOException;
31 import java.util.HashMap;
32 import java.util.Map;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.Executors;
35 
36 /**
37  * Manages the IO streams with remote devices.
38  *
39  * <p>The caller must invoke {@link #registerRemoteDevice} before starting to communicate with the
40  * remote device, and invoke {@link #unregisterRemoteDevice} after finishing tasks. If this instance
41  * is not used anymore, the caller need to invoke {@link #destroy} to release all resources.
42  *
43  * <p>All of the methods are thread-safe.
44  */
45 public class RemoteDevicesManager {
46     private static final String TAG = "RemoteDevicesManager";
47 
48     private final Map<String, RemoteDevice> mRemoteDeviceMap = new HashMap<>();
49     private final ListeningExecutorService mBackgroundExecutor =
50             MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
51     private final ListeningExecutorService mListenInputStreamExecutors =
52             MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
53     private final Map<String, ListenableFuture<Void>> mListeningTaskMap = new HashMap<>();
54 
55     /**
56      * Opens input and output data streams for {@code remoteDevice} in the background and notifies
57      * the
58      * open result via {@code callback}, and assigns a dedicated executor to listen the input data
59      * stream if data streams are opened successfully. The dedicated executor will invoke the
60      * {@code
61      * remoteDevice.inputStreamListener().onInputData()} directly if the new data exists in the
62      * input
63      * stream and invoke the {@code remoteDevice.inputStreamListener().onClose()} if the input
64      * stream
65      * is closed.
66      */
registerRemoteDevice(String id, RemoteDevice remoteDevice)67     public synchronized void registerRemoteDevice(String id, RemoteDevice remoteDevice) {
68         checkState(mRemoteDeviceMap.put(id, remoteDevice) == null,
69                 "The %s is already registered", id);
70         startListeningInputStreamTask(remoteDevice);
71     }
72 
73     /**
74      * Closes the data streams for specific remote device {@code id} in the background and notifies
75      * the result via {@code callback}.
76      */
unregisterRemoteDevice(String id)77     public synchronized void unregisterRemoteDevice(String id) {
78         RemoteDevice remoteDevice = mRemoteDeviceMap.remove(id);
79         checkState(remoteDevice != null, "The %s is not registered", id);
80         if (mListeningTaskMap.containsKey(id)) {
81             mListeningTaskMap.remove(id).cancel(/* mayInterruptIfRunning= */ true);
82         }
83     }
84 
85     /** Closes all data streams of registered remote devices and stop all background tasks. */
destroy()86     public synchronized void destroy() {
87         mRemoteDeviceMap.clear();
88         mListeningTaskMap.clear();
89         mListenInputStreamExecutors.shutdownNow();
90     }
91 
92     /**
93      * Writes {@code data} into the output data stream of specific remote device {@code id} in the
94      * background and notifies the result via {@code callback}.
95      */
writeDataToRemoteDevice( String id, ByteString data, FutureCallback<Void> callback)96     public synchronized void writeDataToRemoteDevice(
97             String id, ByteString data, FutureCallback<Void> callback) {
98         RemoteDevice remoteDevice = mRemoteDeviceMap.get(id);
99         checkState(remoteDevice != null, "The %s is not registered", id);
100 
101         runInBackground(() -> {
102             remoteDevice.getStreamIOHandler().write(data);
103             return null;
104         }, callback);
105     }
106 
runInBackground(Callable<Void> callable, FutureCallback<Void> callback)107     private void runInBackground(Callable<Void> callable, FutureCallback<Void> callback) {
108         Futures.addCallback(
109                 mBackgroundExecutor.submit(callable), callback, MoreExecutors.directExecutor());
110     }
111 
startListeningInputStreamTask(RemoteDevice remoteDevice)112     private void startListeningInputStreamTask(RemoteDevice remoteDevice) {
113         ListenableFuture<Void> listenFuture = mListenInputStreamExecutors.submit(() -> {
114             Log.i(TAG, "Start listening " + remoteDevice.getId());
115             while (true) {
116                 ByteString data;
117                 try {
118                     data = remoteDevice.getStreamIOHandler().read();
119                 } catch (IOException | IllegalStateException e) {
120                     break;
121                 }
122                 remoteDevice.getInputStreamListener().onInputData(data);
123             }
124         }, /* result= */ null);
125         Futures.addCallback(listenFuture, new FutureCallback<Void>() {
126             @Override
127             public void onSuccess(Void result) {
128                 Log.i(TAG, "Stop listening " + remoteDevice.getId());
129                 remoteDevice.getInputStreamListener().onClose();
130             }
131 
132             @Override
133             public void onFailure(Throwable t) {
134                 Log.w(TAG, "Stop listening " + remoteDevice.getId() + ", cause: " + t);
135                 remoteDevice.getInputStreamListener().onClose();
136             }
137         }, MoreExecutors.directExecutor());
138         mListeningTaskMap.put(remoteDevice.getId(), listenFuture);
139     }
140 }
141