1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.stream 6 7 import kotlinx.atomicfu.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.flow.* 10 import java.util.stream.* 11 12 /** 13 * Represents the given stream as a flow and [closes][Stream.close] the stream afterwards. 14 * The resulting flow can be [collected][Flow.collect] only once 15 * and throws [IllegalStateException] when trying to collect it more than once. 16 */ consumeAsFlownull17public fun <T> Stream<T>.consumeAsFlow(): Flow<T> = StreamFlow(this) 18 19 private class StreamFlow<T>(private val stream: Stream<T>) : Flow<T> { 20 private val consumed = atomic(false) 21 22 override suspend fun collect(collector: FlowCollector<T>) { 23 if (!consumed.compareAndSet(false, true)) error("Stream.consumeAsFlow can be collected only once") 24 try { 25 for (value in stream.iterator()) { 26 collector.emit(value) 27 } 28 } finally { 29 stream.close() 30 } 31 } 32 } 33