1 /*
<lambda>null2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.pandora
18 
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.Status
35 import io.grpc.stub.ServerCallStreamObserver
36 import io.grpc.stub.StreamObserver
37 import java.io.BufferedReader
38 import java.io.InputStreamReader
39 import java.io.PrintWriter
40 import java.io.StringWriter
41 import java.util.concurrent.CancellationException
42 import java.util.stream.Collectors
43 import kotlinx.coroutines.CoroutineScope
44 import kotlinx.coroutines.Job
45 import kotlinx.coroutines.channels.Channel
46 import kotlinx.coroutines.channels.awaitClose
47 import kotlinx.coroutines.channels.trySendBlocking
48 import kotlinx.coroutines.flow.Flow
49 import kotlinx.coroutines.flow.callbackFlow
50 import kotlinx.coroutines.flow.catch
51 import kotlinx.coroutines.flow.consumeAsFlow
52 import kotlinx.coroutines.flow.first
53 import kotlinx.coroutines.flow.launchIn
54 import kotlinx.coroutines.flow.onCompletion
55 import kotlinx.coroutines.flow.onEach
56 import kotlinx.coroutines.launch
57 import kotlinx.coroutines.runBlocking
58 import kotlinx.coroutines.withTimeout
59 import kotlinx.coroutines.withTimeoutOrNull
60 import pandora.AndroidProto.InternalConnectionRef
61 import pandora.HostProto.Connection
62 
/** Tag used for the android.util.Log output emitted from this file. */
private const val TAG = "PandoraUtils"

/** Character pool for [generateAlphanumericString]: 'A'..'Z', then 'a'..'z', then '0'..'9'. */
private val alphanumeric = listOf('A'..'Z', 'a'..'z', '0'..'9').flatten()

// Global device bookkeeping. The broadcast receiver created by intentFlow() updates these sets:
//  - on ACTION_ACL_DISCONNECTED the device is removed from initiatedConnection and
//    waitedAclConnection, and added to waitedAclDisconnection;
//  - on ACTION_ACL_CONNECTED the device is removed from waitedAclDisconnection.
// NOTE(review): these are plain HashSets with no synchronization visible in this file — confirm
// that all accesses happen on a single thread.
val initiatedConnection = hashSetOf<BluetoothDevice>()
val waitedAclConnection = hashSetOf<BluetoothDevice>()
val waitedAclDisconnection = hashSetOf<BluetoothDevice>()
69 
/**
 * Runs a shell command through the instrumentation's UiAutomation and returns what it printed.
 *
 * The output is read line by line and re-joined with "\n", so a trailing line terminator is not
 * part of the result.
 *
 * @param cmd the command line to execute.
 * @return the command output, lines joined with "\n".
 */
