• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package benchmarks
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import org.openjdk.jmh.annotations.*
10 import java.util.concurrent.*
11 import kotlin.coroutines.*
12 
13 @Warmup(iterations = 5, time = 1)
14 @Measurement(iterations = 5, time = 1)
15 @BenchmarkMode(Mode.AverageTime)
16 @OutputTimeUnit(TimeUnit.MILLISECONDS)
17 @State(Scope.Benchmark)
18 @Fork(1)
19 open class ChannelSinkBenchmark {
20     private val tl = ThreadLocal.withInitial({ 42 })
21     private val tl2 = ThreadLocal.withInitial({ 239 })
22 
23     private val unconfined = Dispatchers.Unconfined
24     private val unconfinedOneElement = Dispatchers.Unconfined + tl.asContextElement()
25     private val unconfinedTwoElements = Dispatchers.Unconfined + tl.asContextElement() + tl2.asContextElement()
26 
27     @Benchmark
28     fun channelPipeline(): Int = runBlocking {
29         run(unconfined)
30     }
31 
32     @Benchmark
33     fun channelPipelineOneThreadLocal(): Int = runBlocking {
34         run(unconfinedOneElement)
35     }
36 
37     @Benchmark
38     fun channelPipelineTwoThreadLocals(): Int = runBlocking {
39         run(unconfinedTwoElements)
40     }
41 
42     private suspend inline fun run(context: CoroutineContext): Int {
43         return Channel
44             .range(1, 1_000_000, context)
45             .filter(context) { it % 4 == 0 }
46             .fold(0) { a, b -> a + b }
47     }
48 
49     private fun Channel.Factory.range(start: Int, count: Int, context: CoroutineContext) = GlobalScope.produce(context) {
50         for (i in start until (start + count))
51             send(i)
52     }
53 
54     // Migrated from deprecated operators, are good only for stressing channels
55 
56     private fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
57         GlobalScope.produce(context, onCompletion = { cancel() }) {
58             for (e in this@filter) {
59                 if (predicate(e)) send(e)
60             }
61         }
62 
63     private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
64         var accumulator = initial
65         consumeEach {
66             accumulator = operation(accumulator, it)
67         }
68         return accumulator
69     }
70 }
71 
72