1 /*
<lambda>null2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.pandora
18
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.Status
35 import io.grpc.stub.ServerCallStreamObserver
36 import io.grpc.stub.StreamObserver
37 import java.io.BufferedReader
38 import java.io.InputStreamReader
39 import java.io.PrintWriter
40 import java.io.StringWriter
41 import java.util.concurrent.CancellationException
42 import java.util.stream.Collectors
43 import kotlinx.coroutines.CoroutineScope
44 import kotlinx.coroutines.Job
45 import kotlinx.coroutines.channels.Channel
46 import kotlinx.coroutines.channels.awaitClose
47 import kotlinx.coroutines.channels.trySendBlocking
48 import kotlinx.coroutines.flow.Flow
49 import kotlinx.coroutines.flow.callbackFlow
50 import kotlinx.coroutines.flow.catch
51 import kotlinx.coroutines.flow.consumeAsFlow
52 import kotlinx.coroutines.flow.first
53 import kotlinx.coroutines.flow.launchIn
54 import kotlinx.coroutines.flow.onCompletion
55 import kotlinx.coroutines.flow.onEach
56 import kotlinx.coroutines.launch
57 import kotlinx.coroutines.runBlocking
58 import kotlinx.coroutines.withTimeout
59 import kotlinx.coroutines.withTimeoutOrNull
60 import pandora.AndroidProto.InternalConnectionRef
61 import pandora.HostProto.Connection
62
63 private const val TAG = "PandoraUtils"
64 private val alphanumeric = ('A'..'Z') + ('a'..'z') + ('0'..'9')
65
66 val initiatedConnection = HashSet<BluetoothDevice>()
67 val waitedAclConnection = HashSet<BluetoothDevice>()
68 val waitedAclDisconnection = HashSet<BluetoothDevice>()
69
70 fun shell(cmd: String): String {
71 val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
72 val input_stream = ParcelFileDescriptor.AutoCloseInputStream(fd)
73 return BufferedReader(InputStreamReader(input_stream)).lines().collect(Collectors.joining("\n"))
74 }
75
76 /**
77 * Creates a cold flow of intents based on an intent filter. If used multiple times in a same class,
78 * this flow should be transformed into a shared flow.
79 *
80 * @param context context on which to register the broadcast receiver.
81 * @param intentFilter intent filter.
82 * @return cold flow.
83 */
84 @kotlinx.coroutines.ExperimentalCoroutinesApi
<lambda>null85 fun intentFlow(context: Context, intentFilter: IntentFilter) = callbackFlow {
86 val broadcastReceiver: BroadcastReceiver =
87 object : BroadcastReceiver() {
88 override fun onReceive(context: Context, intent: Intent) {
89 if (intent.action == BluetoothDevice.ACTION_ACL_CONNECTED) {
90 waitedAclDisconnection.remove(intent.getBluetoothDeviceExtra())
91 }
92 if (intent.action == BluetoothDevice.ACTION_ACL_DISCONNECTED) {
93 initiatedConnection.remove(intent.getBluetoothDeviceExtra())
94 waitedAclConnection.remove(intent.getBluetoothDeviceExtra())
95 waitedAclDisconnection.add(intent.getBluetoothDeviceExtra())
96 }
97 trySendBlocking(intent)
98 }
99 }
100 context.registerReceiver(broadcastReceiver, intentFilter)
101
102 awaitClose { context.unregisterReceiver(broadcastReceiver) }
103 }
104
105 /**
106 * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
107 * returning a gRPC response and sends it on a given gRPC stream observer.
108 *
109 * @param T the type of gRPC response.
110 * @param scope coroutine scope used to run the coroutine.
111 * @param responseObserver the gRPC stream observer on which to send the response.
112 * @param timeout the duration in seconds after which the coroutine is automatically cancelled and
113 * returns a timeout error. Default: 60s.
114 * @param block the suspended function to execute to get the response.
115 * @return reference to the coroutine as a Job.
116 *
117 * Example usage:
118 * ```
119 * override fun grpcMethod(
120 * request: TypeOfRequest,
121 * responseObserver: StreamObserver<TypeOfResponse> {
122 * grpcUnary(scope, responseObserver) {
123 * block
124 * }
125 * }
126 * }
127 * ```
128 */
129 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcUnarynull130 fun <T> grpcUnary(
131 scope: CoroutineScope,
132 responseObserver: StreamObserver<T>,
133 timeout: Long = 60,
134 block: suspend () -> T
135 ): Job {
136 return scope.launch {
137 try {
138 val response = withTimeout(timeout * 1000) { block() }
139 responseObserver.onNext(response)
140 responseObserver.onCompleted()
141 } catch (e: Throwable) {
142 e.printStackTrace()
143 val sw = StringWriter()
144 e.printStackTrace(PrintWriter(sw))
145 responseObserver.onError(
146 Status.UNKNOWN.withCause(e).withDescription(sw.toString()).asException()
147 )
148 }
149 }
150 }
151
152 /**
153 * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
154 * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
155 * gRPC stream observer.
156 *
157 * @param T the type of gRPC response.
158 * @param scope coroutine scope used to run the coroutine.
159 * @param responseObserver the gRPC stream observer on which to send the response.
160 * @param block the suspended function transforming the request Flow to the response Flow.
161 * @return a StreamObserver for the incoming requests.
162 *
163 * Example usage:
164 * ```
165 * override fun grpcMethod(
166 * responseObserver: StreamObserver<TypeOfResponse> {
167 * grpcBidirectionalStream(scope, responseObserver) {
168 * block
169 * }
170 * }
171 * }
172 * ```
173 */
174 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcBidirectionalStreamnull175 fun <T, U> grpcBidirectionalStream(
176 scope: CoroutineScope,
177 responseObserver: StreamObserver<U>,
178 block: CoroutineScope.(Flow<T>) -> Flow<U>
179 ): StreamObserver<T> {
180
181 val inputChannel = Channel<T>()
182 val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<T>
183
184 val job =
185 scope.launch {
186 block(inputChannel.consumeAsFlow())
187 .onEach { responseObserver.onNext(it) }
188 .onCompletion { error ->
189 if (error == null) {
190 responseObserver.onCompleted()
191 }
192 }
193 .catch {
194 it.printStackTrace()
195 val sw = StringWriter()
196 it.printStackTrace(PrintWriter(sw))
197 responseObserver.onError(
198 Status.UNKNOWN.withCause(it).withDescription(sw.toString()).asException()
199 )
200 }
201 .launchIn(this)
202 }
203
204 serverCallStreamObserver.setOnCancelHandler { job.cancel() }
205
206 return object : StreamObserver<T> {
207 override fun onNext(req: T) {
208 // Note: this should be made a blocking call, and the handler should run in a separate thread
209 // so we get flow control - but for now we can live with this
210 if (inputChannel.trySend(req).isFailure) {
211 job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
212 responseObserver.onError(
213 CancellationException("too many incoming requests, buffer exceeded")
214 )
215 }
216 }
217
218 override fun onCompleted() {
219 // stop the input flow, but keep the job running
220 inputChannel.close()
221 }
222
223 override fun onError(e: Throwable) {
224 job.cancel()
225 e.printStackTrace()
226 val sw = StringWriter()
227 e.printStackTrace(PrintWriter(sw))
228 responseObserver.onError(
229 Status.UNKNOWN.withCause(e).withDescription(sw.toString()).asException()
230 )
231 }
232 }
233 }
234
235 /**
236 * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
237 * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
238 * gRPC stream observer.
239 *
240 * @param T the type of gRPC response.
241 * @param scope coroutine scope used to run the coroutine.
242 * @param responseObserver the gRPC stream observer on which to send the response.
243 * @param block the suspended function producing the response Flow.
244 * @return a StreamObserver for the incoming requests.
245 *
246 * Example usage:
247 * ```
248 * override fun grpcMethod(
249 * request: TypeOfRequest,
250 * responseObserver: StreamObserver<TypeOfResponse> {
251 * grpcServerStream(scope, responseObserver) {
252 * block
253 * }
254 * }
255 * }
256 * ```
257 */
258 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcServerStreamnull259 fun <T> grpcServerStream(
260 scope: CoroutineScope,
261 responseObserver: StreamObserver<T>,
262 block: CoroutineScope.() -> Flow<T>
263 ) {
264 val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<T>
265
266 val job =
267 scope.launch {
268 block()
269 .onEach { responseObserver.onNext(it) }
270 .onCompletion { error ->
271 if (error == null) {
272 responseObserver.onCompleted()
273 }
274 }
275 .catch {
276 it.printStackTrace()
277 val sw = StringWriter()
278 it.printStackTrace(PrintWriter(sw))
279 responseObserver.onError(
280 Status.UNKNOWN.withCause(it).withDescription(sw.toString()).asException()
281 )
282 }
283 .launchIn(this)
284 }
285
286 serverCallStreamObserver.setOnCancelHandler { job.cancel() }
287 }
288
289 /**
290 * Synchronous method to get a Bluetooth profile proxy.
291 *
292 * @param T the type of profile proxy (e.g. BluetoothA2dp)
293 * @param context context
294 * @param bluetoothAdapter local Bluetooth adapter
295 * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
296 * @return T the desired profile proxy
297 */
298 @Suppress("UNCHECKED_CAST")
299 @kotlinx.coroutines.ExperimentalCoroutinesApi
getProfileProxynull300 fun <T> getProfileProxy(context: Context, profile: Int): T {
301 var proxy: BluetoothProfile?
302 runBlocking {
303 val bluetoothManager = context.getSystemService(BluetoothManager::class.java)!!
304 val bluetoothAdapter = bluetoothManager.adapter
305
306 val flow = callbackFlow {
307 val serviceListener =
308 object : BluetoothProfile.ServiceListener {
309 override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
310 trySendBlocking(proxy)
311 }
312 override fun onServiceDisconnected(profile: Int) {}
313 }
314
315 bluetoothAdapter.getProfileProxy(context, serviceListener, profile)
316
317 awaitClose {}
318 }
319 proxy = withTimeoutOrNull(5_000) { flow.first() }
320 }
321 if (proxy == null) {
322 Log.w(TAG, "profile proxy $profile is null")
323 }
324 return proxy!! as T
325 }
326
Intentnull327 fun Intent.getBluetoothDeviceExtra(): BluetoothDevice =
328 this.getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)!!
329
330 fun ByteString.decodeAsMacAddressToString(): String =
331 MacAddress.fromBytes(this.toByteArray()).toString().uppercase()
332
333 fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice =
334 adapter.getRemoteDevice(this.decodeAsMacAddressToString())
335
336 fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice =
337 adapter.getRemoteDevice(this.address)
338
339 val Connection.address: String
340 get() = InternalConnectionRef.parseFrom(this.cookie.value).address.decodeAsMacAddressToString()
341
342 val Connection.transport: Int
343 get() = InternalConnectionRef.parseFrom(this.cookie.value).transport
344
345 fun BluetoothDevice.toByteString() =
346 ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray())!!
347
348 fun BluetoothDevice.toConnection(transport: Int): Connection {
349 val internal_connection_ref =
350 InternalConnectionRef.newBuilder()
351 .setAddress(ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray()))
352 .setTransport(transport)
353 .build()
354 val cookie = Any.newBuilder().setValue(internal_connection_ref.toByteString()).build()
355
356 return Connection.newBuilder().setCookie(cookie).build()
357 }
358
359 /** Creates Audio track instance and returns the reference. */
buildAudioTracknull360 fun buildAudioTrack(): AudioTrack? {
361 return AudioTrack.Builder()
362 .setAudioAttributes(
363 AudioAttributes.Builder()
364 .setUsage(AudioAttributes.USAGE_MEDIA)
365 .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
366 .build()
367 )
368 .setAudioFormat(
369 AudioFormat.Builder()
370 .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
371 .setSampleRate(44100)
372 .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
373 .build()
374 )
375 .setTransferMode(AudioTrack.MODE_STREAM)
376 .setBufferSizeInBytes(44100 * 2 * 2)
377 .build()
378 }
379
380 /**
381 * Generates Alpha-numeric string of given length.
382 *
383 * @param length required string size.
384 * @return a generated string
385 */
generateAlphanumericStringnull386 fun generateAlphanumericString(length: Int): String {
387 return buildString { repeat(length) { append(alphanumeric.random()) } }
388 }
389