• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.util.kotlin
18 
19 import com.android.app.tracing.coroutines.launchTraced as launch
20 import com.android.systemui.util.time.SystemClock
21 import com.android.systemui.util.time.SystemClockImpl
22 import java.util.LinkedList
23 import java.util.concurrent.atomic.AtomicReference
24 import kotlin.math.max
25 import kotlin.time.Duration
26 import kotlin.time.Duration.Companion.milliseconds
27 import kotlinx.coroutines.CoroutineStart
28 import kotlinx.coroutines.Dispatchers
29 import kotlinx.coroutines.Job
30 import kotlinx.coroutines.coroutineScope
31 import kotlinx.coroutines.delay
32 import kotlinx.coroutines.flow.Flow
33 import kotlinx.coroutines.flow.MutableSharedFlow
34 import kotlinx.coroutines.flow.channelFlow
35 import kotlinx.coroutines.flow.distinctUntilChanged
36 import kotlinx.coroutines.flow.filter
37 import kotlinx.coroutines.flow.flow
38 import kotlinx.coroutines.flow.map
39 import kotlinx.coroutines.flow.onStart
40 
/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * Note that the new Flow will not start emitting until it has received two emissions from the
 * upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 *
 * @param transform invoked with the previous and the newest upstream value; its result is emitted
 *   downstream.
 * @return a [Flow] of the results of [transform], one per upstream emission after the first.
 */
fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
    // Private sentinel meaning "nothing received yet". A dedicated object is used rather than
    // null so that null stays a legal upstream value.
    val noVal = Any()
    var previousValue: Any? = noVal
    collect { newVal ->
        // Identity comparison: the sentinel must never be matched (or rejected) through the
        // upstream value's own equals() implementation.
        if (previousValue !== noVal) {
            @Suppress("UNCHECKED_CAST") emit(transform(previousValue as T, newVal))
        }
        previousValue = newVal
    }
}
58 
/**
 * Returns a new [Flow] that feeds each upstream emission, together with the emission before it,
 * through [transform].
 *
 * For the very first upstream emission there is no earlier one, so [initialValue] is passed as the
 * "previous" value instead.
 *
 * Useful for code that needs to compare the current value to the previous value.
 *
 * @param initialValue stand-in for the previous value on the first emission.
 * @param transform combines the previous and the newest value into the emitted result.
 */
fun <S, T : S, R> Flow<T>.pairwiseBy(
    initialValue: S,
    transform: suspend (previousValue: S, newValue: T) -> R,
): Flow<R> {
    // Wrap the constant in a supplier so the supplier-based overload does the actual work.
    val constantInitial: suspend () -> S = { initialValue }
    return pairwiseBy(getInitialValue = constantInitial, transform = transform)
}
69 
/**
 * Returns a new [Flow] that feeds each upstream emission, together with the emission before it,
 * through [transform].
 *
 * [getInitialValue] supplies the "previous" value used for the first upstream emission. Because it
 * is a suspending supplier invoked when the returned flow is collected, it may do some work before
 * producing that value, unlike the overload that takes a ready-made initial value.
 *
 * Useful for code that needs to compare the current value to the previous value.
 *
 * @param getInitialValue produces the stand-in previous value for the first emission.
 * @param transform combines the previous and the newest value into the emitted result.
 */
fun <S, T : S, R> Flow<T>.pairwiseBy(
    getInitialValue: suspend () -> S,
    transform: suspend (previousValue: S, newValue: T) -> R,
): Flow<R> = flow {
    var last: S = getInitialValue()
    collect { current ->
        val combined = transform(last, current)
        emit(combined)
        // Only advance once the combined value has been handed downstream.
        last = current
    }
}
89 
/**
 * Returns a new [Flow] that pairs every emission from [this] with the emission that preceded it.
 * Nothing is emitted until the upstream Flow has produced two values.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T> Flow<T>.pairwise(): Flow<WithPrev<T, T>> =
    pairwiseBy { older, newer -> WithPrev(older, newer) }
97 
/**
 * Returns a new [Flow] that pairs every emission from [this] with the emission that preceded it.
 * For the first emission, [initialValue] takes the place of the missing previous value.
 *
 * Useful for code that needs to compare the current value to the previous value.
 *
 * @param initialValue stand-in for the previous value on the first emission.
 */
fun <S, T : S> Flow<T>.pairwise(initialValue: S): Flow<WithPrev<S, T>> =
    pairwiseBy(
        initialValue = initialValue,
        transform = { previous, current -> WithPrev(previous, current) },
    )
106 
/**
 * Pairs a value emitted from a [Flow] with the value that came before it.
 *
 * @property previousValue the value that preceded [newValue].
 * @property newValue the more recent of the two values.
 */
data class WithPrev<out S, out T : S>(
    val previousValue: S,
    val newValue: T,
)
109 
/**
 * Emits a [Unit] each time the subscription count of this flow goes up, i.e. whenever a downstream
 * subscriber is added. Decreases in the subscription count produce no emission.
 */
