1 package benchmarks.scheduler
2
3 import benchmarks.*
4 import kotlinx.coroutines.*
5 import org.openjdk.jmh.annotations.*
6 import java.util.concurrent.*
7
8 /*
 * Comparison of fork-join tasks using the FJP-specific API and classic [async] jobs.
 * The FJP job is organized as a perfectly balanced binary tree: every leaf node computes
 * an FPU-heavy sum over its data, and intermediate nodes sum the results of their children.
12 *
13 * Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
14 * ForkJoinBenchmark.asyncExperimental avgt 10 681.512 ± 32.069 ms/op
15 * ForkJoinBenchmark.asyncFjp avgt 10 845.386 ± 73.204 ms/op
16 * ForkJoinBenchmark.fjpRecursiveTask avgt 10 692.120 ± 26.224 ms/op
17 * ForkJoinBenchmark.fjpTask avgt 10 791.087 ± 66.544 ms/op
18 *
19 * Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
20 * Benchmark Mode Cnt Score Error Units
21 * ForkJoinBenchmark.asyncExperimental avgt 10 1273.271 ± 190.372 ms/op
22 * ForkJoinBenchmark.asyncFjp avgt 10 1406.102 ± 216.793 ms/op
23 * ForkJoinBenchmark.fjpRecursiveTask avgt 10 849.941 ± 141.254 ms/op
24 * ForkJoinBenchmark.fjpTask avgt 10 831.554 ± 57.276 ms/op
25 */
/**
 * Benchmarks several ways of running the same divide-and-conquer summation over [coefficients]:
 * coroutine [async] jobs (on the fork-join common pool and on the scope inherited from the
 * benchmark body), a [RecursiveTask]-based job ([Task]) and a [CountedCompleter]-based job
 * ([RecursiveAction]).
 *
 * Every variant halves its range until it is no larger than [BATCH_SIZE] and then sums that
 * range sequentially.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /*
         * Change task size to control global granularity of benchmark
         * Change batch size to control affinity/work stealing/scheduling overhead effects
         */
        const val TASK_SIZE = 8192 * 1024
        const val BATCH_SIZE = 32 * 8192
    }

    /** Input data shared by all benchmark methods; filled in [setup]. */
    lateinit var coefficients: LongArray
    override var dispatcher: String = "scheduler"

    /**
     * Runs the base-class setup and then fills [coefficients] with [TASK_SIZE] random values
     * drawn from `[0, 1024 * 1024)`.
     */
    @Setup
    override fun setup() {
        super.setup()
        coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
    }

    /** Coroutine-based summation whose [async] jobs are dispatched on the fork-join common pool. */
    @Benchmark
    fun asyncFjp() = runBlocking {
        CoroutineScope(ForkJoinPool.commonPool().asCoroutineDispatcher()).startAsync(coefficients, 0, coefficients.size).await()
    }

    /**
     * Coroutine-based summation started without an explicit scope.
     *
     * NOTE(review): inside this lambda the innermost implicit [CoroutineScope] receiver is the one
     * provided by `runBlocking`, so [startAsync] presumably inherits that scope's context rather than
     * a context configured by the base class. Confirm against `ParametrizedDispatcherBase` that the
     * dispatcher selected by [dispatcher] is really the one being measured.
     */
    @Benchmark
    fun asyncExperimental() = runBlocking {
        startAsync(coefficients, 0, coefficients.size).await()
    }

    /** Summation driven by the [CountedCompleter]-based [RecursiveAction] on the common pool. */
    @Benchmark
    fun fjpRecursiveTask(): Double {
        val task = RecursiveAction(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /** Summation driven by the [RecursiveTask]-based [Task] on the common pool. */
    @Benchmark
    fun fjpTask(): Double {
        val task = Task(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /**
     * Starts an [async] job in the receiver scope that sums `coefficients[start until end]`.
     *
     * Ranges of at most [BATCH_SIZE] elements are summed directly inside the job; larger ranges
     * are halved, each half is started as a nested [async] job and the two results are added.
     *
     * @param coefficients the data to sum over
     * @param start first index of the range, inclusive
     * @param end last index of the range, exclusive
     * @return the deferred sum of the range
     */
    suspend fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
        if (end - start <= BATCH_SIZE) {
            compute(coefficients, start, end)
        } else {
            val mid = start + (end - start) / 2
            val first = startAsync(coefficients, start, mid)
            val second = startAsync(coefficients, mid, end)
            first.await() + second.await()
        }
    }

    /**
     * [RecursiveTask] that sums `coefficients[start until end]`: both halves are forked and then
     * joined, leaves of at most [BATCH_SIZE] elements are summed sequentially.
     */
    class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
        override fun compute(): Double {
            if (end - start <= BATCH_SIZE) {
                return compute(coefficients, start, end)
            }

            val mid = start + (end - start) / 2
            val first = Task(coefficients, start, mid).fork()
            val second = Task(coefficients, mid, end).fork()

            return first.join() + second.join()
        }

        /** Sequentially sums `sin(x^1.1) + 1e-8` over `coefficients[start until end]`. */
        private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
            var result = 0.0
            for (i in start until end) {
                result += Math.sin(Math.pow(coefficients[i].toDouble(), 1.1)) + 1e-8
            }

            return result
        }
    }

    /**
     * [CountedCompleter] that sums `coefficients[start until end]` without blocking joins.
     *
     * Note that this nested class takes precedence over `java.util.concurrent.RecursiveAction`
     * inside [ForkJoinBenchmark].
     *
     * @param result holder of the computed sum, exposed through the raw-result accessors
     * @param parent completer that is notified when this node completes, or `null` for the root
     */
    class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
                          parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {

        // Children of an inner node; both stay null for a leaf.
        private var first: ForkJoinTask<Double>? = null
        private var second: ForkJoinTask<Double>? = null

        override fun getRawResult(): Double {
            return result
        }

        override fun setRawResult(t: Double) {
            result = t
        }

        /**
         * Sums a leaf range directly, or forks two children for an inner node.
         * In both cases the node then reports its own completion via `tryComplete()`.
         */
        override fun compute() {
            if (end - start <= BATCH_SIZE) {
                rawResult = compute(coefficients, start, end)
            } else {
                val mid = start + (end - start) / 2
                val left = RecursiveAction(coefficients, start, mid, parent = this)
                val right = RecursiveAction(coefficients, mid, end, parent = this)
                // Store the children before forking so that onCompletion can always see them.
                first = left
                second = right
                // Two pending completions: whichever of {this, left, right} calls tryComplete last
                // observes a zero count and triggers onCompletion.
                pendingCount = 2
                // One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
                left.fork()
                right.fork()
            }

            tryComplete()
        }

        /**
         * Combines the children's results for an inner node.
         *
         * The decision is based on whether children exist rather than on [caller]: when both
         * children finish before this node reaches its own `tryComplete()`, the node completes
         * itself and [caller] is `this`, yet the children's results still have to be summed.
         */
        override fun onCompletion(caller: CountedCompleter<*>?) {
            val left = first
            val right = second
            if (left != null && right != null) {
                rawResult = left.rawResult + right.rawResult
            }
            super.onCompletion(caller)
        }
    }
}
154
155
/**
 * Sequentially accumulates `sin(x^1.1) + 1e-8` for every `x` in `coefficients[start until end]`.
 *
 * @param coefficients the data to sum over
 * @param start first index of the range, inclusive
 * @param end last index of the range, exclusive
 * @return the accumulated sum; `0.0` when the range is empty
 */
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
    var sum = 0.0
    var index = start
    while (index < end) {
        val powered = Math.pow(coefficients[index].toDouble(), 1.1)
        sum += Math.sin(powered) + 1e-8
        index++
    }
    return sum
}
164