/*
 * Copyright 2017 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package trebuchet.io

import kotlin.sequences.iterator

/**
 * Byte-addressable view over the buffers handed out by [source].
 *
 * Buffers are pulled on demand and wrapped in [Window]s that record the global
 * index range each buffer covers. [startIndex] and [endIndex] track the range
 * currently held in [windows]. When a new buffer arrives, more than two windows
 * are held, and the distance from the second window's start to [endIndex]
 * exceeds [keepLoadedSize], the oldest window is dropped and reported through
 * [onWindowReleased].
 *
 * @property source producer of buffers; a `null` result from `next()` is
 *   treated as end of input.
 * @property keepLoadedSize threshold used by the eviction check described above.
 */
class StreamingReader(val source: BufferProducer, val keepLoadedSize: Int = 8096) : GenericByteBuffer {
    /** Windows currently held, oldest first. */
    val windows = mutableListOf<Window>()

    /** Invoked with each window right after it has been removed from [windows]. */
    var onWindowReleased: ((Window) -> Unit)? = null

    /** Global index of the first byte still held; advances when a window is released. */
    var startIndex: Long = 0
        private set

    /** Global index of the last byte loaded so far; `-1` until the first buffer arrives. */
    var endIndex: Long = -1
        private set

    /** Set once [source] has returned `null`. */
    var reachedEof: Boolean = false
        private set

    /**
     * Returns the byte at global [index].
     *
     * @throws IndexOutOfBoundsException if no held window contains [index].
     */
    override operator fun get(index: Long): Byte = windowFor(index)[index]

    /** Number of bytes between [startIndex] and [endIndex], inclusive. */
    override val length: Long
        get() = endIndex - startIndex + 1

    /**
     * Finds the held window whose global range contains [i].
     *
     * @throws IndexOutOfBoundsException if [i] is not covered by any window in [windows].
     */
    fun windowFor(i: Long): Window {
        // Indexed loop on purpose: this sits under get(), so avoid allocating an iterator per lookup.
        for (wi in windows.indices) {
            val candidate = windows[wi]
            if (i >= candidate.globalStartIndex && i <= candidate.globalEndIndex) {
                return candidate
            }
        }
        throw IndexOutOfBoundsException("$i not in range $startIndex..$endIndex")
    }

    /**
     * Pulls buffers from [source] until [endIndex] reaches [index] or the source
     * is exhausted.
     *
     * Only the upper bound is checked: the result says whether [index] is at or
     * before [endIndex], not whether its window is still held.
     *
     * @return `true` if `index <= endIndex` after loading, `false` if the source
     *   ran out first.
     */
    fun loadIndex(index: Long): Boolean {
        while (endIndex < index && !reachedEof) {
            val nextBuffer = source.next()
            if (nextBuffer == null) {
                reachedEof = true
                return false
            }
            addBuffer(nextBuffer)
        }
        return index <= endIndex
    }

    /**
     * Returns a lazy iterator over the data from global index [startIndex] onward.
     *
     * The iterator first replays the windows already held: windows that end
     * before [startIndex] are skipped, and the window containing it is yielded
     * via `slice(offset)` (presumably the sub-slice starting at that offset;
     * confirm against `DataSlice`). It then keeps pulling buffers from
     * [source], registering each one with this reader before yielding it, until
     * the source returns `null`.
     *
     * Note that the [startIndex] parameter shadows the property of the same name
     * inside this function.
     *
     * NOTE(review): the replay phase iterates [windows] directly, so loading more
     * data through this reader while that phase is being consumed looks unsafe;
     * verify against callers.
     */
    fun iter(startIndex: Long = 0L): Iterator<DataSlice> {
        return iterator {
            for (win in windows) {
                if (startIndex <= win.globalStartIndex) {
                    yield(win.slice)
                } else if (startIndex <= win.globalEndIndex) {
                    yield(win.slice.slice((startIndex - win.globalStartIndex).toInt()))
                }
            }

            while (!reachedEof) {
                val nextBuffer = source.next()
                if (nextBuffer != null) {
                    addBuffer(nextBuffer)

                    // This variable is a workaround for an apparent bug in the Kotlin
                    // type system that causes it to handle type inference around yield
                    // statements incorrectly.
                    val notNullNextBuffer: DataSlice = nextBuffer
                    yield(notNullNextBuffer)
                } else {
                    reachedEof = true
                }
            }
        }
    }

    /**
     * Appends [buffer] as a new window directly after [endIndex] and, if the
     * eviction condition holds, releases the single oldest window.
     */
    private fun addBuffer(buffer: DataSlice) {
        val newEndIndex = endIndex + buffer.length
        windows.add(Window(buffer, endIndex + 1, newEndIndex))
        endIndex = newEndIndex
        // At most one window is released per added buffer, and never below two held windows.
        if (windows.size > 2 && endIndex - windows[1].globalStartIndex > keepLoadedSize) {
            val released = windows.removeAt(0)
            startIndex = windows[0].globalStartIndex
            // Safe call instead of a null check followed by `!!` on a mutable property.
            onWindowReleased?.invoke(released)
        }
    }

    /**
     * A buffer together with the inclusive global index range it covers.
     */
    class Window(val slice: DataSlice, val globalStartIndex: Long, val globalEndIndex: Long) {
        /** Returns the byte at global index [i] by translating it to an offset into [slice]. */
        @Suppress("NOTHING_TO_INLINE")
        inline operator fun get(i: Long): Byte = slice[(i - globalStartIndex).toInt()]
    }

    /**
     * Copies the bytes in the inclusive global range [lineStartIndex]..[lineEndIndex]
     * into [tmpBuffer], starting at offset 0.
     *
     * Copying stops early once [tmpBuffer] is full. Nothing is written past the
     * requested range, even when the containing window extends beyond
     * [lineEndIndex].
     *
     * @throws IndexOutOfBoundsException if part of the range is not covered by a
     *   held window.
     */
    fun copyTo(tmpBuffer: ByteArray, lineStartIndex: Long, lineEndIndex: Long) {
        var srcIndex = lineStartIndex
        var dstIndex = 0
        while (srcIndex <= lineEndIndex && dstIndex < tmpBuffer.size) {
            val window = windowFor(srcIndex)
            // Clamp to the requested end so the copy does not run on to the end of the window.
            val lastToCopy = minOf(window.globalEndIndex, lineEndIndex)
            while (srcIndex <= lastToCopy && dstIndex < tmpBuffer.size) {
                tmpBuffer[dstIndex++] = window[srcIndex++]
            }
        }
    }
}