1 /*
<lambda>null2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.pandora
18
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.stub.ServerCallStreamObserver
35 import io.grpc.stub.StreamObserver
36 import java.io.BufferedReader
37 import java.io.InputStreamReader
38 import java.util.concurrent.CancellationException
39 import java.util.stream.Collectors
40 import kotlinx.coroutines.CoroutineScope
41 import kotlinx.coroutines.Job
42 import kotlinx.coroutines.channels.Channel
43 import kotlinx.coroutines.channels.awaitClose
44 import kotlinx.coroutines.channels.trySendBlocking
45 import kotlinx.coroutines.flow.Flow
46 import kotlinx.coroutines.flow.callbackFlow
47 import kotlinx.coroutines.flow.catch
48 import kotlinx.coroutines.flow.consumeAsFlow
49 import kotlinx.coroutines.flow.first
50 import kotlinx.coroutines.flow.launchIn
51 import kotlinx.coroutines.flow.onCompletion
52 import kotlinx.coroutines.flow.onEach
53 import kotlinx.coroutines.launch
54 import kotlinx.coroutines.runBlocking
55 import kotlinx.coroutines.withTimeout
56 import kotlinx.coroutines.withTimeoutOrNull
57 import pandora.AndroidProto.InternalConnectionRef
58 import pandora.HostProto.Connection
59
/** Tag attached to the log messages written from this file. */
private const val TAG = "PandoraUtils"

/** Pool of characters (A-Z, then a-z, then 0-9) from which random strings are drawn. */
private val alphanumeric: List<Char> = listOf('A'..'Z', 'a'..'z', '0'..'9').flatten()
62
/**
 * Runs a shell command through the instrumentation's UiAutomation and returns what it printed.
 *
 * The output is read line by line and the lines are joined with `\n`, so the returned string has
 * no trailing line terminator.
 *
 * @param cmd the command line handed to `executeShellCommand`.
 * @return the command output, with its lines joined by `\n`.
 */
