1 /*
<lambda>null2 * Copyright 2018 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package trebuchet.util
18
19 import java.io.Closeable
20 import java.util.*
21 import java.util.concurrent.ArrayBlockingQueue
22 import java.util.concurrent.Callable
23 import java.util.concurrent.CompletableFuture
24 import java.util.concurrent.Executors
25 import java.util.concurrent.Future
26 import java.util.concurrent.atomic.AtomicBoolean
27 import java.util.concurrent.atomic.AtomicLong
28 import kotlin.collections.ArrayList
29 import kotlin.concurrent.thread
30 import kotlin.system.measureNanoTime
31 import kotlin.system.measureTimeMillis
32
33 fun ns2ms(ns: Long) = ns / 1000000
34
35 class WorkQueue {
36 private val workArray = ArrayDeque<Runnable?>(8192)
37 private val _finished = AtomicBoolean(false)
38
39 private val finished get() = _finished.get()
40 fun finish() = _finished.set(true)
41
42 private fun spinLoop(duration: Long) {
43 val loopUntil = System.nanoTime() + duration
44 while (System.nanoTime() < loopUntil) {}
45 }
46
47 fun submit(task: Runnable) {
48 synchronized(workArray) {
49 workArray.add(task)
50 }
51 }
52
53 fun processAll() {
54 while (!finished) {
55 var next: Runnable?
56 do {
57 synchronized(workArray) {
58 next = workArray.poll()
59 }
60 if (next == null) {
61 spinLoop(10000)
62 synchronized(workArray) {
63 next = workArray.poll()
64 }
65 }
66 next?.run()
67 } while (next != null)
68 spinLoop(60000)
69 }
70
71 var remaining: Runnable?
72 do {
73 synchronized(workArray) {
74 remaining = workArray.poll()
75 }
76 remaining?.run()
77 } while (remaining != null)
78 }
79 }
80
81 private val ThreadCount = minOf(10, Runtime.getRuntime().availableProcessors())
82
83 class WorkPool : Closeable {
84
85 private var closed = false
86 private val workQueue = WorkQueue()
<lambda>null87 private val threadPool = Array(ThreadCount) {
88 Thread { workQueue.processAll() }.apply { start() }
89 }
90
submitnull91 fun submit(task: Runnable) {
92 workQueue.submit(task)
93 }
94
closenull95 override fun close() {
96 if (closed) return
97 closed = true
98 workQueue.finish()
99 threadPool.forEach { it.join() }
100 }
101 }
102
par_mapnull103 fun<T, U, S> par_map(iterator: Iterator<T>, threadState: () -> S, chunkSize: Int = 50,
104 map: (S, T) -> U): Iterator<U> {
105 val endOfStreamMarker = CompletableFuture<List<U>>()
106 val resultPipe = ArrayBlockingQueue<Future<List<U>>>(1024, false)
107 thread {
108 val threadLocal = ThreadLocal.withInitial { threadState() }
109 WorkPool().use { pool ->
110 while (iterator.hasNext()) {
111 val source = ArrayList<T>(chunkSize)
112 while (source.size < chunkSize && iterator.hasNext()) {
113 source.add(iterator.next())
114 }
115 val future = CompletableFuture<List<U>>()
116 pool.submit(Runnable {
117 val state = threadLocal.get()
118 future.complete(source.map { map(state, it) })
119 })
120 resultPipe.put(future)
121 }
122 resultPipe.put(endOfStreamMarker)
123 }
124 }
125
126 return object : Iterator<U> {
127 var current: Iterator<U>? = null
128
129 init {
130 takeNext()
131 }
132
133 private fun takeNext() {
134 val next = resultPipe.take()
135 if (next != endOfStreamMarker) {
136 current = next.get().iterator()
137 }
138 }
139
140 override fun next(): U {
141 val ret = current!!.next()
142 if (!current!!.hasNext()) {
143 current = null
144 takeNext()
145 }
146 return ret
147 }
148
149 override fun hasNext(): Boolean {
150 return current != null
151 }
152
153 }
154 }