From 14247a231f4f300d6fac7cf3af65e3063cdb8b43 Mon Sep 17 00:00:00 2001
From: Vsevolod Tolstopyatov
Date: Fri, 18 Nov 2022 17:29:09 +0100
Subject: [PATCH 1/3] Handroll CharsetReader implementation

JavaStreamSerialReader no longer wraps the stream with
`stream.reader(Charsets.UTF_8)`; it uses the new internal CharsetReader.

CharsetReader decodes bytes pulled from the InputStream through a
CharsetDecoder and a private ByteBuffer:
  * malformed and unmappable input is replaced (CodingErrorAction.REPLACE)
    instead of being reported;
  * read(array, offset, length) returns the number of chars stored, 0 for
    a zero-length request and -1 when nothing could be read at end of
    stream;
  * a request for a single char is served by decoding into a two-char
    scratch array; when two chars come back the second one is kept as a
    leftover and handed out first by the next read();
  * the underlying stream is never closed by the reader.

---
 .../json/internal/CharsetReader.kt            | 120 ++++++++++++++++++
 .../json/internal/JvmJsonStreams.kt           |   4 +-
 2 files changed, 122 insertions(+), 2 deletions(-)
 create mode 100644 formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt

diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt
new file mode 100644
index 0000000000..d2ba1afc27
--- /dev/null
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt
@@ -0,0 +1,120 @@
+package kotlinx.serialization.json.internal
+
+import java.io.*
+import java.nio.*
+import java.nio.charset.*
+
+internal class CharsetReader(
+    private val inputStream: InputStream,
+    private val charset: Charset
+) {
+    private val decoder: CharsetDecoder
+    private val byteBuffer: ByteBuffer
+
+    // Surrogate-handling in cases when a single char is requested, but two were read
+    private var hasLeftoverPotentiallySurrogateChar = false
+    private var leftoverChar = 0.toChar()
+
+    init {
+        decoder = charset.newDecoder()
+            .onMalformedInput(CodingErrorAction.REPLACE)
+            .onUnmappableCharacter(CodingErrorAction.REPLACE)
+        byteBuffer = ByteBuffer.allocate(32)
+        byteBuffer.flip() // Make empty
+    }
+
+    @Suppress("NAME_SHADOWING")
+    fun read(array: CharArray, offset: Int, length: Int): Int {
+        if (length == 0) return 0
+        require(offset in 0 until array.size && length >= 0 && offset + length <= array.size) {
+            "Unexpected arguments: $offset, $length, ${array.size}"
+        }
+
+        var offset = offset
+        var length = length
+        var bytesRead = 0
+        if (hasLeftoverPotentiallySurrogateChar) {
+            array[offset] = leftoverChar
+            offset++
+            length--
+            hasLeftoverPotentiallySurrogateChar = false
+            bytesRead = 1
+            if (length == 0) return bytesRead
+        }
+        if (length == 1) {
+            // Treat single-character array reads just like read()
+            val c = oneShotReadSlowPath()
+            if (c == -1) return if (bytesRead == 0) -1 else bytesRead
+            array[offset] = c.toChar()
+            return bytesRead + 1
+        }
+        return doRead(array, offset, length) + bytesRead
+    }
+
+    private fun doRead(array: CharArray, offset: Int, length: Int): Int {
+        var charBuffer = CharBuffer.wrap(array, offset, length)
+        if (charBuffer.position() != 0) {
+            charBuffer = charBuffer.slice()
+        }
+        var isEof = false
+        while (true) {
+            val cr = decoder.decode(byteBuffer, charBuffer, isEof)
+            if (cr.isUnderflow) {
+                if (isEof) break
+                if (!charBuffer.hasRemaining()) break
+                val n = fillByteBuffer()
+                if (n < 0) {
+                    isEof = true
+                    if (charBuffer.position() == 0 && !byteBuffer.hasRemaining()) break
+                    decoder.reset()
+                }
+                continue
+            }
+            if (cr.isOverflow) {
+                assert(charBuffer.position() > 0)
+                break
+            }
+            cr.throwException()
+        }
+        if (isEof) decoder.reset()
+        return if (charBuffer.position() == 0) -1
+        else charBuffer.position()
+    }
+
+    private fun fillByteBuffer(): Int {
+        byteBuffer.compact()
+        try {
+            // Read from the input stream, and then update the buffer
+            val limit = byteBuffer.limit()
+            val position = byteBuffer.position()
+            val remaining = if (position <= limit) limit - position else 0
+            val bytesRead = inputStream.read(byteBuffer.array(), byteBuffer.arrayOffset() + position, remaining)
+            if (bytesRead < 0) return bytesRead
+            byteBuffer.position(position + bytesRead)
+        } finally {
+            byteBuffer.flip()
+        }
+        return byteBuffer.remaining()
+    }
+
+    private fun oneShotReadSlowPath(): Int {
+        // Return the leftover char, if there is one
+        if (hasLeftoverPotentiallySurrogateChar) {
+            hasLeftoverPotentiallySurrogateChar = false
+            return leftoverChar.code
+        }
+
+        val array = CharArray(2)
+        val bytesRead = read(array, 0, 2)
+        return when (bytesRead) {
+            -1 -> -1
+            1 -> array[0].code
+            2 -> {
+                leftoverChar = array[1]
+                hasLeftoverPotentiallySurrogateChar = true
+                array[0].code
+            }
+            else -> error("Unreachable state: $bytesRead")
+        }
+    }
+}
diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
index 746c0441ab..79e668b160 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
@@ -2,7 +2,6 @@ package kotlinx.serialization.json.internal
 
 import java.io.InputStream
 import java.io.OutputStream