fun shell(cmd: String): String {
  val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
  val inputStream = ParcelFileDescriptor.AutoCloseInputStream(fd)
  // `use` closes the reader once reading finishes (or throws); closing the reader closes the
  // auto-close stream, which in turn releases the file descriptor instead of leaking it.
  return BufferedReader(InputStreamReader(inputStream)).use { reader ->
    reader.lines().collect(Collectors.joining("\n"))
  }
}
68
/**
 * Builds a cold flow that emits every broadcast intent matching an intent filter.
 *
 * A broadcast receiver is registered on [context] each time the flow is collected and is
 * unregistered when that collection ends. When the flow is needed several times in the same class,
 * it should be converted into a shared flow first.
 *
 * @param context context on which the broadcast receiver is registered.
 * @param intentFilter filter selecting the intents to emit.
 * @return a cold flow of the received intents.
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun intentFlow(context: Context, intentFilter: IntentFilter): Flow<Intent> = callbackFlow {
  val receiver =
    object : BroadcastReceiver() {
      // Forward each received intent into the flow's channel.
      override fun onReceive(receiverContext: Context, intent: Intent) {
        trySendBlocking(intent)
      }
    }

  context.registerReceiver(receiver, intentFilter)

  // Keep the flow open until the collector goes away, then drop the receiver.
  awaitClose { context.unregisterReceiver(receiver) }
}
89
/**
 * Launches a coroutine in the given scope that computes a single gRPC response with a suspending
 * function and delivers it to a gRPC stream observer.
 *
 * On success the response is sent with `onNext` followed by `onCompleted`. Any throwable raised
 * while computing or sending the response, including the timeout, is printed and forwarded to the
 * observer with `onError`.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope in which the coroutine is launched.
 * @param responseObserver the gRPC stream observer that receives the response.
 * @param timeout the duration in seconds after which the computation is cancelled and reported as
 *   an error. Default: 60s.
 * @param block the suspending function producing the response.
 * @return the launched coroutine as a [Job].
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcUnary(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcUnary(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  timeout: Long = 60,
  block: suspend () -> T
): Job =
  scope.launch {
    try {
      // `timeout` is expressed in seconds, withTimeout expects milliseconds.
      val timeoutMillis = timeout * 1000
      val result = withTimeout(timeoutMillis) { block() }
      responseObserver.onNext(result)
      responseObserver.onCompleted()
    } catch (failure: Throwable) {
      failure.printStackTrace()
      responseObserver.onError(failure)
    }
  }
132
/**
 * Launches a coroutine in the given scope that turns a Flow of gRPC requests into a Flow of gRPC
 * responses and forwards every response to a gRPC stream observer.
 *
 * Requests pushed into the returned observer are offered to the request Flow without blocking; a
 * request that cannot be accepted immediately cancels the coroutine and reports an error on
 * [responseObserver].
 *
 * @param T the type of gRPC request.
 * @param U the type of gRPC response.
 * @param scope coroutine scope in which the coroutine is launched.
 * @param responseObserver the gRPC stream observer that receives the responses.
 * @param block the function transforming the request Flow into the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ): StreamObserver<TypeOfRequest> {
 *   return grpcBidirectionalStream(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<U>,
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  // Channel (default capacity) carrying the incoming requests to the handler's Flow.
  val requests = Channel<T>()

  val handlerJob =
    scope.launch {
      block(requests.consumeAsFlow())
        .onEach { response -> responseObserver.onNext(response) }
        .onCompletion { cause ->
          // Only a clean end of the response Flow completes the gRPC stream.
          if (cause == null) {
            responseObserver.onCompleted()
          }
        }
        .catch { failure ->
          failure.printStackTrace()
          responseObserver.onError(failure)
        }
        .launchIn(this)
    }

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (requests.trySend(req).isSuccess) {
        return
      }
      handlerJob.cancel(CancellationException("too many incoming requests, buffer exceeded"))
      responseObserver.onError(CancellationException("too many incoming requests, buffer exceeded"))
    }

    override fun onCompleted() {
      // End the request Flow only; the handler keeps running and may still emit responses.
      requests.close()
    }

    override fun onError(e: Throwable) {
      handlerJob.cancel()
      e.printStackTrace()
    }
  }
}
203
/**
 * Launches a coroutine in the given scope that produces a Flow of gRPC responses and forwards
 * every response to a gRPC stream observer.
 *
 * The observer is cast to [ServerCallStreamObserver] so that a cancellation of the call cancels
 * the coroutine.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope in which the coroutine is launched.
 * @param responseObserver the gRPC stream observer that receives the responses.
 * @param block the function producing the response Flow.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcServerStream(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcServerStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  block: CoroutineScope.() -> Flow<T>
) {
  val callObserver = responseObserver as ServerCallStreamObserver<T>

  val streamJob =
    scope.launch {
      block()
        .onEach { response -> responseObserver.onNext(response) }
        .onCompletion { cause ->
          // Only a clean end of the response Flow completes the gRPC stream.
          if (cause == null) {
            responseObserver.onCompleted()
          }
        }
        .catch { failure ->
          failure.printStackTrace()
          responseObserver.onError(failure)
        }
        .launchIn(this)
    }

  // Stop producing responses as soon as the call is cancelled.
  callObserver.setOnCancelHandler { streamJob.cancel() }
}
253
/**
 * Synchronously obtains a Bluetooth profile proxy.
 *
 * Blocks the calling thread for up to 5 seconds while waiting for the profile service to connect.
 * When no proxy is delivered in time, a warning is logged and the call fails with a
 * NullPointerException.
 *
 * @param T the type of profile proxy (e.g. BluetoothA2dp)
 * @param context context used to look up the Bluetooth manager and to request the proxy
 * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
 * @return T the desired profile proxy
 */
@Suppress("UNCHECKED_CAST")
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> getProfileProxy(context: Context, profile: Int): T {
  val connectedProxy: BluetoothProfile? = runBlocking {
    val adapter = context.getSystemService(BluetoothManager::class.java)!!.adapter

    val proxies =
      callbackFlow<BluetoothProfile> {
        val listener =
          object : BluetoothProfile.ServiceListener {
            override fun onServiceConnected(connectedProfile: Int, serviceProxy: BluetoothProfile) {
              trySendBlocking(serviceProxy)
            }
            override fun onServiceDisconnected(disconnectedProfile: Int) {}
          }

        adapter.getProfileProxy(context, listener, profile)

        // Nothing to release here: the proxy is handed over to the caller.
        awaitClose {}
      }

    // Give up and yield null when the service has not connected within 5 seconds.
    withTimeoutOrNull(5_000) { proxies.first() }
  }

  if (connectedProxy == null) {
    Log.w(TAG, "profile proxy $profile is null")
  }
  return connectedProxy!! as T
}
291
/**
 * Reads the [BluetoothDevice] stored in this intent under [BluetoothDevice.EXTRA_DEVICE].
 *
 * Fails with a NullPointerException when the intent carries no such extra.
 */
