• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package benchmarks.scheduler
6 
7 import benchmarks.*
8 import kotlinx.coroutines.*
9 import org.openjdk.jmh.annotations.*
10 import java.util.concurrent.*
11 
/**
 * Comparison of fork-join style tasks written against the specific FJP API ([Task], [RecursiveAction])
 * and classic coroutine [async] jobs.
 *
 * The job is organized as a binary tree: a range longer than [BATCH_SIZE] is split in halves, every leaf
 * node computes an FPU-heavy sum over its slice of the data and intermediate nodes sum the results of
 * their two children.
 *
 * Recorded results.
 * NOTE(review): the batch sizes quoted in the headings below differ from the current value of
 * [BATCH_SIZE]; presumably the figures were recorded with other settings - confirm before comparing.
 *
 * ```
 * Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
 * ForkJoinBenchmark.asyncExperimental  avgt   10  681.512 ± 32.069  ms/op
 * ForkJoinBenchmark.asyncFjp           avgt   10  845.386 ± 73.204  ms/op
 * ForkJoinBenchmark.fjpRecursiveTask   avgt   10  692.120 ± 26.224  ms/op
 * ForkJoinBenchmark.fjpTask            avgt   10  791.087 ± 66.544  ms/op
 *
 * Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
 * Benchmark                            Mode  Cnt     Score     Error  Units
 * ForkJoinBenchmark.asyncExperimental  avgt   10  1273.271 ± 190.372  ms/op
 * ForkJoinBenchmark.asyncFjp           avgt   10  1406.102 ± 216.793  ms/op
 * ForkJoinBenchmark.fjpRecursiveTask   avgt   10   849.941 ± 141.254  ms/op
 * ForkJoinBenchmark.fjpTask            avgt   10   831.554 ±  57.276  ms/op
 * ```
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /*
         * Change task size to control global granularity of benchmark
         * Change batch size to control affinity/work stealing/scheduling overhead effects
         */
        const val TASK_SIZE = 8192 * 1024
        const val BATCH_SIZE = 32 * 8192
    }

    /** Input data shared by all benchmarks; (re)created in [setup] with [TASK_SIZE] elements. */
    lateinit var coefficients: LongArray

    // Overrides the base-class property; how the value is interpreted is decided by
    // ParametrizedDispatcherBase, which is not part of this file.
    override var dispatcher: String = "scheduler"

    /** Runs the base-class setup and fills [coefficients] with random values from `0` until `1024 * 1024`. */
    @Setup
    override fun setup() {
        super.setup()
        coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
    }

    /** Sums [coefficients] with [async] jobs started in a scope backed by [ForkJoinPool.commonPool]. */
    @Benchmark
    fun asyncFjp(): Double = runBlocking {
        val fjpScope = CoroutineScope(ForkJoinPool.commonPool().asCoroutineDispatcher())
        fjpScope.startAsync(coefficients, 0, coefficients.size).await()
    }

    /**
     * Sums [coefficients] with [async] jobs started from inside the [runBlocking] block.
     *
     * NOTE(review): `startAsync` is invoked on the scope of this `runBlocking` block (the nearest
     * `CoroutineScope` receiver), so the jobs presumably inherit the `runBlocking` context rather than a
     * dispatcher configured by the base class - confirm this is intended.
     */
    @Benchmark
    fun asyncExperimental(): Double = runBlocking {
        startAsync(coefficients, 0, coefficients.size).await()
    }

    /** Sums [coefficients] with the [CountedCompleter]-based [RecursiveAction] on [ForkJoinPool.commonPool]. */
    @Benchmark
    fun fjpRecursiveTask(): Double {
        val task = RecursiveAction(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /** Sums [coefficients] with the [RecursiveTask]-based [Task] on [ForkJoinPool.commonPool]. */
    @Benchmark
    fun fjpTask(): Double {
        val task = Task(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /**
     * Starts an [async] job in the receiver scope that sums the slice `[start, end)` of [coefficients].
     *
     * A slice of at most [BATCH_SIZE] elements is summed directly inside the job; a longer slice is split
     * in the middle into two nested jobs whose results are awaited and added.
     *
     * The function is deliberately not `suspend`: it only launches the job and returns its [Deferred]
     * immediately, all waiting happens inside the launched coroutine.
     */
    fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
        if (end - start <= BATCH_SIZE) {
            compute(coefficients, start, end)
        } else {
            val middle = start + (end - start) / 2
            // Inside `async` the nearest scope is the new job, so both halves become its children.
            val first = startAsync(coefficients, start, middle)
            val second = startAsync(coefficients, middle, end)
            first.await() + second.await()
        }
    }

    /**
     * [RecursiveTask] that sums the slice `[start, end)` of [coefficients].
     *
     * A slice of at most [BATCH_SIZE] elements is summed in place; a longer slice is split in the middle
     * into two forked subtasks whose results are joined and added.
     */
    class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
        override fun compute(): Double {
            if (end - start <= BATCH_SIZE) {
                // The three-argument call resolves to the file-level helper, not to this override.
                return compute(coefficients, start, end)
            }

            val middle = start + (end - start) / 2
            // Both halves are forked and then joined in the same order.
            val first = Task(coefficients, start, middle).fork()
            val second = Task(coefficients, middle, end).fork()
            return first.join() + second.join()
        }
    }

    /**
     * [CountedCompleter] that sums the slice `[start, end)` of [coefficients] and exposes the sum as its
     * raw result (stored in the volatile [result] property).
     *
     * A slice of at most [BATCH_SIZE] elements is summed in place; a longer slice forks two children that
     * have this task as their completer, and their results are added in [onCompletion].
     */
    class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
                          parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {

        // Forked children; stay null for a task that computes its slice directly.
        private var first: ForkJoinTask<Double>? = null
        private var second: ForkJoinTask<Double>? = null

        override fun getRawResult(): Double = result

        override fun setRawResult(t: Double) {
            result = t
        }

        override fun compute() {
            if (end - start <= BATCH_SIZE) {
                rawResult = compute(coefficients, start, end)
            } else {
                val middle = start + (end - start) / 2
                // Two pending completions: they are consumed by the tryComplete() call below and by the
                // child that finishes first, so the child that finishes last triggers onCompletion.
                pendingCount = 2
                // One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
                first = RecursiveAction(coefficients, start, middle, parent = this).fork()
                second = RecursiveAction(coefficients, middle, end, parent = this).fork()
            }

            tryComplete()
        }

        override fun onCompletion(caller: CountedCompleter<*>?) {
            if (caller !== this) {
                // A foreign caller means completion was propagated from a child, and children only exist
                // after both fields were assigned in compute(), hence the non-null assertions.
                rawResult = first!!.rawResult + second!!.rawResult
            }
            super.onCompletion(caller)
        }
    }
}
158 
159 
/**
 * Sums `sin(coefficients[i]^1.1) + 1e-8` over the indices `start until end`, in ascending index order.
 *
 * Returns `0.0` when the range is empty (`start >= end`).
 */
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
    var sum = 0.0
    var index = start
    while (index < end) {
        val powered = Math.pow(coefficients[index].toDouble(), 1.1)
        sum += Math.sin(powered) + 1e-8
        index++
    }
    return sum
}
168