1 /*
 * Copyright 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.core.telecom.extensions
18 
19 import android.os.Build.VERSION_CODES
20 import android.util.Log
21 import androidx.annotation.IntDef
22 import androidx.annotation.RequiresApi
23 import androidx.core.telecom.internal.CapabilityExchangeRepository
24 import androidx.core.telecom.internal.MeetingSummaryStateListenerRemote
25 import androidx.core.telecom.internal.ParticipantActionCallbackRepository
26 import androidx.core.telecom.internal.ParticipantStateListenerRemote
27 import androidx.core.telecom.util.ExperimentalAppActions
28 import kotlinx.coroutines.CoroutineScope
29 import kotlinx.coroutines.flow.MutableStateFlow
30 import kotlinx.coroutines.flow.combine
31 import kotlinx.coroutines.flow.distinctUntilChanged
32 import kotlinx.coroutines.flow.launchIn
33 import kotlinx.coroutines.flow.onEach
34 
/**
 * Hook invoked each time a new remote connection to an action is being established.
 *
 * The three arguments, in order, are:
 * - the [CoroutineScope] that any flow collection related to updating the remote session should
 *   use,
 * - the [ParticipantActionCallbackRepository] on which the action registers the callbacks it wants
 *   to handle for events coming from the remote, and
 * - the [ParticipantStateListenerRemote], the remote interface used to send both the initial and
 *   the ongoing updates of the state tracked by the action.
 */
@OptIn(ExperimentalAppActions::class)
internal typealias ActionConnector =
    (
        scope: CoroutineScope,
        callbacks: ParticipantActionCallbackRepository,
        remote: ParticipantStateListenerRemote
    ) -> Unit
46 
/**
 * The participant extension that manages the state of Participants associated with this call as
 * well as allowing participant related actions to register themselves with this extension.
 *
 * Along with updating the participants in a call to remote surfaces, this extension also allows the
 * following optional actions to be supported:
 * - [addRaiseHandSupport] - Support for allowing a remote surface to show which participants have
 *   their hands raised to the user as well as update the raised hand state of the user.
 * - [addKickParticipantSupport] - Support for allowing a user on a remote surface to kick a
 *   participant.
 *
 * @param initialParticipants The initial list of Participants that are associated with this call.
 *   Duplicate entries are dropped, matching what [updateParticipants] does for later updates.
 * @param initialActiveParticipant The initial active Participant that is associated with this call.
 */
