1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package benchmarks 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.channels.* 9 import org.openjdk.jmh.annotations.* 10 import java.util.concurrent.* 11 import kotlin.coroutines.* 12 13 @Warmup(iterations = 5, time = 1) 14 @Measurement(iterations = 5, time = 1) 15 @BenchmarkMode(Mode.AverageTime) 16 @OutputTimeUnit(TimeUnit.MILLISECONDS) 17 @State(Scope.Benchmark) 18 @Fork(1) 19 open class ChannelSinkBenchmark { 20 private val tl = ThreadLocal.withInitial({ 42 }) 21 private val tl2 = ThreadLocal.withInitial({ 239 }) 22 23 private val unconfined = Dispatchers.Unconfined 24 private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement() 25 private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement() 26 27 @Benchmark 28 fun channelPipeline(): Int = runBlocking { 29 run(unconfined) 30 } 31 32 @Benchmark 33 fun channelPipelineOneThreadLocal(): Int = runBlocking { 34 run(unconfinedOneElement) 35 } 36 37 @Benchmark 38 fun channelPipelineTwoThreadLocals(): Int = runBlocking { 39 run(unconfinedTwoElements) 40 } 41 42 private suspend inline fun run(context: CoroutineContext): Int { 43 return Channel 44 .range(1, 1_000_000, context) 45 .filter(context) { it % 4 == 0 } 46 .fold(0) { a, b -> a + b } 47 } 48 49 private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context) { 50 for (i in start until (start + count)) 51 send(i) 52 } 53 54 // Migrated from deprecated operators, are good only for stressing channels 55 56 private fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = 57 GlobalScope.produce(context, onCompletion = { cancel() }) { 58 for (e in this@filter) { 59 if (predicate(e)) send(e) 60 } 61 } 62 63 private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R { 64 var accumulator = initial 65 consumeEach { 66 accumulator = operation(accumulator, it) 67 } 68 return accumulator 69 } 70 } 71 72