/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused", "DeprecatedCallableAddReplaceWith", "UNUSED_PARAMETER", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE")
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14
/**
 * **GENERAL NOTE**
 *
 * The deprecated declarations in this file exist to help users who search for their favourite
 * operators and/or patterns that are missing or renamed in Flow.
 * Deprecated functions are also moved here when they are renamed. The difference is that renamed
 * functions keep a body with their implementation, while pure stubs delegate to [noImpl].
 *
 * This function always throws [UnsupportedOperationException]; its `Nothing` return type lets it
 * serve as the body of a stub with any declared return type.
 */
internal fun noImpl(): Nothing {
    throw UnsupportedOperationException("Not implemented, should not be called")
}
25
/**
 * There is no direct [Flow] counterpart of `observeOn`: all terminal flow operators are suspending
 * and therefore use the context of the caller.
 *
 * For example, this code:
 * ```
 * flowable
 *     .observeOn(Schedulers.io())
 *     .doOnEach { value -> println("Received $value") }
 *     .subscribe()
 * ```
 *
 * is expressed with Flow as:
 * ```
 * withContext(Dispatchers.IO) {
 *     flow.collect { value -> println("Received $value") }
 * }
 * ```
 *
 * This is a migration stub without an implementation: its body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Collect flow in the desired context instead"
)
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = noImpl()
49
/**
 * There is no direct [Flow] counterpart of `publishOn`: all terminal flow operators are suspending
 * and therefore use the context of the caller.
 *
 * For example, this code:
 * ```
 * flux
 *     .publishOn(Schedulers.io())
 *     .doOnEach { value -> println("Received $value") }
 *     .subscribe()
 * ```
 *
 * is expressed with Flow as:
 * ```
 * withContext(Dispatchers.IO) {
 *     flow.collect { value -> println("Received $value") }
 * }
 * ```
 *
 * This is a migration stub without an implementation: its body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Collect flow in the desired context instead"
)
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = noImpl()
73
/**
 * There is no direct [Flow] counterpart of `subscribeOn`, because [Flow] preserves its context
 * and does not leak it.
 *
 * For example, this code:
 * ```
 * flowable
 *     .map { value -> println("Doing map in IO"); value }
 *     .subscribeOn(Schedulers.io())
 *     .observeOn(Schedulers.computation())
 *     .doOnEach { value -> println("Processing $value in computation") }
 *     .subscribe()
 * ```
 * is expressed with Flow as:
 * ```
 * withContext(Dispatchers.Default) {
 *     flow
 *         .map { value -> println("Doing map in IO"); value }
 *         .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream
 *         .collect { value ->
 *             println("Processing $value in computation")
 *         }
 * }
 * ```
 * As opposed to `subscribeOn`, it is **possible** to use multiple `flowOn` operators in one flow.
 *
 * This is a migration stub without an implementation: its body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'flowOn' instead"
)
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = noImpl()
102
/**
 * The Flow analogue of the `onErrorXxx` family is [catch].
 * Replace this call with `catch { emitAll(fallback) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
    replaceWith = ReplaceWith("catch { emitAll(fallback) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = noImpl()
114
/**
 * The Flow analogue of the `onErrorXxx` family is [catch].
 * Replace this call with `catch { emitAll(fallback) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
    replaceWith = ReplaceWith("catch { emitAll(fallback) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = noImpl()
126
/**
 * `subscribe` is an Rx-specific API that has no direct match in flows.
 * [launchIn] can be used instead. For example, the following:
 * ```
 * flowable
 *     .observeOn(Schedulers.io())
 *     .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") })
 * ```
 *
 * is expressed with Flow as:
 * ```
 * flow
 *     .onEach { value -> println("Received $value") }
 *     .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") }
 *     .catch { cause -> println("Exception $cause happened") }
 *     .flowOn(Dispatchers.IO)
 *     .launchIn(myScope)
 * ```
 *
 * Note that the resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
 *
 * Alternatively, terminal operators like [single] can be used from suspend functions.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(): Unit = noImpl()
156
/**
 * Replace with [launchIn] combined with the [onEach], [onCompletion] and [catch] operators.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = noImpl()
165
/**
 * Replace with [launchIn] combined with the [onEach], [onCompletion] and [catch] operators.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(
    onEach: suspend (T) -> Unit,
    onError: suspend (Throwable) -> Unit
): Unit = noImpl()
174
/**
 * The suggested replacement, `flatMapConcat`, is sequential (`concat`) by default.
 * For a concurrent flatMap, [flatMapMerge] can be used instead.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue is 'flatMapConcat'",
    replaceWith = ReplaceWith("flatMapConcat(mapper)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = noImpl()
186
/**
 * The Flow analogue of `concatMap` is [flatMapConcat].
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatMap' is 'flatMapConcat'",
    replaceWith = ReplaceWith("flatMapConcat(mapper)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = noImpl()
197
/**
 * The suggested replacement, `flattenConcat`, is sequential (`concat`) by default.
 * For concurrent flattening, [flattenMerge] can be used instead.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'merge' is 'flattenConcat'",
    replaceWith = ReplaceWith("flattenConcat()"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = noImpl()
209
/**
 * The Flow analogue of `flatten` is [flattenConcat].
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'flatten' is 'flattenConcat'",
    replaceWith = ReplaceWith("flattenConcat()"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = noImpl()
220
/**
 * Kotlin already has a built-in generic mechanism for making chained calls.
 * Code such as
 * ```
 * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
 * ```
 * can be replaced with
 * ```
 * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
 * ```
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'compose' is 'let'",
    replaceWith = ReplaceWith("let(transformer)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = noImpl()
240
/**
 * The Flow analogue of `skip` is [drop].
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'skip' is 'drop'",
    replaceWith = ReplaceWith("drop(count)"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.skip(count: Int): Flow<T> = noImpl()
251
/**
 * The Flow extension for iterating over elements is [collect].
 * `forEach` was deliberately not introduced, to avoid confusion:
 * a Flow is not a collection, iterating over it may not be idempotent
 * and can *launch* computations with side effects.
 * This behaviour is not reflected in the [forEach] name.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'forEach' is 'collect'",
    // The replacement must reference this function's own parameter name, `action`
    replaceWith = ReplaceWith("collect(action)")
)
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = noImpl()
266
/**
 * Flow offers the less verbose [scan] shortcut for this operation.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow has less verbose 'scan' shortcut",
    replaceWith = ReplaceWith("scan(initial, operation)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.scanFold(
    initial: R,
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = noImpl()
278
/**
 * The Flow analogue of the `onErrorXxx` family is [catch].
 * Replace this call with `catch { emit(fallback) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
    replaceWith = ReplaceWith("catch { emit(fallback) }"),
    level = DeprecationLevel.ERROR
)
// Note: this version without predicate gives better "replaceWith" action
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = noImpl()
291
/**
 * The Flow analogue of the `onErrorXxx` family is [catch].
 * Replace this call with `catch { e -> if (predicate(e)) emit(fallback) else throw e }`.
 *
 * Unlike the pure stubs in this file, this function has a working body: an upstream exception
 * accepted by [predicate] is replaced with an emission of [fallback], any other exception is rethrown.
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
    replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow<T> =
    catch { cause ->
        // Note: the default value of `predicate` is for binary compatibility with the preview version,
        // that is why this function has a body
        if (predicate(cause)) emit(fallback) else throw cause
    }
308
/**
 * The Flow analogue of `startWith` is [onStart].
 * Replace this call with `onStart { emit(value) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emit(value) }'",
    replaceWith = ReplaceWith("onStart { emit(value) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.startWith(value: T): Flow<T> = noImpl()
320
/**
 * The Flow analogue of `startWith` is [onStart].
 * Replace this call with `onStart { emitAll(other) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emitAll(other) }'",
    replaceWith = ReplaceWith("onStart { emitAll(other) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.startWith(other: Flow<T>): Flow<T> = noImpl()
332
/**
 * The Flow analogue of `concatWith` is [onCompletion].
 * Replace this call with `onCompletion { emit(value) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
    replaceWith = ReplaceWith("onCompletion { emit(value) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
344
/**
 * The Flow analogue of `concatWith` is [onCompletion].
 * Replace this call with `onCompletion { emitAll(other) }`.
 *
 * Migration stub only: the body is [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'",
    replaceWith = ReplaceWith("onCompletion { emitAll(other) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
356
/**
 * Deprecated alias for `combine`: passes this flow, [other] and [transform] straight to it.
 */
