• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package benchmarks.scheduler
2 
3 import benchmarks.*
4 import kotlinx.coroutines.*
5 import org.openjdk.jmh.annotations.*
6 import java.util.concurrent.*
7 
8 /*
9  * Comparison of fork-join tasks using specific FJP API and classic [async] jobs.
10  * FJP job is organized in perfectly balanced binary tree, every leaf node computes
11  * FPU-heavy sum over its data and intermediate nodes sum results.
12  *
13  * Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
14  * ForkJoinBenchmark.asyncExperimental  avgt   10  681.512 ± 32.069  ms/op
15  * ForkJoinBenchmark.asyncFjp           avgt   10  845.386 ± 73.204  ms/op
16  * ForkJoinBenchmark.fjpRecursiveTask   avgt   10  692.120 ± 26.224  ms/op
17  * ForkJoinBenchmark.fjpTask            avgt   10  791.087 ± 66.544  ms/op
18  *
19  * Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
20  * Benchmark                            Mode  Cnt     Score     Error  Units
21  * ForkJoinBenchmark.asyncExperimental  avgt   10  1273.271 ± 190.372  ms/op
22  * ForkJoinBenchmark.asyncFjp           avgt   10  1406.102 ± 216.793  ms/op
23  * ForkJoinBenchmark.fjpRecursiveTask   avgt   10   849.941 ± 141.254  ms/op
24  * ForkJoinBenchmark.fjpTask            avgt   10   831.554 ±  57.276  ms/op
25  */
/**
 * Benchmark state and benchmark methods that sum an FPU-heavy function over [coefficients]
 * by recursively halving the index range until a slice is at most [BATCH_SIZE] elements long.
 *
 * The same decomposition is implemented with coroutine [async] jobs ([asyncFjp], [asyncExperimental]),
 * with a [RecursiveTask] ([fjpTask]) and with a [CountedCompleter] ([fjpRecursiveTask]).
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /*
         * Change task size to control global granularity of benchmark
         * Change batch size to control affinity/work stealing/scheduling overhead effects
         */
        const val TASK_SIZE = 8192 * 1024
        const val BATCH_SIZE = 32 * 8192
    }

    /** Input data; filled with [TASK_SIZE] random values in [setup]. */
    lateinit var coefficients: LongArray
    override var dispatcher: String = "scheduler"

    /** Runs the base-class setup, then fills [coefficients] with random longs in `[0, 1024 * 1024)`. */
    @Setup
    override fun setup() {
        super.setup()
        coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
    }

    /** Runs the [async] decomposition in a scope backed by the common [ForkJoinPool]. */
    @Benchmark
    fun asyncFjp() = runBlocking {
        CoroutineScope(ForkJoinPool.commonPool().asCoroutineDispatcher()).startAsync(coefficients, 0, coefficients.size).await()
    }

    /**
     * Runs the [async] decomposition with an implicit [CoroutineScope] receiver.
     *
     * NOTE(review): inside `runBlocking { }` the innermost implicit `CoroutineScope` is the
     * `runBlocking` scope itself, so `startAsync` presumably binds to it rather than to a scope
     * built from [dispatcher] — confirm against `ParametrizedDispatcherBase` that this is intended.
     */
    @Benchmark
    fun asyncExperimental() = runBlocking {
        startAsync(coefficients, 0, coefficients.size).await()
    }

    /** Submits the [CountedCompleter]-based [RecursiveAction] to the common pool and returns its result. */
    @Benchmark
    fun fjpRecursiveTask(): Double {
        val task = RecursiveAction(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /** Submits the [RecursiveTask]-based [Task] to the common pool and returns its result. */
    @Benchmark
    fun fjpTask(): Double {
        val task = Task(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /**
     * Starts an [async] job that computes the sum over `[start, end)`.
     *
     * Ranges of at most [BATCH_SIZE] elements are computed directly; larger ranges are split in
     * half, each half is started as a nested [async] job, and the two results are added.
     */
    suspend fun CoroutineScope.startAsync(coefficients: LongArray, start: Int, end: Int): Deferred<Double> = async {
        if (end - start <= BATCH_SIZE) {
            compute(coefficients, start, end)
        } else {
            val mid = start + (end - start) / 2
            val first = startAsync(coefficients, start, mid)
            val second = startAsync(coefficients, mid, end)
            first.await() + second.await()
        }
    }

    /**
     * [RecursiveTask] variant: forks both halves of `[start, end)` and joins them, or computes
     * the slice directly once it is at most [BATCH_SIZE] elements long.
     */
    class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
        override fun compute(): Double {
            if (end - start <= BATCH_SIZE) {
                // Resolves to the file-level compute(coefficients, start, end); this class only declares compute().
                return compute(coefficients, start, end)
            }

            val mid = start + (end - start) / 2
            val first = Task(coefficients, start, mid).fork()
            val second = Task(coefficients, mid, end).fork()

            return first.join() + second.join()
        }
    }

    /**
     * [CountedCompleter] variant. A leaf stores its slice sum in [result]; an internal node forks
     * two children and stores the sum of their results once all of its pending arrivals are in.
     *
     * @param parent completer of this node, or `null` for the root.
     */
    class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
                          parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {

        // Forked children; both stay null for a leaf node.
        private var first: ForkJoinTask<Double>? = null
        private var second: ForkJoinTask<Double>? = null

        override fun getRawResult(): Double {
            return result
        }

        override fun setRawResult(t: Double) {
            result = t
        }

        override fun compute() {
            if (end - start <= BATCH_SIZE) {
                rawResult = compute(coefficients, start, end)
            } else {
                val mid = start + (end - start) / 2
                // Three arrivals are expected: each child's completion and this node's own
                // tryComplete() below. The pending count must be set before forking.
                pendingCount = 2
                // One may fork only once here and execute the second task in place, looping over firstComplete, to be even more efficient
                first = RecursiveAction(coefficients, start, mid, parent = this).fork()
                second = RecursiveAction(coefficients, mid, end, parent = this).fork()
            }

            tryComplete()
        }

        override fun onCompletion(caller: CountedCompleter<*>?) {
            // Decide "internal node" by the presence of children, not by caller identity:
            // if both children finish before this node reaches its own tryComplete(), the node
            // itself is the last arrival (caller === this) and the sum must still be computed.
            val left = first
            val right = second
            if (left != null && right != null) {
                rawResult = left.rawResult + right.rawResult
            }
            super.onCompletion(caller)
        }
    }
}
154 
155 
/**
 * Sums `sin(x^1.1) + 1e-8` for every `x` in `coefficients[start]` through `coefficients[end - 1]`.
 *
 * Returns `0.0` when the range is empty (`start >= end`).
 */
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
    var total = 0.0
    var index = start
    while (index < end) {
        val powered = Math.pow(coefficients[index].toDouble(), 1.1)
        // Add the constant to the sine first, then accumulate, to keep the rounding order unchanged.
        total += Math.sin(powered) + 1e-8
        index++
    }
    return total
}
164