fun Intent.getBluetoothDeviceExtra(): BluetoothDevice {
  val device = getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)
  return device!!
}
294
/** Interprets these bytes as a MAC address and returns its upper-case string form. */
fun ByteString.decodeAsMacAddressToString(): String {
  val macAddress = MacAddress.fromBytes(toByteArray())
  return macAddress.toString().uppercase()
}
297
/**
 * Resolves these bytes, interpreted as a MAC address, to the matching remote device.
 *
 * @param adapter the Bluetooth adapter used to look up the remote device.
 */
fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
  val macString = decodeAsMacAddressToString()
  return adapter.getRemoteDevice(macString)
}
300
/**
 * Resolves the address carried by this connection to the matching remote device.
 *
 * @param adapter the Bluetooth adapter used to look up the remote device.
 */
fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
  val macString: String = address
  return adapter.getRemoteDevice(macString)
}
303
/** MAC address string decoded from the [InternalConnectionRef] stored in the connection cookie. */
val Connection.address: String
  get() {
    val connectionRef = InternalConnectionRef.parseFrom(cookie.value)
    return connectionRef.address.decodeAsMacAddressToString()
  }
306
/** Transport value read from the [InternalConnectionRef] stored in the connection cookie. */
val Connection.transport: Int
  get() {
    val connectionRef = InternalConnectionRef.parseFrom(cookie.value)
    return connectionRef.transport
  }
309
/** Encodes this device's MAC address as the raw bytes of a [ByteString]. */
fun BluetoothDevice.toByteString(): ByteString {
  val macBytes = MacAddress.fromString(address).toByteArray()
  return ByteString.copyFrom(macBytes)!!
}
312
/**
 * Wraps this device into a [Connection] whose cookie holds an [InternalConnectionRef] made of the
 * device's MAC address bytes and the given transport.
 *
 * @param transport the transport value stored in the connection reference.
 * @return the connection carrying the serialized reference as its cookie.
 */
fun BluetoothDevice.toConnection(transport: Int): Connection {
  val macBytes = ByteString.copyFrom(MacAddress.fromString(address).toByteArray())
  val connectionRef =
    InternalConnectionRef.newBuilder().setAddress(macBytes).setTransport(transport).build()

  // The reference travels inside the connection as an opaque, serialized cookie.
  val serializedRef = connectionRef.toByteString()
  val cookie = Any.newBuilder().setValue(serializedRef).build()

  return Connection.newBuilder().setCookie(cookie).build()
}
323
/**
 * Creates a streaming [AudioTrack] for media/music playback and returns it.
 *
 * The track is configured for 16-bit PCM, stereo output at 44100 Hz.
 */
fun buildAudioTrack(): AudioTrack? {
  val sampleRateHz = 44100

  val attributes =
    AudioAttributes.Builder()
      .setUsage(AudioAttributes.USAGE_MEDIA)
      .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
      .build()

  val format =
    AudioFormat.Builder()
      .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
      .setSampleRate(sampleRateHz)
      .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
      .build()

  return AudioTrack.Builder()
    .setAudioAttributes(attributes)
    .setAudioFormat(format)
    .setTransferMode(AudioTrack.MODE_STREAM)
    // Presumably 2 channels x 2 bytes per sample, i.e. roughly one second of audio.
    .setBufferSizeInBytes(sampleRateHz * 2 * 2)
    .build()
}
344
/**
 * Generates a random alphanumeric string of the given length.
 *
 * Each character is picked independently at random from the file's alphanumeric pool. A length of
 * zero or less yields an empty string.
 *
 * @param length required string size.
 * @return the generated string
 */
fun generateAlphanumericString(length: Int): String =
  (1..length).joinToString(separator = "") { alphanumeric.random().toString() }
354