fun shell(cmd: String): String {
  val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
  val inputStream = ParcelFileDescriptor.AutoCloseInputStream(fd)
  // useLines closes the reader once the output has been consumed (or reading fails); closing the
  // reader closes the AutoCloseInputStream, which in turn releases the file descriptor. The
  // previous implementation never closed any of them.
  return BufferedReader(InputStreamReader(inputStream)).useLines { lines ->
    lines.joinToString("\n")
  }
}
75 
/**
 * Builds a cold flow emitting every broadcast intent that matches an intent filter.
 *
 * A broadcast receiver is registered on [context] when the flow is collected and unregistered when
 * the flow is closed. If the flow is needed several times within one class, convert it to a shared
 * flow instead of calling this function repeatedly.
 *
 * Besides forwarding intents, the receiver maintains the global device sets of this file:
 * - `ACTION_ACL_CONNECTED`: the device is removed from [waitedAclDisconnection].
 * - `ACTION_ACL_DISCONNECTED`: the device is removed from [initiatedConnection] and
 *   [waitedAclConnection], and added to [waitedAclDisconnection].
 *
 * @param context context on which the broadcast receiver is registered.
 * @param intentFilter filter selecting the intents to emit.
 * @return cold flow of the received intents.
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun intentFlow(context: Context, intentFilter: IntentFilter) = callbackFlow {
  val receiver =
    object : BroadcastReceiver() {
      override fun onReceive(context: Context, intent: Intent) {
        // Only the two ACL actions touch the bookkeeping sets; the device extra is not read for
        // any other action.
        when (intent.action) {
          BluetoothDevice.ACTION_ACL_CONNECTED ->
            waitedAclDisconnection.remove(intent.getBluetoothDeviceExtra())
          BluetoothDevice.ACTION_ACL_DISCONNECTED -> {
            val device = intent.getBluetoothDeviceExtra()
            initiatedConnection.remove(device)
            waitedAclConnection.remove(device)
            waitedAclDisconnection.add(device)
          }
        }
        // Every intent is forwarded to the collector, whatever its action.
        trySendBlocking(intent)
      }
    }

  context.registerReceiver(receiver, intentFilter)

  // Keeps the flow open until the collector goes away, then drops the receiver.
  awaitClose { context.unregisterReceiver(receiver) }
}
104 
/**
 * Launches a coroutine in [scope] that computes a single gRPC response with [block] and delivers
 * it to [responseObserver].
 *
 * On success the response is sent with `onNext` followed by `onCompleted`. Any [Throwable] raised
 * while computing or sending the response — including the timeout — has its stack trace printed
 * and is reported through `onError` as a `Status.UNKNOWN` exception whose description is that
 * stack trace.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the response.
 * @param timeout the duration in seconds after which [block] is cancelled and a timeout error is
 *   reported. Default: 60s.
 * @param block the suspended function to execute to get the response.
 * @return reference to the coroutine as a Job.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcUnary(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcUnary(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  timeout: Long = 60,
  block: suspend () -> T
): Job =
  scope.launch {
    try {
      // withTimeout expects milliseconds, the parameter is expressed in seconds.
      val timeoutMillis = timeout * 1000
      val result = withTimeout(timeoutMillis) { block() }
      with(responseObserver) {
        onNext(result)
        onCompleted()
      }
    } catch (failure: Throwable) {
      failure.printStackTrace()
      // Capture the same stack trace as text so it can travel in the status description.
      val trace = StringWriter()
      failure.printStackTrace(PrintWriter(trace))
      val status = Status.UNKNOWN.withCause(failure).withDescription(trace.toString())
      responseObserver.onError(status.asException())
    }
  }
151 
/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given function taking in a
 * Flow of gRPC requests and returning a Flow of gRPC responses, and sends these responses on a
 * given gRPC stream observer.
 *
 * Failures of the response flow are printed and reported through `onError` as a `Status.UNKNOWN`
 * exception whose description is the stack trace. The coroutine is cancelled when the call is
 * cancelled, when the client reports an error, or when an incoming request cannot be handed over
 * to the input channel immediately.
 *
 * @param T the type of gRPC request.
 * @param U the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It must be a
 *   [ServerCallStreamObserver], otherwise the cast below throws.
 * @param block the function transforming the request Flow to the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ): StreamObserver<TypeOfRequest> {
 *   return grpcBidirectionalStream(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<U>,
  block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

  val inputChannel = Channel<T>()
  // The observer carries responses (U); it was previously cast with the request type T.
  val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<U>

  val job =
    scope.launch {
      block(inputChannel.consumeAsFlow())
        .onEach { responseObserver.onNext(it) }
        .onCompletion { error ->
          // Only a normal completion closes the stream; failures are reported by catch below.
          if (error == null) {
            responseObserver.onCompleted()
          }
        }
        .catch {
          it.printStackTrace()
          // Capture the same stack trace as text so it can travel in the status description.
          val sw = StringWriter()
          it.printStackTrace(PrintWriter(sw))
          responseObserver.onError(
            Status.UNKNOWN.withCause(it).withDescription(sw.toString()).asException()
          )
        }
        .launchIn(this)
    }

  // Stop producing responses as soon as the call is cancelled.
  serverCallStreamObserver.setOnCancelHandler { job.cancel() }

  return object : StreamObserver<T> {
    override fun onNext(req: T) {
      // Note: this should be made a blocking call, and the handler should run in a separate thread
      // so we get flow control - but for now we can live with this
      if (inputChannel.trySend(req).isFailure) {
        job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
        responseObserver.onError(
          CancellationException("too many incoming requests, buffer exceeded")
        )
      }
    }

    override fun onCompleted() {
      // stop the input flow, but keep the job running
      inputChannel.close()
    }

    override fun onError(e: Throwable) {
      // A client-side error aborts the whole exchange and is echoed back on the response stream.
      job.cancel()
      e.printStackTrace()
      val sw = StringWriter()
      e.printStackTrace(PrintWriter(sw))
      responseObserver.onError(
        Status.UNKNOWN.withCause(e).withDescription(sw.toString()).asException()
      )
    }
  }
}
234 
/**
 * Launches a coroutine in [scope] that collects the Flow of gRPC responses produced by [block] and
 * forwards each of them to [responseObserver].
 *
 * When the flow completes normally the stream is closed with `onCompleted`. A failure of the flow
 * is printed and reported through `onError` as a `Status.UNKNOWN` exception whose description is
 * the stack trace. The coroutine is cancelled when the call is cancelled.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It must be a
 *   [ServerCallStreamObserver], otherwise the cast below throws.
 * @param block the function producing the response Flow.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcServerStream(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcServerStream(
  scope: CoroutineScope,
  responseObserver: StreamObserver<T>,
  block: CoroutineScope.() -> Flow<T>
) {
  // Cast first: nothing is launched if the observer cannot report cancellation.
  val cancellableObserver = responseObserver as ServerCallStreamObserver<T>

  val streamJob =
    scope.launch {
      block(this)
        .onEach { response -> responseObserver.onNext(response) }
        .onCompletion { cause ->
          // Only a normal completion closes the stream; failures are reported by catch below.
          if (cause == null) {
            responseObserver.onCompleted()
          }
        }
        .catch { failure ->
          failure.printStackTrace()
          // Capture the same stack trace as text so it can travel in the status description.
          val trace = StringWriter()
          failure.printStackTrace(PrintWriter(trace))
          val status = Status.UNKNOWN.withCause(failure).withDescription(trace.toString())
          responseObserver.onError(status.asException())
        }
        .launchIn(this)
    }

  cancellableObserver.setOnCancelHandler { streamJob.cancel() }
}
288 
/**
 * Synchronously obtains a Bluetooth profile proxy.
 *
 * The calling thread is blocked until the profile service reports a connection, for at most 5
 * seconds. If no proxy was delivered in time, a warning is logged and the non-null assertion on
 * the result throws.
 *
 * @param T the type of profile proxy (e.g. BluetoothA2dp); the proxy is cast to it unchecked.
 * @param context context used to look up the [BluetoothManager] and to request the proxy.
 * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
 * @return T the desired profile proxy
 */