fun <T> MutableSharedFlow<T>.onSubscriberAdded(): Flow<Unit> =
    subscriptionCount
        // Start from zero so the first observed count is compared against "no subscribers".
        .pairwise(initialValue = 0)
        .filter { change -> change.newValue > change.previousValue }
        .map {}
117 
/**
 * Returns a new [Flow] that reports, via [transform], how each [Set] emitted from [this] differs
 * from the one emitted before it.
 *
 * If [emitFirstEvent] is `true`, an empty set is injected ahead of the upstream emissions, so the
 * first upstream [Set] produces a change event with no removals and all of its elements as
 * additions. Because consecutive equal sets are dropped, an empty first upstream [Set] produces no
 * event.
 *
 * If [emitFirstEvent] is `false`, the first emission only serves as the baseline and no changes
 * are emitted until a second, different [Set] has been emitted from the upstream [Flow].
 *
 * @param transform receives the removed and the added elements and produces the emitted value.
 * @param emitFirstEvent whether the first upstream [Set] should be reported as pure additions.
 */
fun <T, R> Flow<Set<T>>.setChangesBy(
    transform: suspend (removed: Set<T>, added: Set<T>) -> R,
    emitFirstEvent: Boolean = true,
): Flow<R> {
    val upstream: Flow<Set<T>> = if (emitFirstEvent) onStart { emit(emptySet()) } else this
    return upstream.distinctUntilChanged().pairwiseBy { previous: Set<T>, current: Set<T> ->
        // Removed: present in the previous set but not in the current one.
        // Added: present in the current set but not in the previous one.
        transform(previous - current, current - previous)
    }
}
142 
/**
 * Returns a new [Flow] of [SetChanges] describing how each [Set] emitted from [this] differs from
 * the one emitted before it.
 *
 * If [emitFirstEvent] is `true`, the first [Set] emitted from the upstream [Flow] is reported as a
 * change with no removals and all of its elements as additions.
 *
 * If [emitFirstEvent] is `false`, the first emission is ignored and no changes are emitted until a
 * second [Set] has been emitted from the upstream [Flow].
 *
 * @param emitFirstEvent whether the first upstream [Set] should be reported as pure additions.
 */
fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
    setChangesBy(
        transform = { removed, added -> SetChanges(removed, added) },
        emitFirstEvent = emitFirstEvent,
    )
155 
/**
 * The difference in elements between two [Set]s: an earlier one and a later one.
 *
 * @property removed elements that are in the earlier [Set] but missing from the later one.
 * @property added elements that are in the later [Set] but were not in the earlier one.
 */
data class SetChanges<T>(val removed: Set<T>, val added: Set<T>)
163 
/**
 * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
 * the most recent emission from [other] using [transform].
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value;
 * emissions from [this] that arrive before then are dropped.
 *
 * @param other the flow whose latest value is sampled on every emission from [this].
 * @param transform combines the value from [this] with the sampled value from [other].
 */
fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
    coroutineScope {
        // Private sentinel meaning "other has not emitted yet"; keeps null usable as a real value.
        val noVal = Any()
        val sampledRef = AtomicReference(noVal)
        // Collect [other] concurrently, remembering only its most recent value.
        val job = launch(context = Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } }
        collect {
            val sampled = sampledRef.get()
            // Identity comparison: the sentinel must never be matched (or rejected) through the
            // sampled value's own equals() implementation.
            if (sampled !== noVal) {
                @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B))
            }
        }
        // [this] has completed: stop collecting [other] so the enclosing scope can finish.
        job.cancel()
    }
}
184 
/**
 * Returns a new [Flow] that emits whenever [this] emits, but whose values are the latest value
 * produced by [other]; the values of [this] themselves are discarded.
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 *
 * @param other the flow whose latest value is emitted on every emission from [this].
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> {
    return sample(other) { _, latestFromOther -> latestFromOther }
}
192 
/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
 * during this period, only the latest value is emitted.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 *
 * @param periodMs minimum time, in milliseconds, between two emissions of the returned flow.
 * @param clock source of the `elapsedRealtime()` timestamps used to measure [periodMs].
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock = SystemClockImpl()): Flow<T> =
    channelFlow {
        coroutineScope {
            // Clock reading taken at the most recent emission; stays 0 until the first one.
            var previousEmitTimeMs = 0L
            // Pending "wait, then emit" job for a value that arrived inside the throttle period.
            var delayJob: Job? = null
            // Job performing the actual send of a delayed value once its wait is over.
            var sendJob: Job? = null
            val outerScope = this

            collect {
                // A newer value replaces any value that is still waiting out its delay.
                delayJob?.cancel()
                // If a delayed value has already started sending, let that send finish first.
                sendJob?.join()
                val currentTimeMs = clock.elapsedRealtime()
                val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
                val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
                if (timeUntilNextEmit > 0L) {
                    // We create delayJob to allow cancellation during the delay period
                    delayJob = launch {
                        delay(timeUntilNextEmit)
                        // The send is launched in outerScope rather than as a child of delayJob,
                        // so cancelling delayJob above does not cancel a send that has begun.
                        sendJob =
                            outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                                send(it)
                                previousEmitTimeMs = clock.elapsedRealtime()
                            }
                    }
                } else {
                    // The throttle period has already elapsed: emit right away.
                    send(it)
                    previousEmitTimeMs = currentTimeMs
                }
            }
        }
    }
250 
/**
 * Combines the most recent values of six flows into a single [Flow] using [transform].
 *
 * Delegates to the array-based `kotlinx.coroutines.flow.combine` and casts each array slot back to
 * the type of the flow at the same position.
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) { latest: Array<*> ->
        // Slot i holds the latest value of the i-th flow argument.
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
        )
    }
273 
/**
 * Combines the most recent values of seven flows into a single [Flow] using [transform].
 *
 * Delegates to the array-based `kotlinx.coroutines.flow.combine` and casts each array slot back to
 * the type of the flow at the same position.
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
    ) { latest: Array<*> ->
        // Slot i holds the latest value of the i-th flow argument.
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
        )
    }
298 
/**
 * Combines the most recent values of eight flows into a single [Flow] using [transform].
 *
 * Delegates to the array-based `kotlinx.coroutines.flow.combine` and casts each array slot back to
 * the type of the flow at the same position.
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    flow8: Flow<T8>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
        flow8,
    ) { latest: Array<*> ->
        // Slot i holds the latest value of the i-th flow argument.
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
            latest[7] as T8,
        )
    }
325 
/**
 * Combines the most recent values of nine flows into a single [Flow] using [transform].
 *
 * Delegates to the array-based `kotlinx.coroutines.flow.combine` and casts each array slot back to
 * the type of the flow at the same position.
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    flow8: Flow<T8>,
    flow9: Flow<T9>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
        flow8,
        flow9,
    ) { latest: Array<*> ->
        // Slot i holds the latest value of the i-th flow argument.
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
            latest[7] as T8,
            latest[8] as T9,
        )
    }
363 
/**
 * Returns a [Flow] that emits a single [Unit] as soon as collection starts and afterwards relays
 * every emission of the upstream [Flow] unchanged.
 */
@Suppress("NOTHING_TO_INLINE")
inline fun Flow<Unit>.emitOnStart(): Flow<Unit> {
    return this.onStart { emit(Unit) }
}
370 
/**
 * Transforms a Flow<T> into a Flow<List<T>> by implementing a sliding window algorithm.
 *
 * This function creates a sliding window over the input Flow<T>. The window has a specified
 * [windowDuration] and slides continuously as time progresses. The emitted List<T> contains all
 * items from the input flow that fall within the current window.
 *
 * The window slides forward by the smallest possible increment to include or exclude *one* event
 * based on the time the event was emitted (determined by [clock]'s `currentTimeMillis()`). This
 * means that consecutive emitted lists will have overlapping elements if the elements fall within
 * the [windowDuration].
 *
 * @param windowDuration The duration of the sliding window. Must be positive.
 * @param clock Source of the timestamps used to decide which elements are inside the window.
 * @return A Flow that emits Lists of elements within the current sliding window.
 * @throws IllegalArgumentException when the returned flow is collected and [windowDuration] is
 *   not positive.
 */
fun <T> Flow<T>.slidingWindow(
    windowDuration: Duration,
    clock: SystemClock = SystemClockImpl(),
): Flow<List<T>> = channelFlow {
    require(windowDuration.isPositive()) { "Window duration must be positive" }
    // (arrival timestamp, value) pairs, oldest first: new items are appended at the tail and
    // expired items are removed from the head.
    val buffer = LinkedList<Pair<Duration, T>>()

    coroutineScope {
        var windowAdvancementJob: Job? = null

        collect { value ->
            // A new value restarts the advancement job below against the updated buffer.
            windowAdvancementJob?.cancel()
            val now = clock.currentTimeMillis().milliseconds
            buffer.addLast(now to value)

            // Evict everything whose window has already closed as of `now`.
            while (buffer.isNotEmpty() && buffer.first.first + windowDuration <= now) {
                buffer.removeFirst()
            }
            send(buffer.map { it.second })

            // Keep the window advancing through time even if the source flow isn't emitting
            // anymore. We stop advancing the window as soon as there are no items left in the
            // buffer.
            windowAdvancementJob = launch {
                while (buffer.isNotEmpty()) {
                    val startOfWindow = clock.currentTimeMillis().milliseconds - windowDuration
                    // Invariant: At this point, everything in the buffer is guaranteed to be in
                    // the window, as we removed expired items above.
                    val timeUntilNextOldest =
                        (buffer.first.first - startOfWindow).coerceAtLeast(0.milliseconds)
                    delay(timeUntilNextOldest)
                    // Remove the oldest item, as it has now fallen out of the window.
                    buffer.removeFirst()
                    send(buffer.map { it.second })
                }
            }
        }
    }
}
425