Skip to content

Commit 0d2aa85

Browse files
authored
Reimplement Source::indexOf(ByteString) without Source::peek calls (#242)
Reimplement indexOf(ByteString) without the use of peek-source. Along with direct access into buffer's data (instead of using `Buffer::get`) it gave a significant performance boost.
1 parent daffa8d commit 0d2aa85

File tree

4 files changed

+144
-30
lines changed

4 files changed

+144
-30
lines changed

core/api/kotlinx-io-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public final class kotlinx/io/BuffersKt {
5454
}
5555

5656
public final class kotlinx/io/ByteStringsKt {
57+
public static final fun indexOf (Lkotlinx/io/Buffer;Lkotlinx/io/bytestring/ByteString;J)J
5758
public static final fun indexOf (Lkotlinx/io/Source;Lkotlinx/io/bytestring/ByteString;J)J
59+
public static synthetic fun indexOf$default (Lkotlinx/io/Buffer;Lkotlinx/io/bytestring/ByteString;JILjava/lang/Object;)J
5860
public static synthetic fun indexOf$default (Lkotlinx/io/Source;Lkotlinx/io/bytestring/ByteString;JILjava/lang/Object;)J
5961
public static final fun readByteString (Lkotlinx/io/Source;)Lkotlinx/io/bytestring/ByteString;
6062
public static final fun readByteString (Lkotlinx/io/Source;I)Lkotlinx/io/bytestring/ByteString;

core/common/src/ByteStrings.kt

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -102,38 +102,63 @@ public fun Source.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
102102
}
103103

104104
var offset = startIndex
105-
val peek = peek()
106-
if (!request(startIndex)) {
107-
return -1L
105+
while (request(offset + byteString.size)) {
106+
val idx = buffer.indexOf(byteString, offset)
107+
if (idx < 0) {
108+
// The buffer does not contain the pattern, let's try fetching at least one extra byte
109+
// and start a new search attempt so that the pattern would fit in the suffix of
110+
// the current buffer + 1 extra byte.
111+
offset = buffer.size - byteString.size + 1
112+
} else {
113+
return idx
114+
}
108115
}
109-
peek.skip(offset)
110-
var resultingIndex = -1L
111-
UnsafeByteStringOperations.withByteArrayUnsafe(byteString) { data ->
112-
while (!peek.exhausted()) {
113-
val index = peek.indexOf(data[0])
114-
if (index == -1L) {
115-
return@withByteArrayUnsafe
116-
}
117-
offset += index
118-
peek.skip(index)
119-
if (!peek.request(byteString.size.toLong())) {
120-
return@withByteArrayUnsafe
121-
}
116+
return -1
117+
}
122118

123-
var matches = true
124-
for (idx in data.indices) {
125-
if (data[idx] != peek.buffer[idx.toLong()]) {
126-
matches = false
127-
offset++
128-
peek.skip(1)
129-
break
130-
}
131-
}
132-
if (matches) {
133-
resultingIndex = offset
134-
return@withByteArrayUnsafe
119+
@OptIn(UnsafeByteStringApi::class)
120+
public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
121+
require(startIndex <= size) {
122+
"startIndex ($startIndex) should not exceed size ($size)"
123+
}
124+
if (byteString.isEmpty()) return 0
125+
if (startIndex > size - byteString.size) return -1L
126+
127+
UnsafeByteStringOperations.withByteArrayUnsafe(byteString) { byteStringData ->
128+
seek(startIndex) { seg, o ->
129+
if (o == -1L) {
130+
return -1L
135131
}
132+
var segment = seg!!
133+
var offset = o
134+
do {
135+
// If start index within this segment, the diff will be positive and
136+
// we'll scan the segment starting from the corresponding offset.
137+
// Otherwise, the diff will be negative and we'll scan the segment from the beginning.
138+
val startOffset = maxOf((startIndex - offset).toInt(), 0)
139+
// Try to search the pattern within the current segment.
140+
val idx = segment.indexOfBytesInbound(byteStringData, startOffset)
141+
if (idx != -1) {
142+
// The offset corresponds to the segment's start, idx - to offset within the segment.
143+
return offset + idx.toLong()
144+
}
145+
// firstOutboundOffset corresponds to a first byte starting reading the pattern from which
146+
// will result in running out of the current segment bounds.
147+
val firstOutboundOffset = maxOf(startOffset, segment.size - byteStringData.size + 1)
148+
// Try to find a pattern in all suffixes shorter than the pattern. These suffixes start
149+
// in the current segment, but ends in the following segments; thus we're using outbound function.
150+
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset, head)
151+
if (idx1 != -1) {
152+
// Offset corresponds to the segment's start, idx - to offset within the segment.
153+
return offset + idx1.toLong()
154+
}
155+
156+
// We scanned the whole segment, so let's go to the next one
157+
offset += segment.size
158+
segment = segment.next!!
159+
} while (segment !== head && offset + byteString.size <= size)
160+
return -1L
136161
}
137162
}
138-
return resultingIndex
163+
return -1
139164
}

core/common/src/Segment.kt

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ internal fun Segment.indexOf(byte: Byte, startOffset: Int, endOffset: Int): Int
201201
require(startOffset in 0 until size) {
202202
"$startOffset"
203203
}
204-
require(endOffset in startOffset..size) { "$endOffset" }
204+
require(endOffset in startOffset..size) {
205+
"$endOffset"
206+
}
205207
val p = pos
206208
for (idx in startOffset until endOffset) {
207209
if (data[p + idx] == byte) {
@@ -210,3 +212,75 @@ internal fun Segment.indexOf(byte: Byte, startOffset: Int, endOffset: Int): Int
210212
}
211213
return -1
212214
}
215+
216+
/**
217+
* Searches for a `bytes` pattern within this segment starting at the offset `startOffset`.
218+
* `startOffset` is relative and should be within `[0, size)`.
219+
*/
220+
internal fun Segment.indexOfBytesInbound(bytes: ByteArray, startOffset: Int): Int {
221+
// require(startOffset in 0 until size)
222+
var offset = startOffset
223+
val limit = size - bytes.size + 1
224+
val firstByte = bytes[0]
225+
while (offset < limit) {
226+
val idx = indexOf(firstByte, offset, limit)
227+
if (idx < 0) {
228+
return -1
229+
}
230+
var found = true
231+
for (innerIdx in 1 until bytes.size) {
232+
if (data[pos + idx + innerIdx] != bytes[innerIdx]) {
233+
found = false
234+
break
235+
}
236+
}
237+
if (found) {
238+
return idx
239+
} else {
240+
offset++
241+
}
242+
}
243+
return -1
244+
}
245+
246+
/**
247+
* Searches for a `bytes` pattern starting in between offset `startOffset` and `size` within this segment
248+
* and continued in the following segments.
249+
* `startOffset` is relative and should be within `[0, size)`.
250+
*/
251+
internal fun Segment.indexOfBytesOutbound(bytes: ByteArray, startOffset: Int, head: Segment?): Int {
252+
var offset = startOffset
253+
val firstByte = bytes[0]
254+
255+
while (offset in 0 until size) {
256+
val idx = indexOf(firstByte, offset, size)
257+
if (idx < 0) {
258+
return -1
259+
}
260+
// The pattern should start in this segment
261+
var seg = this
262+
var scanOffset = offset
263+
264+
var found = true
265+
for (element in bytes) {
266+
// We ran out of bytes in this segment,
267+
// so let's take the next one and continue the scan there.
268+
if (scanOffset == seg.size) {
269+
val next = seg.next
270+
if (next === head) return -1
271+
seg = next!!
272+
scanOffset = 0 // we're scanning the next segment right from the beginning
273+
}
274+
if (element != seg.data[seg.pos + scanOffset]) {
275+
found = false
276+
break
277+
}
278+
scanOffset++
279+
}
280+
if (found) {
281+
return offset
282+
}
283+
offset++
284+
}
285+
return -1
286+
}

core/common/test/AbstractSourceTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,4 +1769,17 @@ abstract class AbstractBufferedSourceTest internal constructor(
17691769
assertEquals((Segment.SIZE * 2 + 1).toLong(), source.indexOf("fg".encodeToByteString()))
17701770
assertEquals((Segment.SIZE * 2 + 2).toLong(), source.indexOf("g".encodeToByteString()))
17711771
}
1772+
1773+
@Test
1774+
fun indexOfByteStringSpanningAcrossMultipleSegments() {
1775+
sink.writeString("a".repeat(SEGMENT_SIZE))
1776+
sink.emit()
1777+
sink.writeString("bbbb")
1778+
sink.emit()
1779+
sink.write(Buffer().also { it.writeString("c".repeat(SEGMENT_SIZE)) }, SEGMENT_SIZE.toLong())
1780+
sink.emit()
1781+
1782+
source.skip(SEGMENT_SIZE - 10L)
1783+
assertEquals(9, source.indexOf("abbbbc".encodeToByteString()))
1784+
}
17721785
}

0 commit comments

Comments
 (0)