-import java.nio.charset.Charset
 
 internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWriter {
     private val buffer = ByteArrayPool.take()
@@ -255,7 +254,8 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
 }
 
 internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
-    private val reader = stream.reader(Charsets.UTF_8)
+    // NB: not closed on purpose, it is responsibility of the caller
+    private val reader = CharsetReader(stream, Charsets.UTF_8)
 
     override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
         return reader.read(buffer, bufferOffset, count)

From 0dea861055776426b6c3f3235cb50ee8b01bede8 Mon Sep 17 00:00:00 2001
From: Vsevolod Tolstopyatov
Date: Fri, 18 Nov 2022 17:43:36 +0100
Subject: [PATCH 2/3] Pool byte arrays for charset decoding

CharsetReader now backs its ByteBuffer with an array taken from the new
ByteArrayPool8k instead of allocating a 32-byte buffer, and gives the
array back through release(). Json.decodeFromStream calls release() in a
`finally` block via JavaStreamSerialReader.release().

CharArrayPool.kt is renamed to ArrayPools.kt and additionally hosts
ByteArrayPoolBase, ByteArrayPool8k and ByteArrayPool (the standalone
ByteArrayPool.kt is deleted). MAX_CHARS_IN_POOL becomes a file-level
constant used by both the char and the byte pools; pooled byte arrays
are accounted as `size / 2` against that limit.

Review fixes folded into this revision (the blob hashes on the `index`
lines below still refer to the originally posted content):
  * ByteArrayPoolBase.releaseImpl checks the limit with `array.size / 2`,
    the same unit it adds to and subtracts from `bytesTotal`;
  * ByteArrayPool8k allocates 8192 bytes (8 KiB) rather than 8196;
  * dropped the redundant `public` modifier on CharsetReader.release().

---
 .../kotlinx/serialization/json/JvmStreams.kt  |  7 ++-
 .../{CharArrayPool.kt => ArrayPools.kt}       | 53 +++++++++++++++----
 .../json/internal/ByteArrayPool.kt            | 30 -----------
 .../json/internal/CharsetReader.kt            |  6 ++-
 .../json/internal/JvmJsonStreams.kt           |  6 ++-
 5 files changed, 60 insertions(+), 42 deletions(-)
 rename formats/json/jvmMain/src/kotlinx/serialization/json/internal/{CharArrayPool.kt => ArrayPools.kt} (51%)
 delete mode 100644 formats/json/jvmMain/src/kotlinx/serialization/json/internal/ByteArrayPool.kt

diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
index 81bfc56317..4af1045326 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
@@ -56,7 +56,12 @@ public fun <T> Json.decodeFromStream(
     deserializer: DeserializationStrategy<T>,
     stream: InputStream
 ): T {
-    return decodeByReader(deserializer, JavaStreamSerialReader(stream))
+    val reader = JavaStreamSerialReader(stream)
+    try {
+        return decodeByReader(deserializer, reader)
+    } finally {
+        reader.release()
+    }
 }
 
 /**
diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
similarity index 51%
rename from formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt
rename to formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
index 08d22ec83d..da0692d1e2 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
@@ -3,20 +3,18 @@
  */
 package kotlinx.serialization.json.internal
 
