/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused", "DeprecatedCallableAddReplaceWith", "UNUSED_PARAMETER", "NO_EXPLICIT_RETURN_TYPE_IN_API_MODE")
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 
/**
 * **GENERAL NOTE**
 *
 * The deprecated declarations in this file exist to help users who search for a familiar operator
 * and/or pattern that is missing from, or named differently in, Flow.
 * Functions that were renamed are moved here as well. The difference is that renamed functions keep
 * a body with their implementation, while pure stubs simply call [noImpl].
 *
 * @throws UnsupportedOperationException always
 */
internal fun noImpl(): Nothing {
    throw UnsupportedOperationException("Not implemented, should not be called")
}
25 
/**
 * Migration stub for `observeOn`, which has no direct match in the [Flow] API: all terminal flow
 * operators are suspending and therefore run in the context of their caller.
 *
 * Rx code such as
 * ```
 * flowable
 *     .observeOn(Schedulers.io())
 *     .doOnEach { value -> println("Received $value") }
 *     .subscribe()
 * ```
 *
 * is expressed with Flow as
 * ```
 * withContext(Dispatchers.IO) {
 *     flow.collect { value -> println("Received $value") }
 * }
 * ```
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(level = DeprecationLevel.ERROR, message = "Collect flow in the desired context instead")
public fun <T> Flow<T>.observeOn(context: CoroutineContext): Flow<T> = noImpl()
49 
/**
 * Migration stub for `publishOn`, which has no direct match in the [Flow] API: all terminal flow
 * operators are suspending and therefore run in the context of their caller.
 *
 * Reactor code such as
 * ```
 * flux
 *     .publishOn(Schedulers.io())
 *     .doOnEach { value -> println("Received $value") }
 *     .subscribe()
 * ```
 *
 * is expressed with Flow as
 * ```
 * withContext(Dispatchers.IO) {
 *     flow.collect { value -> println("Received $value") }
 * }
 * ```
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(level = DeprecationLevel.ERROR, message = "Collect flow in the desired context instead")
public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = noImpl()
73 
/**
 * Migration stub for `subscribeOn`, which has no direct match in the [Flow] API because [Flow]
 * preserves its context and does not leak it.
 *
 * Rx code such as
 * ```
 * flowable
 *     .map { value -> println("Doing map in IO"); value }
 *     .subscribeOn(Schedulers.io())
 *     .observeOn(Schedulers.computation())
 *     .doOnEach { value -> println("Processing $value in computation") }
 *     .subscribe()
 * ```
 * is expressed with Flow as
 * ```
 * withContext(Dispatchers.Default) {
 *     flow
 *        .map { value -> println("Doing map in IO"); value }
 *        .flowOn(Dispatchers.IO) // Works upstream, doesn't change downstream
 *        .collect { value ->
 *             println("Processing $value in computation")
 *        }
 * }
 * ```
 * As opposed to `subscribeOn`, it is **possible** to use multiple `flowOn` operators in a single flow.
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(level = DeprecationLevel.ERROR, message = "Use 'flowOn' instead")
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = noImpl()
102 
/**
 * Migration stub: the Flow counterpart of the `onErrorXxx` family is [catch].
 * Write `catch { emitAll(fallback) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
    replaceWith = ReplaceWith("catch { emitAll(fallback) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorResume(fallback: Flow<T>): Flow<T> = noImpl()
114 
/**
 * Migration stub: the Flow counterpart of the `onErrorXxx` family is [catch].
 * Write `catch { emitAll(fallback) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emitAll(fallback) }'",
    replaceWith = ReplaceWith("catch { emitAll(fallback) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorResumeNext(fallback: Flow<T>): Flow<T> = noImpl()
126 
/**
 * Migration stub: `subscribe` is an Rx-specific API that has no direct match in flows.
 * [launchIn] can be used instead. For example, the following Rx code:
 * ```
 * flowable
 *     .observeOn(Schedulers.io())
 *     .subscribe({ println("Received $it") }, { println("Exception $it happened") }, { println("Flowable is completed successfully") })
 * ```
 *
 * has this Flow equivalent:
 * ```
 * flow
 *     .onEach { value -> println("Received $value") }
 *     .onCompletion { cause -> if (cause == null) println("Flow is completed successfully") }
 *     .catch { cause -> println("Exception $cause happened") }
 *     .flowOn(Dispatchers.IO)
 *     .launchIn(myScope)
 * ```
 *
 * Note that the resulting value of [launchIn] is not used because the provided scope takes care of cancellation.
 *
 * Alternatively, terminal operators like [single] can be used from suspend functions.
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(): Unit = noImpl()
156 
/**
 * Migration stub for the single-callback `subscribe` overload.
 * Combine [launchIn] with the [onEach], [onCompletion] and [catch] operators instead;
 * calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(onEach: suspend (T) -> Unit): Unit = noImpl()
165 
/**
 * Migration stub for the `subscribe` overload taking value and error callbacks.
 * Combine [launchIn] with the [onEach], [onCompletion] and [catch] operators instead;
 * calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use 'launchIn' with 'onEach', 'onCompletion' and 'catch' instead"
)
public fun <T> Flow<T>.subscribe(
    onEach: suspend (T) -> Unit,
    onError: suspend (Throwable) -> Unit
): Unit = noImpl()
174 
/**
 * Migration stub for `flatMap`. The suggested replacement, `flatMapConcat`, is sequential
 * (`concat`) by default; for a concurrent flatMap, [flatMapMerge] can be used instead.
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue is 'flatMapConcat'",
    replaceWith = ReplaceWith("flatMapConcat(mapper)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.flatMap(mapper: suspend (T) -> Flow<R>): Flow<R> = noImpl()
186 
/**
 * Migration stub: the Flow counterpart of `concatMap` is [flatMapConcat].
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatMap' is 'flatMapConcat'",
    replaceWith = ReplaceWith("flatMapConcat(mapper)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.concatMap(mapper: (T) -> Flow<R>): Flow<R> = noImpl()
197 
/**
 * Migration stub for `merge` on a flow of flows. The suggested replacement, `flattenConcat`,
 * is sequential (`concat`) by default; for concurrent flattening, [flattenMerge] can be used instead.
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'merge' is 'flattenConcat'",
    replaceWith = ReplaceWith("flattenConcat()"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<Flow<T>>.merge(): Flow<T> = noImpl()
209 
/**
 * Migration stub: the Flow counterpart of `flatten` is [flattenConcat].
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'flatten' is 'flattenConcat'",
    replaceWith = ReplaceWith("flattenConcat()"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<Flow<T>>.flatten(): Flow<T> = noImpl()
220 
/**
 * Migration stub for `compose`. Kotlin already has a built-in generic mechanism for chaining calls,
 * so code like
 * ```
 * myFlow.compose(MyFlowExtensions.ignoreErrors()).collect { ... }
 * ```
 * can be replaced with
 *
 * ```
 * myFlow.let(MyFlowExtensions.ignoreErrors()).collect { ... }
 * ```
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'compose' is 'let'",
    replaceWith = ReplaceWith("let(transformer)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.compose(transformer: Flow<T>.() -> Flow<R>): Flow<R> = noImpl()
240 
/**
 * Migration stub: the Flow counterpart of `skip` is [drop].
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'skip' is 'drop'",
    replaceWith = ReplaceWith("drop(count)"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.skip(count: Int): Flow<T> = noImpl()
251 
/**
 * Migration stub: the Flow extension for iterating over elements is [collect].
 * `forEach` was deliberately not introduced, to avoid confusion:
 * a Flow is not a collection, iterating over it may not be idempotent
 * and can *launch* computations with side effects.
 * This behaviour is not reflected in the [forEach] name.
 *
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'forEach' is 'collect'",
    // The replacement expression must reference this function's parameter name, which is `action`.
    replaceWith = ReplaceWith("collect(action)")
)
public fun <T> Flow<T>.forEach(action: suspend (value: T) -> Unit): Unit = noImpl()
266 
/**
 * Migration stub: Flow offers the less verbose [scan] shortcut for this operation.
 * Calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow has less verbose 'scan' shortcut",
    replaceWith = ReplaceWith("scan(initial, operation)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.scanFold(
    initial: R,
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = noImpl()
278 
/**
 * Migration stub: the Flow counterpart of the `onErrorXxx` family is [catch].
 * Write `catch { emit(fallback) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { emit(fallback) }'",
    replaceWith = ReplaceWith("catch { emit(fallback) }"),
    level = DeprecationLevel.ERROR
)
// Note: this predicate-free overload gives a better "replaceWith" action
public fun <T> Flow<T>.onErrorReturn(fallback: T): Flow<T> = noImpl()
291 
/**
 * The Flow counterpart of the `onErrorXxx` family is [catch].
 * Write `catch { e -> if (predicate(e)) emit(fallback) else throw e }` instead.
 *
 * Unlike the pure stubs in this file, this overload has a working body: exceptions accepted by
 * [predicate] are replaced with an emission of [fallback], all others are rethrown.
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'onErrorXxx' is 'catch'. Use 'catch { e -> if (predicate(e)) emit(fallback) else throw e }'",
    replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emit(fallback) else throw e }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.onErrorReturn(fallback: T, predicate: (Throwable) -> Boolean = { true }): Flow<T> =
    // Note: default value is for binary compatibility with preview version, that is why it has body
    catch { cause ->
        if (predicate(cause)) emit(fallback) else throw cause
    }
308 
/**
 * Migration stub: the Flow counterpart of `startWith` is [onStart].
 * Write `onStart { emit(value) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emit(value) }'",
    replaceWith = ReplaceWith("onStart { emit(value) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.startWith(value: T): Flow<T> = noImpl()
320 
/**
 * Migration stub: the Flow counterpart of `startWith` is [onStart].
 * Write `onStart { emitAll(other) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'startWith' is 'onStart'. Use 'onStart { emitAll(other) }'",
    replaceWith = ReplaceWith("onStart { emitAll(other) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.startWith(other: Flow<T>): Flow<T> = noImpl()
332 
/**
 * Migration stub: the Flow counterpart of `concatWith` is [onCompletion].
 * Write `onCompletion { emit(value) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emit(value) }'",
    replaceWith = ReplaceWith("onCompletion { emit(value) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl()
344 
/**
 * Migration stub: the Flow counterpart of `concatWith` is [onCompletion].
 * Write `onCompletion { emitAll(other) }` instead; calling this stub always fails via [noImpl].
 * @suppress
 */