@Suppress("UNCHECKED_CAST")
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> getProfileProxy(context: Context, profile: Int): T {
  val connectedProxy: BluetoothProfile? = runBlocking {
    val adapter = context.getSystemService(BluetoothManager::class.java)!!.adapter

    // Flow emitting each proxy handed to the service listener.
    val proxies = callbackFlow {
      val listener =
        object : BluetoothProfile.ServiceListener {
          override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
            trySendBlocking(proxy)
          }
          override fun onServiceDisconnected(profile: Int) {}
        }

      adapter.getProfileProxy(context, listener, profile)

      awaitClose {}
    }

    // null when no proxy shows up within 5 seconds.
    withTimeoutOrNull(5_000) { proxies.first() }
  }

  if (connectedProxy == null) {
    Log.w(TAG, "profile proxy $profile is null")
  }
  return connectedProxy!! as T
}
326 
/**
 * Reads the [BluetoothDevice.EXTRA_DEVICE] parcelable extra of this intent.
 *
 * The result is asserted non-null, so calling this on an intent without that extra throws.
 */
fun Intent.getBluetoothDeviceExtra(): BluetoothDevice {
  val device = getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)
  return device!!
}
329 
/** Interprets these bytes as a MAC address and returns its upper-cased string form. */
fun ByteString.decodeAsMacAddressToString(): String {
  val macAddress = MacAddress.fromBytes(toByteArray())
  return macAddress.toString().uppercase()
}
332 
/** Decodes these bytes as a MAC address and asks [adapter] for the matching remote device. */
fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
  val macString = decodeAsMacAddressToString()
  return adapter.getRemoteDevice(macString)
}
335 
/** Asks [adapter] for the remote device whose address is stored in this connection's cookie. */
fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
  val macString = this.address
  return adapter.getRemoteDevice(macString)
}
338 
/**
 * MAC address string of the peer, decoded from the [InternalConnectionRef] stored in the cookie.
 * The cookie is parsed on every access.
 */