-import java.util.concurrent.*
+/*
+ * Not really documented kill switch as a workaround for potential
+ * (unlikely) problems with memory consumptions.
+ */
+private val MAX_CHARS_IN_POOL = runCatching {
+    System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
+}.getOrNull() ?: 1024 * 1024
 
 internal open class CharArrayPoolBase {
     private val arrays = ArrayDeque<CharArray>()
     private var charsTotal = 0
 
-    /*
-     * Not really documented kill switch as a workaround for potential
-     * (unlikely) problems with memory consumptions.
-     */
-    private val MAX_CHARS_IN_POOL = runCatching {
-        System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
-    }.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)
-
     protected fun take(size: Int): CharArray {
         /*
          * Initially the pool is empty, so an instance will be allocated
@@ -52,3 +50,40 @@ internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {
         releaseImpl(array)
     }
 }
+
+// Byte array pool
+
+internal open class ByteArrayPoolBase {
+    private val arrays = ArrayDeque<ByteArray>()
+    private var bytesTotal = 0
+
+    protected fun take(size: Int): ByteArray {
+        /*
+         * Initially the pool is empty, so an instance will be allocated
+         * and the pool will be populated in the 'release'
+         */
+        val candidate = synchronized(this) {
+            arrays.removeLastOrNull()?.also { bytesTotal -= it.size / 2 }
+        }
+        return candidate ?: ByteArray(size)
+    }
+
+    protected fun releaseImpl(array: ByteArray): Unit = synchronized(this) {
+        if (bytesTotal + array.size / 2 >= MAX_CHARS_IN_POOL) return@synchronized
+        bytesTotal += array.size / 2
+        arrays.addLast(array)
+    }
+}
+
+internal object ByteArrayPool8k : ByteArrayPoolBase() {
+    fun take(): ByteArray = super.take(8192)
+
+    fun release(array: ByteArray) = releaseImpl(array)
+}
+
+
+internal object ByteArrayPool : ByteArrayPoolBase() {
+    fun take(): ByteArray = super.take(128)
+
+    fun release(array: ByteArray) = releaseImpl(array)
+}
diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ByteArrayPool.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ByteArrayPool.kt
deleted file mode 100644
index a1ab1bc0b8..0000000000
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ByteArrayPool.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-package kotlinx.serialization.json.internal
-
-internal object ByteArrayPool {
-    private val arrays = ArrayDeque<ByteArray>()
-    private var charsTotal = 0
-    /*
-     * Not really documented kill switch as a workaround for potential
-     * (unlikely) problems with memory consumptions.
-     */
-    private val MAX_CHARS_IN_POOL = runCatching {
-        System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
-    }.getOrNull() ?: 2 * 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)
-
-    fun take(): ByteArray {
-        /*
-         * Initially the pool is empty, so an instance will be allocated
-         * and the pool will be populated in the 'release'
-         */
-        val candidate = synchronized(this) {
-            arrays.removeLastOrNull()?.also { charsTotal -= it.size }
-        }
-        return candidate ?: ByteArray(512)
-    }
-
-    fun release(array: ByteArray) = synchronized(this) {
-        if (charsTotal + array.size >= MAX_CHARS_IN_POOL) return@synchronized
-        charsTotal += array.size
-        arrays.addLast(array)
-    }
-}
diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt
index d2ba1afc27..f5bf0cf6ba 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharsetReader.kt
@@ -19,7 +19,7 @@ internal class CharsetReader(
         decoder = charset.newDecoder()
             .onMalformedInput(CodingErrorAction.REPLACE)
             .onUnmappableCharacter(CodingErrorAction.REPLACE)
-        byteBuffer = ByteBuffer.allocate(32)
+        byteBuffer = ByteBuffer.wrap(ByteArrayPool8k.take())
         byteBuffer.flip() // Make empty
     }
 
@@ -117,4 +117,8 @@ internal class CharsetReader(
             else -> error("Unreachable state: $bytesRead")
         }
     }
+
+    fun release() {
+        ByteArrayPool8k.release(byteBuffer.array())
+    }
 }
diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
index 79e668b160..274dd4dc2d 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt
@@ -254,10 +254,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
 }
 
 internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
-    // NB: not closed on purpose, it is responsibility of the caller
+    // NB: not closed on purpose, it is the responsibility of the caller
     private val reader = CharsetReader(stream, Charsets.UTF_8)
 
     override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
         return reader.read(buffer, bufferOffset, count)
     }
+
+    fun release() {
+        reader.release()
+    }
 }

From 07e67b4eb58d44bbf2afa6b3f6927436df69a44e Mon Sep 17 00:00:00 2001
From: Vsevolod Tolstopyatov
Date: Thu, 24 Nov 2022 17:48:18 +0100
Subject: [PATCH 3/3] ~fix merge inconsistencies

Restore the values the deleted ByteArrayPool.kt used before the pools
were merged into ArrayPools.kt: the MAX_CHARS_IN_POOL default goes back
to 2 * 1024 * 1024 and ByteArrayPool allocates 512-byte arrays again.

---
 .../src/kotlinx/serialization/json/internal/ArrayPools.kt     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
index da0692d1e2..0d36c6c03b 100644
--- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
+++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/ArrayPools.kt
@@ -9,7 +9,7 @@ package kotlinx.serialization.json.internal
  */
 private val MAX_CHARS_IN_POOL = runCatching {
     System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
-}.getOrNull() ?: 1024 * 1024
+}.getOrNull() ?: 2 * 1024 * 1024
 
 internal open class CharArrayPoolBase {
     private val arrays = ArrayDeque<CharArray>()
@@ -83,7 +83,7 @@ internal object ByteArrayPool8k : ByteArrayPoolBase() {
 
 
 internal object ByteArrayPool : ByteArrayPoolBase() {
-    fun take(): ByteArray = super.take(128)
+    fun take(): ByteArray = super.take(512)
 
     fun release(array: ByteArray) = releaseImpl(array)
 }