@Deprecated(
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("this.combine(other, transform)"),
    level = DeprecationLevel.ERROR
)
public fun <T1, T2, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R> {
    return combine(this, other, transform)
}
364
/**
 * Deprecated alias for `combine`: passes this flow, [other], [other2] and [transform] straight to it.
 *
 * The return type is declared explicitly so that this public signature does not depend on
 * type inference and matches the other `combineLatest` overloads.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("combine(this, other, other2, transform)")
)
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> = combine(this, other, other2, transform)
375
/**
 * Deprecated alias for `combine`: passes this flow, [other], [other2], [other3] and [transform]
 * straight to it.
 *
 * The return type is declared explicitly so that this public signature does not depend on
 * type inference and matches the other `combineLatest` overloads.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = combine(this, other, other2, other3, transform)
387
/**
 * Deprecated alias for `combine`: passes this flow, [other], [other2], [other3], [other4] and
 * [transform] straight to it.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    // The replacement has to forward all five flows, including `other4`
    replaceWith = ReplaceWith("combine(this, other, other2, other3, other4, transform)")
)
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    other4: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(this, other, other2, other3, other4, transform)
400
/**
 * Delays the emission of values from this flow for the given [timeMillis].
 * Replace this call with `onStart { delay(timeMillis) }`, which is exactly what the body does.
 * @suppress
 */