@Deprecated(
    message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'",
    replaceWith = ReplaceWith("onCompletion { emitAll(other) }"),
    level = DeprecationLevel.ERROR
)
public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl()
356 
/**
 * Deprecated alias of `combine` for two flows; delegates to `combine(this, other, transform)`.
 */
@Deprecated(
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("this.combine(other, transform)"),
    level = DeprecationLevel.ERROR
)
public fun <T1, T2, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R> = combine(this, other, transform)
364 
/**
 * Deprecated alias of `combine` for three flows; delegates to
 * `combine(this, other, other2, transform)`.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("combine(this, other, other2, transform)")
)
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> = combine(this, other, other2, transform) // explicit return type, matching the sibling overloads
375 
/**
 * Deprecated alias of `combine` for four flows; delegates to
 * `combine(this, other, other2, other3, transform)`.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    transform: suspend (T1, T2, T3, T4) -> R
): Flow<R> = combine(this, other, other2, other3, transform) // explicit return type, matching the sibling overloads
387 
/**
 * Deprecated alias of `combine` for five flows; delegates to
 * `combine(this, other, other2, other3, other4, transform)`.
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'combineLatest' is 'combine'",
    // The replacement must pass all five flows, including other4, to mirror the delegation below.
    replaceWith = ReplaceWith("combine(this, other, other2, other3, other4, transform)")
)
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
    other: Flow<T2>,
    other2: Flow<T3>,
    other3: Flow<T4>,
    other4: Flow<T5>,
    transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(this, other, other2, other3, other4, transform)
400 
/**
 * Deprecated helper that applies `onStart { delay(timeMillis) }` to this flow,
 * i.e. delays for the given [timeMillis] when the flow is started.
 * Use `onStart { delay(timeMillis) }` directly.
 * @suppress
 */
