1 /*
<lambda>null2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.systemui.util.kotlin
18
19 import com.android.app.tracing.coroutines.launchTraced as launch
20 import com.android.systemui.util.time.SystemClock
21 import com.android.systemui.util.time.SystemClockImpl
22 import java.util.LinkedList
23 import java.util.concurrent.atomic.AtomicReference
24 import kotlin.math.max
25 import kotlin.time.Duration
26 import kotlin.time.Duration.Companion.milliseconds
27 import kotlinx.coroutines.CoroutineStart
28 import kotlinx.coroutines.Dispatchers
29 import kotlinx.coroutines.Job
30 import kotlinx.coroutines.coroutineScope
31 import kotlinx.coroutines.delay
32 import kotlinx.coroutines.flow.Flow
33 import kotlinx.coroutines.flow.MutableSharedFlow
34 import kotlinx.coroutines.flow.channelFlow
35 import kotlinx.coroutines.flow.distinctUntilChanged
36 import kotlinx.coroutines.flow.filter
37 import kotlinx.coroutines.flow.flow
38 import kotlinx.coroutines.flow.map
39 import kotlinx.coroutines.flow.onStart
40
/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * Note that the new Flow will not start emitting until it has received two emissions from the
 * upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 *
 * @param transform combines the previously collected value (`old`) with the value that was just
 *   collected (`new`); its result is what the returned flow emits.
 */
fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
    // Private sentinel meaning "nothing collected yet". A dedicated object is used instead of
    // `null` so that `null` remains a legal upstream value.
    val noVal = Any()
    var previousValue: Any? = noVal
    collect { newVal ->
        // Referential comparison on purpose: `!=` would dispatch to the upstream value's own
        // `equals`, which could misreport a real value as the sentinel.
        if (previousValue !== noVal) {
            @Suppress("UNCHECKED_CAST") emit(transform(previousValue as T, newVal))
        }
        previousValue = newVal
    }
}
58
/**
 * Pairs every upstream emission with the one before it and maps the pair through [transform].
 *
 * Unlike the overload without a seed, the very first upstream emission already produces an
 * output: [initialValue] stands in as its "previous" value.
 *
 * Handy whenever the current value has to be compared against the previous one.
 */
fun <S, T : S, R> Flow<T>.pairwiseBy(
    initialValue: S,
    transform: suspend (previousValue: S, newValue: T) -> R,
): Flow<R> {
    val seed: suspend () -> S = { initialValue }
    return pairwiseBy(getInitialValue = seed, transform = transform)
}
69
/**
 * Pairs every upstream emission with the one before it and maps the pair through [transform].
 *
 * [getInitialValue] is invoked once per collection, before the upstream flow is collected, and
 * its result acts as the "previous" value for the first upstream emission. Because it is a
 * suspending lambda, it may do some work to compute that seed.
 *
 * Handy whenever the current value has to be compared against the previous one.
 */
fun <S, T : S, R> Flow<T>.pairwiseBy(
    getInitialValue: suspend () -> S,
    transform: suspend (previousValue: S, newValue: T) -> R,
): Flow<R> = flow {
    var last: S = getInitialValue()
    collect { current ->
        val combined = transform(last, current)
        emit(combined)
        last = current
    }
}
89
90 /**
91 * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
92 * Flow will not start emitting until it has received two emissions from the upstream Flow.
93 *
94 * Useful for code that needs to compare the current value to the previous value.
95 */
pairwisenull96 fun <T> Flow<T>.pairwise(): Flow<WithPrev<T, T>> = pairwiseBy(::WithPrev)
97
/**
 * Returns a new [Flow] of [WithPrev] pairs, each holding an upstream emission together with the
 * emission that preceded it. The first pair uses [initialValue] as its "previous" value.
 *
 * Handy whenever the current value has to be compared against the previous one.
 */
fun <S, T : S> Flow<T>.pairwise(initialValue: S): Flow<WithPrev<S, T>> =
    pairwiseBy(initialValue = initialValue) { previous, current -> WithPrev(previous, current) }
106
/**
 * A value emitted from a [Flow], bundled with the value that came before it.
 *
 * @property previousValue the earlier value.
 * @property newValue the value that was just emitted.
 */
data class WithPrev<out S, out T : S>(
    val previousValue: S,
    val newValue: T,
)
109
/**
 * Emits a [Unit] each time the subscription count of this flow goes up, and stays silent when it
 * stays the same or goes down. The count is compared against `0` for the first observation.
 */
fun <T> MutableSharedFlow<T>.onSubscriberAdded(): Flow<Unit> =
    subscriptionCount
        .pairwise(initialValue = 0)
        .filter { it.newValue > it.previousValue }
        .map { Unit }
