• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.selects.*
9 
10 enum class TestChannelKind(
11     val capacity: Int,
12     private val description: String,
13     val viaBroadcast: Boolean = false
14 ) {
15     RENDEZVOUS(0, "RendezvousChannel"),
16     ARRAY_1(1, "ArrayChannel(1)"),
17     ARRAY_2(2, "ArrayChannel(2)"),
18     ARRAY_10(10, "ArrayChannel(10)"),
19     LINKED_LIST(Channel.UNLIMITED, "LinkedListChannel"),
20     CONFLATED(Channel.CONFLATED, "ConflatedChannel"),
21     ARRAY_1_BROADCAST(1, "ArrayBroadcastChannel(1)", viaBroadcast = true),
22     ARRAY_10_BROADCAST(10, "ArrayBroadcastChannel(10)", viaBroadcast = true),
23     CONFLATED_BROADCAST(Channel.CONFLATED, "ConflatedBroadcastChannel", viaBroadcast = true)
24     ;
25 
createnull26     fun <T> create(onUndeliveredElement: ((T) -> Unit)? = null): Channel<T> = when {
27         viaBroadcast && onUndeliveredElement != null -> error("Broadcast channels to do not support onUndeliveredElement")
28         viaBroadcast -> ChannelViaBroadcast(BroadcastChannel(capacity))
29         else -> Channel(capacity, onUndeliveredElement = onUndeliveredElement)
30     }
31 
32     val isConflated get() = capacity == Channel.CONFLATED
toStringnull33     override fun toString(): String = description
34 }
35 
36 private class ChannelViaBroadcast<E>(
37     private val broadcast: BroadcastChannel<E>
38 ): Channel<E>, SendChannel<E> by broadcast {
39     val sub = broadcast.openSubscription()
40 
41     override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
42     override val isEmpty: Boolean get() = sub.isEmpty
43 
44     override suspend fun receive(): E = sub.receive()
45     override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
46     override fun iterator(): ChannelIterator<E> = sub.iterator()
47     override fun tryReceive(): ChannelResult<E> = sub.tryReceive()
48 
49     override fun cancel(cause: CancellationException?) = sub.cancel(cause)
50 
51     // implementing hidden method anyway, so can cast to an internal class
52     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
53     override fun cancel(cause: Throwable?): Boolean = (sub as AbstractChannel).cancelInternal(cause)
54 
55     override val onReceive: SelectClause1<E>
56         get() = sub.onReceive
57     override val onReceiveCatching: SelectClause1<ChannelResult<E>>
58         get() = sub.onReceiveCatching
59 }
60