@Deprecated(
    message = "Use 'onStart { delay(timeMillis) }'",
    replaceWith = ReplaceWith("onStart { delay(timeMillis) }"),
    level = DeprecationLevel.WARNING // since 1.3.0, error in 1.4.0
)
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> {
    return onStart { delay(timeMillis) }
}
412
/**
 * Delays each element emitted by the given flow for the given [timeMillis].
 * Replace this call with `onEach { delay(timeMillis) }`, which is exactly what the body does.
 * @suppress
 */
@Deprecated(
    message = "Use 'onEach { delay(timeMillis) }'",
    replaceWith = ReplaceWith("onEach { delay(timeMillis) }"),
    level = DeprecationLevel.WARNING // since 1.3.0, error in 1.4.0
)
public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> {
    return onEach { delay(timeMillis) }
}
424
/**
 * Deprecated alias that passes [transform] straight to `flatMapLatest`.
 */
@Deprecated(
    message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
    replaceWith = ReplaceWith("this.flatMapLatest(transform)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    return flatMapLatest(transform)
}
431
/**
 * Deprecated former name of `runningReduce`; passes [operation] straight to it.
 */
@Deprecated(
    message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
    replaceWith = ReplaceWith("runningReduce(operation)"),
    level = DeprecationLevel.WARNING // Since 1.3.8, was experimental when deprecated
)
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> {
    return runningReduce(operation)
}
438
/**
 * The Flow analogue of `publish()` is `shareIn`; the deprecation message maps the Rx connection
 * strategies to the `started` argument.
 *
 * Migration stub only: the body is [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'publish()' is 'shareIn'. \n" +
        "publish().connect() is the default strategy (no extra call is needed), \n" +
        "publish().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "publish().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(): Flow<T> = noImpl()
448
/**
 * The Flow analogue of `publish(bufferSize)` is `buffer` followed by `shareIn`; the deprecation
 * message maps the Rx connection strategies to the `started` argument.
 *
 * Migration stub only: the body is [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'publish(bufferSize)' is 'buffer' followed by 'shareIn'. \n" +
        "publish().connect() is the default strategy (no extra call is needed), \n" +
        "publish().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "publish().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.buffer(bufferSize).shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(bufferSize: Int): Flow<T> = noImpl()
458
/**
 * The Flow analogue of `replay()` is `shareIn` with unlimited replay; the deprecation message
 * maps the Rx connection strategies to the `started` argument.
 *
 * Migration stub only: the body is [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'replay()' is 'shareIn' with unlimited replay. \n" +
        "replay().connect() is the default strategy (no extra call is needed), \n" +
        "replay().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "replay().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE)")
)
public fun <T> Flow<T>.replay(): Flow<T> = noImpl()
468
/**
 * The Flow analogue of `replay(bufferSize)` is `shareIn` with the specified replay parameter;
 * the deprecation message maps the Rx connection strategies to the `started` argument.
 *
 * Migration stub only: the body is [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'replay(bufferSize)' is 'shareIn' with the specified replay parameter. \n" +
        "replay().connect() is the default strategy (no extra call is needed), \n" +
        "replay().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "replay().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, bufferSize)")
)
public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
478
/**
 * The Flow analogue of `cache()` is `shareIn` with unlimited replay and a lazily started sharing
 * coroutine (`started = SharingStarted.Lazily`).
 *
 * Migration stub only: the body is [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStarted.Lazily' argument",
    // The replacement must spell `SharingStarted` correctly, otherwise the quick-fix yields unresolved code
    replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStarted.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
485