117
/**
 * Returns a new [Flow] that diffs each [Set] emitted by [this] against the previously emitted one
 * and maps the difference through [transform].
 *
 * Consecutive equal sets are collapsed first, so [transform] is only invoked when the set
 * actually changed.
 *
 * If [emitFirstEvent] is `true`, an empty set is injected ahead of the upstream emissions, so the
 * first upstream [Set] is reported as a change with no removals and all of its elements as
 * additions.
 *
 * If [emitFirstEvent] is `false`, the first emission only serves as the baseline and no changes
 * are emitted until a second [Set] has been emitted from the upstream [Flow].
 */
fun <T, R> Flow<Set<T>>.setChangesBy(
    transform: suspend (removed: Set<T>, added: Set<T>) -> R,
    emitFirstEvent: Boolean = true,
): Flow<R> {
    val source = if (emitFirstEvent) onStart { emit(emptySet()) } else this
    return source.distinctUntilChanged().pairwiseBy { previous: Set<T>, current: Set<T> ->
        transform(
            // Present in the old set but not the new one: the element was removed.
            previous - current,
            // Present in the new set but not the old one: the element was added.
            current - previous,
        )
    }
}
142
/**
 * Returns a new [Flow] that reports, as [SetChanges], how each [Set] emitted by [this] differs
 * from the one emitted before it.
 *
 * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause
 * a change event to be emitted that contains no removals, and all elements from that first [Set]
 * as additions.
 *
 * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
 * until a second [Set] has been emitted from the upstream [Flow].
 */
fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
    setChangesBy(
        transform = { removed, added -> SetChanges(removed, added) },
        emitFirstEvent = emitFirstEvent,
    )
155
/**
 * The difference in elements between two [Set]s.
 *
 * @property removed elements that are present in the first [Set] but not in the second.
 * @property added elements that are present in the second [Set] but not in the first.
 */
data class SetChanges<T>(val removed: Set<T>, val added: Set<T>)
163
/**
 * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
 * the most recent emission from [other] using [transform].
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 *
 * @param other the flow whose latest value is sampled; it is collected for as long as [this] is
 *   being collected.
 * @param transform combines a value from [this] with the latest value seen from [other].
 */
fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
    coroutineScope {
        // Private sentinel meaning "other has not emitted yet". A dedicated object is used
        // instead of `null` so that `null` remains a legal value of [other].
        val noVal = Any()
        val sampledRef = AtomicReference(noVal)
        val job = launch(context = Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } }
        collect {
            val sampled = sampledRef.get()
            // Referential comparison on purpose: `!=` would dispatch to the sampled value's own
            // `equals`, which could misreport a real value as the sentinel.
            if (sampled !== noVal) {
                @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B))
            }
        }
        // The receiver flow completed; stop sampling so that coroutineScope can return.
        job.cancel()
    }
}
184
/**
 * Returns a new [Flow] that emits whenever [this] emits, but whose values are the latest value
 * seen from [other]; the values of [this] themselves are discarded.
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> =
    sample(other, transform = { _, latest -> latest })
192
/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for
 * the given [periodMs] as reported by the given [clock]. If the original flow emits more than one
 * value during this period, only the latest value is emitted.
 *
 * Example:
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 *
 * @param periodMs minimum spacing, in milliseconds, between two downstream emissions.
 * @param clock supplies the `elapsedRealtime()` timestamps used to measure that spacing.
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock = SystemClockImpl()): Flow<T> =
    channelFlow {
        coroutineScope {
            val throttleScope = this
            var lastSendTimeMs = 0L
            var pendingDelay: Job? = null
            var pendingSend: Job? = null

            collect { value ->
                // A newer value supersedes whatever was still waiting out its delay.
                pendingDelay?.cancel()
                // A send that has already started lives outside the delay job, so it is not
                // cancelled above; wait for it to finish before going on.
                pendingSend?.join()

                val nowMs = clock.elapsedRealtime()
                val waitMs = (periodMs - (nowMs - lastSendTimeMs)).coerceAtLeast(0L)
                if (waitMs <= 0L) {
                    send(value)
                    lastSendTimeMs = nowMs
                } else {
                    // The wait runs in its own job so that it can be cancelled on its own.
                    pendingDelay = launch {
                        delay(waitMs)
                        pendingSend =
                            throttleScope.launch(start = CoroutineStart.UNDISPATCHED) {
                                send(value)
                                lastSendTimeMs = clock.elapsedRealtime()
                            }
                    }
                }
            }
        }
    }
250
/**
 * Type-safe six-flow overload of `combine`.
 *
 * Delegates to the vararg [kotlinx.coroutines.flow.combine] and unpacks the array it supplies
 * into individually typed arguments for [transform].
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) { latest: Array<*> ->
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
        )
    }
273
/**
 * Type-safe seven-flow overload of `combine`.
 *
 * Delegates to the vararg [kotlinx.coroutines.flow.combine] and unpacks the array it supplies
 * into individually typed arguments for [transform].
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
    ) { latest: Array<*> ->
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
        )
    }
298
/**
 * Type-safe eight-flow overload of `combine`.
 *
 * Delegates to the vararg [kotlinx.coroutines.flow.combine] and unpacks the array it supplies
 * into individually typed arguments for [transform].
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    flow8: Flow<T8>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
        flow8,
    ) { latest: Array<*> ->
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
            latest[7] as T8,
        )
    }
325
/**
 * Type-safe nine-flow overload of `combine`.
 *
 * Delegates to the vararg [kotlinx.coroutines.flow.combine] and unpacks the array it supplies
 * into individually typed arguments for [transform].
 */