val Connection.address: String
  get() {
    val connectionRef = InternalConnectionRef.parseFrom(cookie.value)
    return connectionRef.address.decodeAsMacAddressToString()
  }
341 
/**
 * Transport value read from the [InternalConnectionRef] stored in the cookie. The cookie is parsed
 * on every access.
 */
val Connection.transport: Int
  get() {
    val connectionRef = InternalConnectionRef.parseFrom(cookie.value)
    return connectionRef.transport
  }
344 
/** Encodes this device's MAC address as the raw bytes of a [ByteString]. */
fun BluetoothDevice.toByteString() =
  MacAddress.fromString(address).let { mac -> ByteString.copyFrom(mac.toByteArray())!! }
347 
/**
 * Wraps this device into a [Connection].
 *
 * The device's MAC address bytes and [transport] are stored in an [InternalConnectionRef], whose
 * serialized form becomes the value of the connection's cookie.
 *
 * @param transport transport value to record in the connection reference.
 * @return the connection carrying the encoded reference as its cookie.
 */
fun BluetoothDevice.toConnection(transport: Int): Connection {
  val addressBytes = ByteString.copyFrom(MacAddress.fromString(address).toByteArray())
  val connectionRef =
    InternalConnectionRef.newBuilder().setAddress(addressBytes).setTransport(transport).build()
  val cookie = Any.newBuilder().setValue(connectionRef.toByteString()).build()

  return Connection.newBuilder().setCookie(cookie).build()
}
358 
/**
 * Creates an [AudioTrack] instance and returns the reference.
 *
 * The track is configured for media/music usage, 16-bit PCM stereo at 44100 Hz, in streaming mode,
 * with a buffer of 44100 * 2 * 2 bytes.
 */
fun buildAudioTrack(): AudioTrack? {
  val sampleRate = 44100
  val attributes =
    AudioAttributes.Builder()
      .setUsage(AudioAttributes.USAGE_MEDIA)
      .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
      .build()
  val format =
    AudioFormat.Builder()
      .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
      .setSampleRate(sampleRate)
      .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
      .build()

  val builder = AudioTrack.Builder()
  builder.setAudioAttributes(attributes)
  builder.setAudioFormat(format)
  builder.setTransferMode(AudioTrack.MODE_STREAM)
  builder.setBufferSizeInBytes(sampleRate * 2 * 2)
  return builder.build()
}
379 
/**
 * Generates a random alpha-numeric string.
 *
 * Each character is drawn independently from the file-level `alphanumeric` pool. A [length] of
 * zero or less yields an empty string.
 *
 * @param length required string size.
 * @return a generated string
 */
fun generateAlphanumericString(length: Int): String =
  (0 until length).joinToString(separator = "") { alphanumeric.random().toString() }
389