@OptIn(ExperimentalAppActions::class)
@RequiresApi(VERSION_CODES.O)
internal class ParticipantExtensionImpl(
    initialParticipants: List<Participant>,
    initialActiveParticipant: Participant?
) : ParticipantExtension {
    companion object {
        /**
         * The version of this ParticipantExtension used for capability exchange. Should be updated
         * whenever there is an API change to this extension or an existing action.
         */
        internal const val VERSION = 1
        /**
         * The version of this MeetingSummaryExtension used for capability exchange. Should be
         * updated whenever there is an API change to this extension or an existing action.
         */
        internal const val MEETING_SUMMARY_VERSION = 1

        /**
         * Constants used to denote the type of action supported by the [Capability] being
         * registered.
         */
        @Target(AnnotationTarget.TYPE)
        @Retention(AnnotationRetention.SOURCE)
        @IntDef(RAISE_HAND_ACTION, KICK_PARTICIPANT_ACTION)
        annotation class ExtensionActions

        /** Identifier for the raise hand action */
        internal const val RAISE_HAND_ACTION = 1
        /** Identifier for the kick participant action */
        internal const val KICK_PARTICIPANT_ACTION = 2

        private const val LOG_TAG = Extensions.LOG_TAG + "(PE)"
    }

    /**
     * StateFlow of the current set of Participants associated with the call.
     *
     * The initial value is de-duplicated so that it follows the same rule as every value written
     * through [updateParticipants]; otherwise the first emission forwarded to a remote could
     * contain duplicates that the initial synchronization had already stripped.
     */
    internal val participants: MutableStateFlow<List<Participant>> =
        MutableStateFlow(initialParticipants.distinct())

    /** StateFlow containing the active participant of the call if it exists */
    private val activeParticipant: MutableStateFlow<Participant?> =
        MutableStateFlow(initialActiveParticipant)

    /** Maps an action to its [ActionConnector], which will be called during capability exchange */
    private val actionRemoteConnector: HashMap<Int, ActionConnector> = HashMap()

    /** Replaces the tracked participants with [newParticipants], dropping duplicate entries. */
    override suspend fun updateParticipants(newParticipants: List<Participant>) {
        participants.emit(newParticipants.distinct())
    }

    /** Replaces the tracked active participant; `null` clears it. */
    override suspend fun updateActiveParticipant(participant: Participant?) {
        activeParticipant.emit(participant)
    }

    /**
     * Creates the raise hand state and registers it under [RAISE_HAND_ACTION] so that it is
     * connected to every remote that reports support for that action.
     *
     * @param initialRaisedHands forwarded to the created [RaiseHandStateImpl]
     * @param onHandRaisedChanged forwarded to the created [RaiseHandStateImpl]
     * @return the created state object
     */
    override fun addRaiseHandSupport(
        initialRaisedHands: List<Participant>,
        onHandRaisedChanged: suspend (Boolean) -> Unit
    ): RaiseHandState {
        val state = RaiseHandStateImpl(participants, initialRaisedHands, onHandRaisedChanged)
        registerAction(RAISE_HAND_ACTION, connector = state::connect)
        return state
    }

    /**
     * Creates the kick participant state and registers it under [KICK_PARTICIPANT_ACTION].
     *
     * @param onKickParticipant forwarded to the created [KickParticipantState]
     */
    override fun addKickParticipantSupport(onKickParticipant: suspend (Participant) -> Unit) {
        val state = KickParticipantState(participants, onKickParticipant)
        // This action only needs the callback repository of the new connection.
        registerAction(KICK_PARTICIPANT_ACTION) { _, repo, _ -> state.connect(repo) }
    }

    /**
     * Setup the participant extension creation callback receiver and return the Capability of this
     * extension to be shared with the remote.
     *
     * @param callbacks the repository that receives [onCreateParticipantExtension] as its
     *   participant extension creation callback
     * @return a [Capability] carrying [Extensions.PARTICIPANT], [VERSION] and the identifiers of
     *   all actions registered so far
     */
    internal fun onParticipantExchangeStarted(callbacks: CapabilityExchangeRepository): Capability {
        callbacks.onCreateParticipantExtension = ::onCreateParticipantExtension
        return Capability().apply {
            featureId = Extensions.PARTICIPANT
            featureVersion = VERSION
            supportedActions = actionRemoteConnector.keys.toIntArray()
        }
    }

    /**
     * Setup the Meeting Summary extension creation callback receiver and return the Capability of
     * this extension to be shared with the remote.
     *
     * @param callbacks the repository that receives [onCreateMeetingSummaryExtension] as its
     *   meeting summary extension creation callback
     * @return a [Capability] carrying [Extensions.MEETING_SUMMARY] and [MEETING_SUMMARY_VERSION]
     */
    internal fun onMeetingSummaryExchangeStarted(
        callbacks: CapabilityExchangeRepository
    ): Capability {
        callbacks.onMeetingSummaryExtension = ::onCreateMeetingSummaryExtension
        return Capability().apply {
            featureId = Extensions.MEETING_SUMMARY
            featureVersion = MEETING_SUMMARY_VERSION
            // NOTE(review): this advertises the participant action ids (raise hand / kick) for
            // the meeting summary feature, although onCreateMeetingSummaryExtension never
            // connects any action. Kept as-is; confirm this is what the remote expects.
            supportedActions = actionRemoteConnector.keys.toIntArray()
        }
    }

    /**
     * Register an action to this extension
     *
     * @param action The identifier of the action, which will be shared with the remote
     * @param connector The method that is called every time a new remote connects to the action in
     *   order to facilitate connecting this action to the remote.
     */
    private fun registerAction(action: Int, connector: ActionConnector) {
        actionRemoteConnector[action] = connector
    }

    /**
     * Creates and initializes the meeting summary extension for one remote connection.
     *
     * The steps are:
     * 1. **Initial state synchronization:** the current participant count and the name of the
     *    current speaker are sent through [binder]. The speaker is only reported when the active
     *    participant is part of the current participant list, which is the same rule the listener
     *    below applies.
     * 2. **Listening for changes:** a flow pipeline is launched in [coroutineScope] that
     *     - sends the participant count every time the participant list changes,
     *     - combines the participant list with the active participant and yields the active
     *       participant only while it is contained in the list (otherwise `null`),
     *     - uses `distinctUntilChanged` to skip consecutive equal results, and
     *     - sends the resulting speaker name to the remote.
     * 3. **Finishing synchronization:** `binder.finishSync()` is called once the pipeline has been
     *    launched.
     *
     * @param coroutineScope The [CoroutineScope] in which the flow listeners are launched; they
     *   stay active for as long as this scope is active.
     * @param binder The [MeetingSummaryStateListenerRemote] used to send the participant count and
     *   the current speaker to the remote side.
     */
    private fun onCreateMeetingSummaryExtension(
        coroutineScope: CoroutineScope,
        binder: MeetingSummaryStateListenerRemote
    ) {
        Log.i(LOG_TAG, "onCreateMeetingSummaryExtension")
        // sync state
        val initParticipants = participants.value
        // Only report a speaker that is actually in the call, so that the initial state agrees
        // with what the combine() step below will emit for the very same values.
        val initSpeaker = activeParticipant.value?.takeIf { it in initParticipants }
        binder.updateParticipantCount(initParticipants.size)
        // NOTE(review): toString() on the nullable name turns a missing speaker into the literal
        // string "null"; the listener below does the same. Confirm the remote relies on this.
        binder.updateCurrentSpeaker(initSpeaker?.name.toString())
        // Setup listeners for changes to state
        participants
            .onEach { updatedParticipants ->
                Log.i(LOG_TAG, "to remote: updateParticipantCount: ${updatedParticipants.size}")
                binder.updateParticipantCount(updatedParticipants.size)
            }
            .combine(activeParticipant) { p, a ->
                val result = if (a != null && p.contains(a)) a else null
                Log.v(LOG_TAG, "combine: $p + $a = $result")
                result
            }
            .distinctUntilChanged()
            .onEach {
                Log.i(LOG_TAG, "to remote: updateCurrentSpeaker=${it?.name.toString()}")
                binder.updateCurrentSpeaker(it?.name.toString())
            }
            .launchIn(coroutineScope)
        binder.finishSync()
    }

    /**
     * Handles the creation of the participant extension for one remote connection. It is installed
     * on the [CapabilityExchangeRepository] by [onParticipantExchangeStarted].
     *
     * The current participants and active participant are sent first, then a flow pipeline is
     * launched to forward later changes, then every registered action that the remote also
     * supports is connected, and finally `finishSync` is called with the event listener of the
     * per-connection callback repository.
     *
     * @param coroutineScope the CoroutineScope used to launch tasks associated with participants
     * @param remoteActions the actions reported as supported from the remote InCallService side
     * @param binder the interface used to communicate with the remote InCallService.
     */
    private fun onCreateParticipantExtension(
        coroutineScope: CoroutineScope,
        remoteActions: Set<Int>,
        binder: ParticipantStateListenerRemote
    ) {
        Log.i(LOG_TAG, "onCreatePE: actions=$remoteActions")

        // Synchronize initial state with remote. distinct() is kept as a safeguard because the
        // flow is an internally visible MutableStateFlow.
        val initParticipants = participants.value.distinct()
        binder.updateParticipants(initParticipants)
        // The active participant is only reported while it is part of the call, otherwise null.
        binder.updateActiveParticipant(activeParticipant.value?.takeIf { it in initParticipants })

        // Setup listeners for changes to state
        participants
            .onEach { updatedParticipants ->
                Log.i(LOG_TAG, "to remote: updateParticipants: $updatedParticipants")
                binder.updateParticipants(updatedParticipants)
            }
            .combine(activeParticipant) { p, a ->
                val result = if (a != null && p.contains(a)) a else null
                Log.d(LOG_TAG, "combine: $p + $a = $result")
                result
            }
            .distinctUntilChanged()
            .onEach {
                Log.d(LOG_TAG, "to remote: updateActiveParticipant=$it")
                binder.updateActiveParticipant(it)
            }
            .launchIn(coroutineScope)
        Log.d(LOG_TAG, "onCreatePE: finished state update")

        // Setup one callback repository per connection to remote
        val callbackRepository = ParticipantActionCallbackRepository(coroutineScope)
        // Set up actions (only where the remote side supports it)
        actionRemoteConnector
            .filterKeys { action -> action in remoteActions }
            .values
            .forEach { connect -> connect(coroutineScope, callbackRepository, binder) }
        Log.d(LOG_TAG, "onCreatePE: calling finishSync")
        binder.finishSync(callbackRepository.eventListener)
    }
}
293