@Suppress("UNCHECKED_CAST")
inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combine(
    flow: Flow<T1>,
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    flow4: Flow<T4>,
    flow5: Flow<T5>,
    flow6: Flow<T6>,
    flow7: Flow<T7>,
    flow8: Flow<T8>,
    flow9: Flow<T9>,
    crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R,
): Flow<R> =
    kotlinx.coroutines.flow.combine(
        flow,
        flow2,
        flow3,
        flow4,
        flow5,
        flow6,
        flow7,
        flow8,
        flow9,
    ) { latest: Array<*> ->
        transform(
            latest[0] as T1,
            latest[1] as T2,
            latest[2] as T3,
            latest[3] as T4,
            latest[4] as T5,
            latest[5] as T6,
            latest[6] as T7,
            latest[7] as T8,
            latest[8] as T9,
        )
    }
363
/**
 * Returns a [Flow] that emits a [Unit] as soon as collection starts, followed by everything the
 * upstream [Flow] emits.
 */
@Suppress("NOTHING_TO_INLINE")
inline fun Flow<Unit>.emitOnStart(): Flow<Unit> {
    return onStart { emit(Unit) }
}
370
/**
 * Transforms a Flow<T> into a Flow<List<T>> by implementing a sliding window algorithm.
 *
 * Every upstream item is timestamped with [clock]'s `currentTimeMillis()` on arrival. Each emitted
 * List<T> contains, oldest first, the items whose timestamp still falls within [windowDuration]
 * of the current time.
 *
 * A list is emitted whenever an item enters the window (the upstream flow emitted) and whenever
 * the oldest item leaves it (its age reached [windowDuration]), so consecutive lists overlap in
 * the items that stayed inside the window. The window keeps advancing even when the upstream flow
 * is idle, until it has no items left.
 *
 * @param windowDuration The duration of the sliding window. Must be positive; this is checked
 *   when the returned flow is collected.
 * @param clock The source of the timestamps used to age items.
 * @return A Flow that emits Lists of elements within the current sliding window.
 */
fun <T> Flow<T>.slidingWindow(
    windowDuration: Duration,
    clock: SystemClock = SystemClockImpl(),
): Flow<List<T>> = channelFlow {
    require(windowDuration.isPositive()) { "Window duration must be positive" }
    // Timestamped items currently inside the window, oldest first.
    val window = LinkedList<Pair<Duration, T>>()

    coroutineScope {
        var expiryJob: Job? = null

        collect { item ->
            // The new item gets its own expiry loop below, so stop the previous one.
            expiryJob?.cancel()
            val now = clock.currentTimeMillis().milliseconds
            window.addLast(now to item)

            // Evict everything that is already at least one full window old.
            while (window.isNotEmpty() && window.first.first + windowDuration <= now) {
                window.removeFirst()
            }
            send(window.map { (_, element) -> element })

            // Keep sliding the window while the upstream flow is quiet: drop the oldest item
            // when it expires, publish the shrunken window, and stop once nothing is left.
            expiryJob = launch {
                while (window.isNotEmpty()) {
                    val windowStart = clock.currentTimeMillis().milliseconds - windowDuration
                    // Expired items were evicted above, so the oldest one is still inside the
                    // window; the clamp only guards against a negative delay.
                    val untilOldestExpires =
                        (window.first.first - windowStart).coerceAtLeast(Duration.ZERO)
                    delay(untilOldestExpires)
                    window.removeFirst()
                    send(window.map { (_, element) -> element })
                }
            }
        }
    }
}
425