@Deprecated(
    message = "Use 'onStart { delay(timeMillis) }'",
    replaceWith = ReplaceWith("onStart { delay(timeMillis) }"),
    level = DeprecationLevel.WARNING // since 1.3.0, error in 1.4.0
)
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> =
    this.onStart { delay(timeMillis) }
412 
/**
 * Deprecated helper that applies `onEach { delay(timeMillis) }` to this flow,
 * i.e. delays for the given [timeMillis] on each element.
 * Use `onEach { delay(timeMillis) }` directly.
 * @suppress
 */
@Deprecated(
    message = "Use 'onEach { delay(timeMillis) }'",
    replaceWith = ReplaceWith("onEach { delay(timeMillis) }"),
    level = DeprecationLevel.WARNING // since 1.3.0, error in 1.4.0
)
public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> =
    this.onEach { delay(timeMillis) }
424 
/**
 * Deprecated alias kept for `switchMap` users; delegates to `flatMapLatest(transform)`.
 */
@Deprecated(
    message = "Flow analogues of 'switchMap' are 'transformLatest', 'flatMapLatest' and 'mapLatest'",
    replaceWith = ReplaceWith("this.flatMapLatest(transform)"),
    level = DeprecationLevel.ERROR
)
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    this.flatMapLatest(transform)
431 
/**
 * Deprecated former name of `runningReduce`; delegates to `runningReduce(operation)`.
 */
