1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package benchmarks.scheduler
6
7 import benchmarks.*
8 import kotlinx.coroutines.*
9 import org.openjdk.jmh.annotations.*
10 import java.util.concurrent.*
11
/*
 * Comparison of fork-join computations built with the dedicated FJP API and with classic [async] jobs.
 * Each job is organized as a perfectly balanced binary tree: every leaf node computes an
 * FPU-heavy sum over its slice of the data, and every intermediate node sums its children's results.
 *
 * The results below were recorded with batch sizes different from the current BATCH_SIZE.
 *
 * Fine-grained batches (8192 * 1024 elements, 128 per sequential batch):
 * Benchmark                            Mode  Cnt     Score     Error  Units
 * ForkJoinBenchmark.asyncExperimental  avgt   10   681.512 ±  32.069  ms/op
 * ForkJoinBenchmark.asyncFjp           avgt   10   845.386 ±  73.204  ms/op
 * ForkJoinBenchmark.fjpRecursiveTask   avgt   10   692.120 ±  26.224  ms/op
 * ForkJoinBenchmark.fjpTask            avgt   10   791.087 ±  66.544  ms/op
 *
 * Too-small tasks (8192 * 1024 elements, 128 per batch, 16 per sequential batch):
 * Benchmark                            Mode  Cnt     Score     Error  Units
 * ForkJoinBenchmark.asyncExperimental  avgt   10  1273.271 ± 190.372  ms/op
 * ForkJoinBenchmark.asyncFjp           avgt   10  1406.102 ± 216.793  ms/op
 * ForkJoinBenchmark.fjpRecursiveTask   avgt   10   849.941 ± 141.254  ms/op
 * ForkJoinBenchmark.fjpTask            avgt   10   831.554 ±  57.276  ms/op
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /*
         * Change TASK_SIZE to control the global granularity of the benchmark (total number of elements).
         * Change BATCH_SIZE to control affinity / work-stealing / scheduling overhead effects:
         * a range of at most BATCH_SIZE elements is summed sequentially inside a single leaf task.
         */
        const val TASK_SIZE = 8192 * 1024
        const val BATCH_SIZE = 32 * 8192
    }

    /** Input data, regenerated in [setup]: TASK_SIZE random values in `[0, 1024 * 1024)`. */
    lateinit var coefficients: LongArray
    override var dispatcher: String = "scheduler"

    @Setup
    override fun setup() {
        super.setup()
        coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
    }

    /** Coroutine `async` tree dispatched onto the common [ForkJoinPool]. */
    @Benchmark
    fun asyncFjp() = runBlocking {
        CoroutineScope(ForkJoinPool.commonPool().asCoroutineDispatcher()).startAsync(coefficients, 0, coefficients.size).await()
    }

    /**
     * Coroutine `async` tree started directly from the [runBlocking] block.
     *
     * NOTE(review): the implicit `CoroutineScope` receiver of [startAsync] here is runBlocking's own scope,
     * so the tree appears to run on runBlocking's event loop rather than on the dispatcher selected by
     * [dispatcher] — confirm this is intended.
     */
    @Benchmark
    fun asyncExperimental() = runBlocking {
        startAsync(coefficients, 0, coefficients.size).await()
    }

    /** [CountedCompleter]-based tree ([RecursiveAction]) submitted to the common [ForkJoinPool]. */
    @Benchmark
    fun fjpRecursiveTask(): Double {
        val task = RecursiveAction(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /** [RecursiveTask]-based tree ([Task]) submitted to the common [ForkJoinPool]. */
    @Benchmark
    fun fjpTask(): Double {
        val task = Task(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /**
     * Splits `[start, end)` in halves with [async] until a range holds at most [BATCH_SIZE] elements,
     * sums such ranges sequentially, and returns the deferred total (left half + right half).
     *
     * Not `suspend`: [async] itself does not suspend, and each recursive call is made from inside
     * the child coroutine's own scope.
     */
    fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
        if (end - start <= BATCH_SIZE) {
            compute(coefficients, start, end)
        } else {
            val mid = start + (end - start) / 2
            val first = startAsync(coefficients, start, mid)
            val second = startAsync(coefficients, mid, end)
            first.await() + second.await()
        }
    }

    /** [RecursiveTask] tree: each internal node forks both halves, then joins them in order. */
    class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
        override fun compute(): Double {
            if (end - start <= BATCH_SIZE) {
                // Shared file-level kernel, the same one used by the other strategies.
                return compute(coefficients, start, end)
            }

            val mid = start + (end - start) / 2
            val first = Task(coefficients, start, mid).fork()
            val second = Task(coefficients, mid, end).fork()
            return first.join() + second.join()
        }
    }

    /**
     * [CountedCompleter] tree: internal nodes fork both halves. Whichever of the three arrivals comes
     * last (this node's own [tryComplete], the first child, the second child) runs [onCompletion],
     * which sums the children's results and propagates completion to the parent.
     */
    class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
                          parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {

        // Children of an internal node; both stay null for a leaf.
        private var first: ForkJoinTask<Double>? = null
        private var second: ForkJoinTask<Double>? = null

        override fun getRawResult(): Double = result

        override fun setRawResult(t: Double) {
            result = t
        }

        override fun compute() {
            if (end - start <= BATCH_SIZE) {
                rawResult = compute(coefficients, start, end)
            } else {
                // Two decrements (two of the three arrivals) before completion fires on the third one.
                pendingCount = 2
                // One could fork only once and execute the second half in place (looping over
                // firstComplete()) to be even more efficient.
                val mid = start + (end - start) / 2
                first = RecursiveAction(coefficients, start, mid, parent = this).fork()
                second = RecursiveAction(coefficients, mid, end, parent = this).fork()
            }

            tryComplete()
        }

        override fun onCompletion(caller: CountedCompleter<*>?) {
            // Identify internal nodes by their children, not by `caller !== this`. If both children finish
            // before this node's own tryComplete() runs, that call is the last arrival and passes `this`
            // as the caller; the old check then skipped the sum and propagated 0.0.
            val left = first
            val right = second
            if (left != null && right != null) {
                rawResult = left.rawResult + right.rawResult
            }
            super.onCompletion(caller)
        }
    }
}
158
159
/**
 * Sequential leaf kernel shared by all fork-join strategies in this file.
 *
 * Adds `sin(x^1.1) + 1e-8` for every `x` in `coefficients[start until end]`, left to right,
 * and returns the total (0.0 for an empty range).
 */
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
    var sum = 0.0
    var index = start
    while (index < end) {
        val term = Math.sin(Math.pow(coefficients[index].toDouble(), 1.1)) + 1e-8
        sum += term
        index++
    }
    return sum
}
168