• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.pandora
18 
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.stub.ServerCallStreamObserver
35 import io.grpc.stub.StreamObserver
36 import java.io.BufferedReader
37 import java.io.InputStreamReader
38 import java.util.concurrent.CancellationException
39 import java.util.stream.Collectors
40 import kotlinx.coroutines.CoroutineScope
41 import kotlinx.coroutines.Job
42 import kotlinx.coroutines.channels.Channel
43 import kotlinx.coroutines.channels.awaitClose
44 import kotlinx.coroutines.channels.trySendBlocking
45 import kotlinx.coroutines.flow.Flow
46 import kotlinx.coroutines.flow.callbackFlow
47 import kotlinx.coroutines.flow.catch
48 import kotlinx.coroutines.flow.consumeAsFlow
49 import kotlinx.coroutines.flow.first
50 import kotlinx.coroutines.flow.launchIn
51 import kotlinx.coroutines.flow.onCompletion
52 import kotlinx.coroutines.flow.onEach
53 import kotlinx.coroutines.launch
54 import kotlinx.coroutines.runBlocking
55 import kotlinx.coroutines.withTimeout
56 import kotlinx.coroutines.withTimeoutOrNull
57 import pandora.AndroidProto.InternalConnectionRef
58 import pandora.HostProto.Connection
59 
60 private const val TAG = "PandoraUtils"
61 private val alphanumeric = ('A'..'Z') + ('a'..'z') + ('0'..'9')
62 
63 fun shell(cmd: String): String {
64   val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
65   val input_stream = ParcelFileDescriptor.AutoCloseInputStream(fd)
66   return BufferedReader(InputStreamReader(input_stream)).lines().collect(Collectors.joining("\n"))
67 }
68 
69 /**
70  * Creates a cold flow of intents based on an intent filter. If used multiple times in a same class,
71  * this flow should be transformed into a shared flow.
72  *
73  * @param context context on which to register the broadcast receiver.
74  * @param intentFilter intent filter.
75  * @return cold flow.
76  */
77 @kotlinx.coroutines.ExperimentalCoroutinesApi
<lambda>null78 fun intentFlow(context: Context, intentFilter: IntentFilter) = callbackFlow {
79   val broadcastReceiver: BroadcastReceiver =
80     object : BroadcastReceiver() {
81       override fun onReceive(context: Context, intent: Intent) {
82         trySendBlocking(intent)
83       }
84     }
85   context.registerReceiver(broadcastReceiver, intentFilter)
86 
87   awaitClose { context.unregisterReceiver(broadcastReceiver) }
88 }
89 
90 /**
91  * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
92  * returning a gRPC response and sends it on a given gRPC stream observer.
93  *
94  * @param T the type of gRPC response.
95  * @param scope coroutine scope used to run the coroutine.
96  * @param responseObserver the gRPC stream observer on which to send the response.
97  * @param timeout the duration in seconds after which the coroutine is automatically cancelled and
98  * returns a timeout error. Default: 60s.
99  * @param block the suspended function to execute to get the response.
100  * @return reference to the coroutine as a Job.
101  *
102  * Example usage:
103  * ```
104  * override fun grpcMethod(
105  *   request: TypeOfRequest,
106  *   responseObserver: StreamObserver<TypeOfResponse> {
107  *     grpcUnary(scope, responseObserver) {
108  *       block
109  *     }
110  *   }
111  * }
112  * ```
113  */
114 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcUnarynull115 fun <T> grpcUnary(
116   scope: CoroutineScope,
117   responseObserver: StreamObserver<T>,
118   timeout: Long = 60,
119   block: suspend () -> T
120 ): Job {
121   return scope.launch {
122     try {
123       val response = withTimeout(timeout * 1000) { block() }
124       responseObserver.onNext(response)
125       responseObserver.onCompleted()
126     } catch (e: Throwable) {
127       e.printStackTrace()
128       responseObserver.onError(e)
129     }
130   }
131 }
132 
133 /**
134  * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
135  * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
136  * gRPC stream observer.
137  *
138  * @param T the type of gRPC response.
139  * @param scope coroutine scope used to run the coroutine.
140  * @param responseObserver the gRPC stream observer on which to send the response.
141  * @param block the suspended function transforming the request Flow to the response Flow.
142  * @return a StreamObserver for the incoming requests.
143  *
144  * Example usage:
145  * ```
146  * override fun grpcMethod(
147  *   responseObserver: StreamObserver<TypeOfResponse> {
148  *     grpcBidirectionalStream(scope, responseObserver) {
149  *       block
150  *     }
151  *   }
152  * }
153  * ```
154  */
155 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcBidirectionalStreamnull156 fun <T, U> grpcBidirectionalStream(
157   scope: CoroutineScope,
158   responseObserver: StreamObserver<U>,
159   block: CoroutineScope.(Flow<T>) -> Flow<U>
160 ): StreamObserver<T> {
161 
162   val inputChannel = Channel<T>()
163 
164   val job =
165     scope.launch {
166       block(inputChannel.consumeAsFlow())
167         .onEach { responseObserver.onNext(it) }
168         .onCompletion { error ->
169           if (error == null) {
170             responseObserver.onCompleted()
171           }
172         }
173         .catch {
174           it.printStackTrace()
175           responseObserver.onError(it)
176         }
177         .launchIn(this)
178     }
179 
180   return object : StreamObserver<T> {
181     override fun onNext(req: T) {
182       // Note: this should be made a blocking call, and the handler should run in a separate thread
183       // so we get flow control - but for now we can live with this
184       if (inputChannel.trySend(req).isFailure) {
185         job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
186         responseObserver.onError(
187           CancellationException("too many incoming requests, buffer exceeded")
188         )
189       }
190     }
191 
192     override fun onCompleted() {
193       // stop the input flow, but keep the job running
194       inputChannel.close()
195     }
196 
197     override fun onError(e: Throwable) {
198       job.cancel()
199       e.printStackTrace()
200     }
201   }
202 }
203 
204 /**
205  * Creates a gRPC coroutine in a given coroutine scope which executes a given suspended function
206  * taking in a Flow of gRPC requests and returning a Flow of gRPC responses and sends it on a given
207  * gRPC stream observer.
208  *
209  * @param T the type of gRPC response.
210  * @param scope coroutine scope used to run the coroutine.
211  * @param responseObserver the gRPC stream observer on which to send the response.
212  * @param block the suspended function producing the response Flow.
213  * @return a StreamObserver for the incoming requests.
214  *
215  * Example usage:
216  * ```
217  * override fun grpcMethod(
218  *   request: TypeOfRequest,
219  *   responseObserver: StreamObserver<TypeOfResponse> {
220  *     grpcServerStream(scope, responseObserver) {
221  *       block
222  *     }
223  *   }
224  * }
225  * ```
226  */
227 @kotlinx.coroutines.ExperimentalCoroutinesApi
grpcServerStreamnull228 fun <T> grpcServerStream(
229   scope: CoroutineScope,
230   responseObserver: StreamObserver<T>,
231   block: CoroutineScope.() -> Flow<T>
232 ) {
233   val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<T>
234 
235   val job =
236     scope.launch {
237       block()
238         .onEach { responseObserver.onNext(it) }
239         .onCompletion { error ->
240           if (error == null) {
241             responseObserver.onCompleted()
242           }
243         }
244         .catch {
245           it.printStackTrace()
246           responseObserver.onError(it)
247         }
248         .launchIn(this)
249     }
250 
251   serverCallStreamObserver.setOnCancelHandler { job.cancel() }
252 }
253 
254 /**
255  * Synchronous method to get a Bluetooth profile proxy.
256  *
257  * @param T the type of profile proxy (e.g. BluetoothA2dp)
258  * @param context context
259  * @param bluetoothAdapter local Bluetooth adapter
260  * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
261  * @return T the desired profile proxy
262  */
263 @Suppress("UNCHECKED_CAST")
264 @kotlinx.coroutines.ExperimentalCoroutinesApi
getProfileProxynull265 fun <T> getProfileProxy(context: Context, profile: Int): T {
266   var proxy: BluetoothProfile?
267   runBlocking {
268     val bluetoothManager = context.getSystemService(BluetoothManager::class.java)!!
269     val bluetoothAdapter = bluetoothManager.adapter
270 
271     val flow = callbackFlow {
272       val serviceListener =
273         object : BluetoothProfile.ServiceListener {
274           override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
275             trySendBlocking(proxy)
276           }
277           override fun onServiceDisconnected(profile: Int) {}
278         }
279 
280       bluetoothAdapter.getProfileProxy(context, serviceListener, profile)
281 
282       awaitClose {}
283     }
284     proxy = withTimeoutOrNull(5_000) { flow.first() }
285   }
286   if (proxy == null) {
287     Log.w(TAG, "profile proxy $profile is null")
288   }
289   return proxy!! as T
290 }
291 
Intentnull292 fun Intent.getBluetoothDeviceExtra(): BluetoothDevice =
293   this.getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)!!
294 
295 fun ByteString.decodeAsMacAddressToString(): String =
296   MacAddress.fromBytes(this.toByteArray()).toString().uppercase()
297 
298 fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice =
299   adapter.getRemoteDevice(this.decodeAsMacAddressToString())
300 
301 fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice =
302   adapter.getRemoteDevice(this.address)
303 
304 val Connection.address: String
305   get() = InternalConnectionRef.parseFrom(this.cookie.value).address.decodeAsMacAddressToString()
306 
307 val Connection.transport: Int
308   get() = InternalConnectionRef.parseFrom(this.cookie.value).transport
309 
310 fun BluetoothDevice.toByteString() =
311   ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray())!!
312 
313 fun BluetoothDevice.toConnection(transport: Int): Connection {
314   val internal_connection_ref =
315     InternalConnectionRef.newBuilder()
316       .setAddress(ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray()))
317       .setTransport(transport)
318       .build()
319   val cookie = Any.newBuilder().setValue(internal_connection_ref.toByteString()).build()
320 
321   return Connection.newBuilder().setCookie(cookie).build()
322 }
323 
324 /** Creates Audio track instance and returns the reference. */
buildAudioTracknull325 fun buildAudioTrack(): AudioTrack? {
326   return AudioTrack.Builder()
327     .setAudioAttributes(
328       AudioAttributes.Builder()
329         .setUsage(AudioAttributes.USAGE_MEDIA)
330         .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
331         .build()
332     )
333     .setAudioFormat(
334       AudioFormat.Builder()
335         .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
336         .setSampleRate(44100)
337         .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
338         .build()
339     )
340     .setTransferMode(AudioTrack.MODE_STREAM)
341     .setBufferSizeInBytes(44100 * 2 * 2)
342     .build()
343 }
344 
345 /**
346  * Generates Alpha-numeric string of given length.
347  *
348  * @param length required string size.
349  * @return a generated string
350  */
generateAlphanumericStringnull351 fun generateAlphanumericString(length: Int): String {
352   return buildString { repeat(length) { append(alphanumeric.random()) } }
353 }
354