@Deprecated(
    message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
    replaceWith = ReplaceWith("runningReduce(operation)"),
    level = DeprecationLevel.WARNING // Since 1.3.8, was experimental when deprecated
)
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> =
    this.runningReduce(operation)
438 
/**
 * Migration stub for Rx `publish()`; the deprecation message points to `shareIn`.
 * Calling this stub always fails via [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'publish()' is 'shareIn'. \n" +
        "publish().connect() is the default strategy (no extra call is needed), \n" +
        "publish().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "publish().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(): Flow<T> = noImpl()
448 
/**
 * Migration stub for Rx `publish(bufferSize)`; the deprecation message points to `buffer`
 * followed by `shareIn`. Calling this stub always fails via [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'publish(bufferSize)' is 'buffer' followed by 'shareIn'. \n" +
        "publish().connect() is the default strategy (no extra call is needed), \n" +
        "publish().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "publish().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.buffer(bufferSize).shareIn(scope, 0)")
)
public fun <T> Flow<T>.publish(bufferSize: Int): Flow<T> = noImpl()
458 
/**
 * Migration stub for Rx `replay()`; the deprecation message points to `shareIn` with unlimited
 * replay. Calling this stub always fails via [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'replay()' is 'shareIn' with unlimited replay. \n" +
        "replay().connect() is the default strategy (no extra call is needed), \n" +
        "replay().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "replay().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE)")
)
public fun <T> Flow<T>.replay(): Flow<T> = noImpl()
468 
/**
 * Migration stub for Rx `replay(bufferSize)`; the deprecation message points to `shareIn` with
 * the given replay size. Calling this stub always fails via [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'replay(bufferSize)' is 'shareIn' with the specified replay parameter. \n" +
        "replay().connect() is the default strategy (no extra call is needed), \n" +
        "replay().autoConnect() translates to 'started = SharingStarted.Lazily' argument, \n" +
        "replay().refCount() translates to 'started = SharingStarted.WhileSubscribed()' argument.",
    replaceWith = ReplaceWith("this.shareIn(scope, bufferSize)")
)
public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
478 
/**
 * Migration stub for Rx `cache()`; the deprecation message points to `shareIn` with unlimited
 * replay and lazy start. Calling this stub always fails via [noImpl].
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStarted.Lazily' argument",
    // The replacement expression is pasted into user code, so the type name must be spelled correctly.
    replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStarted.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
485