• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.stream
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.flow.*
10 import java.util.stream.*
11 
12 /**
13  * Represents the given stream as a flow and [closes][Stream.close] the stream afterwards.
14  * The resulting flow can be [collected][Flow.collect] only once
15  * and throws [IllegalStateException] when trying to collect it more than once.
16  */
consumeAsFlownull17 public fun <T> Stream<T>.consumeAsFlow(): Flow<T> = StreamFlow(this)
18 
19 private class StreamFlow<T>(private val stream: Stream<T>) : Flow<T> {
20     private val consumed = atomic(false)
21 
22     override suspend fun collect(collector: FlowCollector<T>) {
23         if (!consumed.compareAndSet(false, true)) error("Stream.consumeAsFlow can be collected only once")
24         try {
25             for (value in stream.iterator()) {
26                 collector.emit(value)
27             }
28         } finally {
29             stream.close()
30         }
31     }
32 }
33