From 1745d7a7c5d6fe60c1184149c33de2ee53b9313f Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Sun, 23 Jan 2022 00:36:11 +0200 Subject: [PATCH 1/6] chunk inflater --- build.sbt | 3 + .../fs2/compression/CompressionPlatform.scala | 196 +-- .../compression/internal/ChunkInflater.scala | 37 + .../compression/internal/InflatePipe.scala | 184 +++ .../internal/MakeChunkInflater.scala | 35 + .../src/test/scala/fs2/CompressionSuite.scala | 1056 ++++++++--------- .../scala/fs2/io/compressionplatform.scala | 212 ++-- 7 files changed, 968 insertions(+), 755 deletions(-) create mode 100644 core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala create mode 100644 core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala create mode 100644 core/shared/src/main/scala/fs2/compression/internal/MakeChunkInflater.scala diff --git a/build.sbt b/build.sbt index b49395da07..8abb6d7f5a 100644 --- a/build.sbt +++ b/build.sbt @@ -159,6 +159,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( "fs2.compression.Compression.gunzip$default$1$" ), ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder"), + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "fs2.compression.Compression.gunzip" + ), ProblemFilters.exclude[ReversedMissingMethodProblem]( "fs2.compression.Compression.gzip" ) // Compression is a sealed trait with an implementation diff --git a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala index f4f0fd2995..72bbe6482e 100644 --- a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -25,7 +25,7 @@ package compression import cats.effect.Ref import cats.effect.kernel.Sync import cats.syntax.all._ -import fs2.compression.internal.CrcBuilder +import fs2.compression.internal.{ChunkInflater, CrcBuilder, InflatePipe, MakeChunkInflater} import java.time.Instant import java.util.concurrent.TimeUnit @@ -126,6 +126,51 @@ private[compression] trait CompressionPlatform[F[_]] { self: Compression[F] => private[compression] trait CompressionCompanionPlatform { + private implicit def makeChunkInflaterForSync[F[_]](implicit + F: Sync[F] + ): MakeChunkInflater[F] = new MakeChunkInflater[F] { + + def withChunkInflater( + inflateParams: InflateParams + )( + body: ChunkInflater[F] => Pull[F, Byte, Unit] + ): Pull[F, Byte, Unit] = + Pull + .bracketCase[F, Byte, Inflater, Unit]( + Pull.eval(F.delay(new Inflater(inflateParams.header.juzDeflaterNoWrap))), + inflater => body(chunkInflater(inflateParams, inflater)), + (inflater, _) => Pull.eval(F.delay(inflater.end())) + ) + + private def chunkInflater[F[_]]( + inflateParams: InflateParams, + inflater: Inflater + ): ChunkInflater[F] = { + val inflatedBuffer = new Array[Byte](inflateParams.bufferSizeOrMinimum) + new ChunkInflater[F] { + def inflateChunk( + bytesChunk: Chunk.ArraySlice[Byte], + offset: Int + ): Pull[F, INothing, (Chunk[Byte], Int, Boolean)] = { + inflater.setInput( + bytesChunk.values, + bytesChunk.offset + offset, + bytesChunk.length - offset + ) + val inflatedBytes = inflater.inflate(inflatedBuffer) + Pull.pure( + ( + copyAsChunkBytes(inflatedBuffer, inflatedBytes), + inflater.getRemaining, + inflater.finished() + ) + ) + } + } + } + + } + implicit def forSync[F[_]](implicit F: Sync[F]): Compression[F] = new Compression.UnsealedCompression[F] { @@ -233,134 +278,7 @@ private[compression] trait CompressionCompanionPlatform { 
* @param inflateParams See [[compression.InflateParams]] */ def inflate(inflateParams: InflateParams): Pipe[F, Byte, Byte] = - stream => _inflate_chunks(inflateParams, none, none, none, trailerSize = 0)(stream) - - private def inflateAndTrailer( - inflateParams: InflateParams, - trailerSize: Int - ): Stream[F, Byte] => Stream[ - F, - (Stream[F, Byte], Ref[F, Chunk[Byte]], Ref[F, Long], Ref[F, Long]) - ] = in => - Stream.suspend { - Stream - .eval( - ( - Ref.of[F, Chunk[Byte]](Chunk.empty), - Ref.of[F, Long](0), - Ref.of[F, Long](0) - ).tupled - ) - .map { case (trailerChunk, bytesWritten, crc32) => - ( - _inflate_chunks( - inflateParams, - trailerChunk.some, - bytesWritten.some, - crc32.some, - trailerSize - )( - in - ), - trailerChunk, - bytesWritten, - crc32 - ) - } - } - - private def _inflate_chunks( - inflateParams: InflateParams, - trailerChunk: Option[Ref[F, Chunk[Byte]]], - bytesWritten: Option[Ref[F, Long]], - crc32: Option[Ref[F, Long]], - trailerSize: Int - ): Stream[F, Byte] => Stream[F, Byte] = stream => - Pull - .bracketCase[F, Byte, Inflater, Unit]( - Pull.eval(F.delay(new Inflater(inflateParams.header.juzDeflaterNoWrap))), - inflater => { - val track = trailerChunk.isDefined && bytesWritten.isDefined && crc32.isDefined - val inflatedBuffer = new Array[Byte](inflateParams.bufferSizeOrMinimum) - val crcBuilder = new CrcBuilder - - def setRefs(trailerBytes: Chunk[Byte]) = - Pull.eval { - trailerChunk.fold(F.unit)(_.set(trailerBytes)) >> - bytesWritten.fold(F.unit)(_.set(inflater.getBytesWritten)) >> - crc32.fold(F.unit)(_.set(crcBuilder.getValue)) - } - - def setTrailerChunk( - remaining: Chunk[Byte] - ): Stream[F, Byte] => Pull[F, INothing, Unit] = - _.pull.uncons.flatMap { - case None => - setRefs(remaining) - case Some((chunk, rest)) => - if (remaining.size + chunk.size > trailerSize) { - setRefs(remaining ++ chunk.take(trailerSize - remaining.size)) - } else { - setTrailerChunk(remaining ++ chunk)(rest) - } - } - - def inflateChunk( - bytesChunk: Chunk.ArraySlice[Byte], - offset: Int - ): Pull[F, Byte, Option[Chunk[Byte]]] = { - inflater.setInput( - bytesChunk.values, - bytesChunk.offset + offset, - bytesChunk.length - offset - ) - val inflatedBytes = inflater.inflate(inflatedBuffer) - if (track) crcBuilder.update(inflatedBuffer, 0, inflatedBytes) - Pull.output(copyAsChunkBytes(inflatedBuffer, inflatedBytes)) >> { - val remainingBytes = inflater.getRemaining - if (!inflater.finished()) { - if (remainingBytes > 0) - inflateChunk(bytesChunk, bytesChunk.length - remainingBytes) - else - Pull.pure(none) - } else { - if (remainingBytes > 0) - Pull.pure( - Chunk - .array( - bytesChunk.values, - bytesChunk.offset + bytesChunk.length - remainingBytes, - if (remainingBytes < trailerSize) remainingBytes - else trailerSize // don't need more than that - ) - .some - ) - else - Pull.pure(Chunk.empty.some) - } - } - } - - def pull: Stream[F, Byte] => Pull[F, Byte, Unit] = in => - in.pull.uncons.flatMap { - case None => Pull.done - case Some((chunk, rest)) => - inflateChunk(chunk.toArraySlice, 0).flatMap { - case None => pull(rest) - case Some(remaining) => - if (track) { - setTrailerChunk(remaining)(rest) - } else { - Pull.done - } - } - } - - pull(stream) - }, - (inflater, _) => Pull.eval(F.delay(inflater.end())) - ) - .stream + InflatePipe.inflateChunks(inflateParams, none, none, none, trailerSize = 0) def gzip( fileName: Option[String], @@ -378,14 +296,18 @@ private[compression] trait CompressionCompanionPlatform { ) def gunzip(inflateParams: InflateParams): Stream[F, Byte] => 
Stream[F, GunzipResult[F]] = - gzip.gunzip(inflateAndTrailer(inflateParams, gzip.gzipTrailerBytes), inflateParams) - - private def copyAsChunkBytes(values: Array[Byte], length: Int): Chunk[Byte] = - if (length > 0) { - val target = new Array[Byte](length) - System.arraycopy(values, 0, target, 0, length) - Chunk.array(target, 0, length) - } else Chunk.empty[Byte] + gzip.gunzip( + InflatePipe.inflateAndTrailer(inflateParams, gzip.gzipTrailerBytes), + inflateParams + ) } + + private def copyAsChunkBytes(values: Array[Byte], length: Int): Chunk[Byte] = + if (length > 0) { + val target = new Array[Byte](length) + System.arraycopy(values, 0, target, 0, length) + Chunk.array(target, 0, length) + } else Chunk.empty[Byte] + } diff --git a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala new file mode 100644 index 0000000000..ec0780855a --- /dev/null +++ b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package compression +package internal + +trait ChunkInflater[F[_]] { + + /** @param bytesChunk bytes to inflate + * @param offset offset + * @return (inflatedChunk, remainingBytes, finished) + */ + def inflateChunk( + bytesChunk: Chunk.ArraySlice[Byte], + offset: Int + ): Pull[F, INothing, (Chunk[Byte], Int, Boolean)] + +} diff --git a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala new file mode 100644 index 0000000000..965b665a50 --- /dev/null +++ b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.compression.internal + +import cats.syntax.all._ +import cats.effect.{Ref, Sync} +import fs2.compression.InflateParams +import fs2.{Chunk, INothing, Pull, Stream} + +object InflatePipe { + + def inflateChunks[F[_]]( + inflateParams: InflateParams, + trailerChunkRef: Option[Ref[F, Chunk[Byte]]], + bytesWrittenRef: Option[Ref[F, Long]], + crc32Ref: Option[Ref[F, Long]], + trailerSize: Int + )(implicit + makeChunkInflater: MakeChunkInflater[F], + F: Sync[F] + ): Stream[F, Byte] => Stream[F, Byte] = + stream => + makeChunkInflater + .withChunkInflater(inflateParams) { inflater => + val track = + trailerChunkRef.isDefined && bytesWrittenRef.isDefined && crc32Ref.isDefined + val crcBuilder = new CrcBuilder + + def setRefs(trailerBytes: Chunk[Byte], bytesWritten: Long) = + Pull.eval { + trailerChunkRef.fold(F.unit)(_.set(trailerBytes)) >> + bytesWrittenRef.fold(F.unit)(_.set(bytesWritten)) >> + crc32Ref.fold(F.unit)(_.set(crcBuilder.getValue)) + } + + def setTrailerChunk( + remaining: Chunk[Byte], + bytesWritten: Long + ): Stream[F, Byte] => Pull[F, INothing, Unit] = + _.pull.uncons.flatMap { + case None => + setRefs(remaining, bytesWritten) + case Some((chunk, rest)) => + if (remaining.size + chunk.size > trailerSize) { + setRefs(remaining ++ chunk.take(trailerSize - remaining.size), bytesWritten) + } else { + setTrailerChunk(remaining ++ chunk, bytesWritten)(rest) + } + } + + def inflateChunk( + bytesChunk: Chunk.ArraySlice[Byte], + offset: Int, + inflatedBytesSoFar: Long + ): Pull[F, Byte, (Chunk[Byte], Long, Boolean)] = + inflater.inflateChunk(bytesChunk, offset).flatMap { + case (inflatedChunk, remainingBytes, finished) => +// println(s"inflatedBytes: ${inflatedChunk.size}") +// println(s"remainingBytes: $remainingBytes") +// println(s"finished: $finished") + if (track) crcBuilder.update(inflatedChunk) + Chunk.empty.takeRight(2) + Pull.output(inflatedChunk) >> { + if (!finished) { + if (remainingBytes > 0) { + inflateChunk( + bytesChunk, + bytesChunk.length - remainingBytes, + inflatedBytesSoFar + inflatedChunk.size + ) + } else { + Pull.pure((Chunk.empty, inflatedBytesSoFar + inflatedChunk.size, false)) + } + } else { + val remainingChunk = + if (remainingBytes > 0) { + Chunk + .array( + bytesChunk.values, + bytesChunk.offset + bytesChunk.length - remainingBytes, + remainingBytes + ) + } else { + Chunk.empty + } + Pull.pure( + ( + remainingChunk, + inflatedBytesSoFar + inflatedChunk.size, + true + ) + ) + } + } + } + + def pull(bytesWritten: Long): Stream[F, Byte] => Pull[F, Byte, Unit] = in => + in.pull.uncons.flatMap { + case None => Pull.done + case Some((chunk, rest)) => + inflateChunk(chunk.toArraySlice, 0, 0).flatMap { + case ( + remaining @ _, // remaining will be Chunk.empty + chunkBytesWritten, + false // not finished + ) => + pull(bytesWritten + chunkBytesWritten)(rest) + case ( + remaining, + chunkBytesWritten, + true // finished + ) => + if (track) + setTrailerChunk(remaining, bytesWritten + chunkBytesWritten)(rest) + else + Pull.done + } + } + + pull(0)(stream) + } + .stream + + 
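+  // Usage sketch (illustrative, not part of this change): when no trailer
+  // tracking is requested, the three Refs are None, `track` is false, and
+  // `inflateChunks` reduces to a plain inflate pipe. Both platform `inflate`
+  // implementations in this patch call it exactly that way:
+  //
+  //   InflatePipe.inflateChunks(inflateParams, none, none, none, trailerSize = 0)
+  //
+  // `gunzip`, by contrast, goes through `inflateAndTrailer` below so that the
+  // gzip trailer bytes, the inflated byte count and the CRC-32 are captured.
+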
def inflateAndTrailer[F[_]]( + inflateParams: InflateParams, + trailerSize: Int + )(implicit makeChunkInflater: MakeChunkInflater[F], F: Sync[F]): Stream[F, Byte] => Stream[ + F, + (Stream[F, Byte], Ref[F, Chunk[Byte]], Ref[F, Long], Ref[F, Long]) + ] = in => + Stream.suspend { + Stream + .eval( + ( + Ref.of[F, Chunk[Byte]](Chunk.empty), + Ref.of[F, Long](0), + Ref.of[F, Long](0) + ).tupled + ) + .map { case (trailerChunk, bytesWritten, crc32) => + ( + inflateChunks( + inflateParams, + trailerChunk.some, + bytesWritten.some, + crc32.some, + trailerSize + ).apply( + in + ), + trailerChunk, + bytesWritten, + crc32 + ) + } + } + + private def copyAsChunkBytes(values: Array[Byte], length: Int): Chunk[Byte] = + if (length > 0) { + val target = new Array[Byte](length) + System.arraycopy(values, 0, target, 0, length) + Chunk.array(target, 0, length) + } else Chunk.empty[Byte] + +} diff --git a/core/shared/src/main/scala/fs2/compression/internal/MakeChunkInflater.scala b/core/shared/src/main/scala/fs2/compression/internal/MakeChunkInflater.scala new file mode 100644 index 0000000000..4fcc116f61 --- /dev/null +++ b/core/shared/src/main/scala/fs2/compression/internal/MakeChunkInflater.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +package fs2.compression.internal + +import fs2.Pull +import fs2.compression.InflateParams + +trait MakeChunkInflater[F[_]] { + + def withChunkInflater( + inflateParams: InflateParams + )( + body: ChunkInflater[F] => Pull[F, Byte, Unit] + ): Pull[F, Byte, Unit] + +} diff --git a/core/shared/src/test/scala/fs2/CompressionSuite.scala b/core/shared/src/test/scala/fs2/CompressionSuite.scala index f326f603f2..4d9c2f7cd4 100644 --- a/core/shared/src/test/scala/fs2/CompressionSuite.scala +++ b/core/shared/src/test/scala/fs2/CompressionSuite.scala @@ -86,151 +86,151 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F ) ) - test("deflate input") { - forAllF { (s: String, level0: Int, strategy0: Int, nowrap: Boolean) => - val level = (level0 % 10).abs - val strategy = Array( - DeflateParams.Strategy.DEFAULT.juzDeflaterStrategy, - DeflateParams.Strategy.FILTERED.juzDeflaterStrategy, - DeflateParams.Strategy.HUFFMAN_ONLY.juzDeflaterStrategy - )( - (strategy0 % 3).abs - ) - val expected = deflateStream(getBytes(s), level, strategy, nowrap).toVector - Stream - .chunk[IO, Byte](Chunk.array(getBytes(s))) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].deflate( - DeflateParams( - level = DeflateParams.Level(level), - strategy = DeflateParams.Strategy(strategy), - header = ZLibParams.Header(nowrap) - ) - ) - ) - .compile - .toVector - .assertEquals(expected) - } - } - - test("inflate input") { - forAllF { - ( - s: String, - nowrap: Boolean, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode - ) => - Stream - .chunk[IO, Byte](Chunk.array(getBytes(s))) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].deflate( - DeflateParams( - bufferSize = 32 * 1024, - header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .compile - .to(Array) - .flatMap { deflated => - val expected = inflateStream(deflated, nowrap).toVector - Stream - .chunk[IO, Byte](Chunk.array(deflated)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) - .compile - .toVector - .assertEquals(expected) - } - } - } - - test("inflate input (deflated larger than inflated)") { - Stream - .chunk[IO, Byte]( - Chunk.array( - getBytes( - "꒔諒ᇂ즆ᰃ遇ኼ㎐만咘똠ᯈ䕍쏮쿻ࣇ㦲䷱瘫椪⫐褽睌쨘꛹騏蕾☦余쒧꺠ܝ猸b뷈埣ꂓ琌ཬ隖㣰忢鐮橀쁚誅렌폓㖅ꋹ켗餪庺Đ懣㫍㫌굦뢲䅦苮Ѣқ闭䮚ū﫣༶漵>껆拦휬콯耙腒䔖돆圹Ⲷ曩ꀌ㒈" - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].deflate( - DeflateParams( - header = ZLibParams.Header.ZLIB - ) - ) - ) - .compile - .to(Array) - .flatMap { deflated => - val expected = new String(inflateStream(deflated, nowrap = false)) - Stream - .chunk[IO, Byte](Chunk.array(deflated)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(false)))) - .compile - .to(Array) - .map(new String(_)) - .assertEquals(expected) - } - } - - test("deflate |> inflate ~= id") { - forAllF { - ( - s: String, - nowrap: Boolean, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode - ) => - Stream - .chunk[IO, Byte](Chunk.array(getBytes(s))) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].deflate( - DeflateParams( - bufferSize = 32 * 1024, - header = if (nowrap) ZLibParams.Header.GZIP else 
ZLibParams.Header.ZLIB, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) - .compile - .to(Array) - .map(it => assert(it.sameElements(getBytes(s)))) - } - } - - test("deflate.compresses input") { - val uncompressed = - getBytes("""" - |"A type system is a tractable syntactic method for proving the absence - |of certain program behaviors by classifying phrases according to the - |kinds of values they compute." - |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") - Stream - .chunk[IO, Byte](Chunk.array(uncompressed)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through(Compression[IO].deflate(DeflateParams(level = DeflateParams.Level.NINE))) - .compile - .toVector - .map(compressed => assert(compressed.length < uncompressed.length)) - } +// test("deflate input") { +// forAllF { (s: String, level0: Int, strategy0: Int, nowrap: Boolean) => +// val level = (level0 % 10).abs +// val strategy = Array( +// DeflateParams.Strategy.DEFAULT.juzDeflaterStrategy, +// DeflateParams.Strategy.FILTERED.juzDeflaterStrategy, +// DeflateParams.Strategy.HUFFMAN_ONLY.juzDeflaterStrategy +// )( +// (strategy0 % 3).abs +// ) +// val expected = deflateStream(getBytes(s), level, strategy, nowrap).toVector +// Stream +// .chunk[IO, Byte](Chunk.array(getBytes(s))) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].deflate( +// DeflateParams( +// level = DeflateParams.Level(level), +// strategy = DeflateParams.Strategy(strategy), +// header = ZLibParams.Header(nowrap) +// ) +// ) +// ) +// .compile +// .toVector +// .assertEquals(expected) +// } +// } +// +// test("inflate input") { +// forAllF { +// ( +// s: String, +// nowrap: Boolean, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode +// ) => +// Stream +// .chunk[IO, Byte](Chunk.array(getBytes(s))) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].deflate( +// DeflateParams( +// bufferSize = 32 * 1024, +// header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .compile +// .to(Array) +// .flatMap { deflated => +// val expected = inflateStream(deflated, nowrap).toVector +// Stream +// .chunk[IO, Byte](Chunk.array(deflated)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) +// .compile +// .toVector +// .assertEquals(expected) +// } +// } +// } +// +// test("inflate input (deflated larger than inflated)") { +// Stream +// .chunk[IO, Byte]( +// Chunk.array( +// getBytes( +// "꒔諒ᇂ즆ᰃ遇ኼ㎐만咘똠ᯈ䕍쏮쿻ࣇ㦲䷱瘫椪⫐褽睌쨘꛹騏蕾☦余쒧꺠ܝ猸b뷈埣ꂓ琌ཬ隖㣰忢鐮橀쁚誅렌폓㖅ꋹ켗餪庺Đ懣㫍㫌굦뢲䅦苮Ѣқ闭䮚ū﫣༶漵>껆拦휬콯耙腒䔖돆圹Ⲷ曩ꀌ㒈" +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].deflate( +// DeflateParams( +// header = ZLibParams.Header.ZLIB +// ) +// ) +// ) +// .compile +// .to(Array) +// .flatMap { deflated => +// val expected = new String(inflateStream(deflated, nowrap = false)) +// Stream +// .chunk[IO, Byte](Chunk.array(deflated)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(false)))) +// .compile +// .to(Array) +// .map(new String(_)) +// 
.assertEquals(expected) +// } +// } +// +// test("deflate |> inflate ~= id") { +// forAllF { +// ( +// s: String, +// nowrap: Boolean, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode +// ) => +// Stream +// .chunk[IO, Byte](Chunk.array(getBytes(s))) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].deflate( +// DeflateParams( +// bufferSize = 32 * 1024, +// header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) +// .compile +// .to(Array) +// .map(it => assert(it.sameElements(getBytes(s)))) +// } +// } + +// test("deflate.compresses input") { +// val uncompressed = +// getBytes("""" +// |"A type system is a tractable syntactic method for proving the absence +// |of certain program behaviors by classifying phrases according to the +// |kinds of values they compute." +// |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") +// Stream +// .chunk[IO, Byte](Chunk.array(uncompressed)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through(Compression[IO].deflate(DeflateParams(level = DeflateParams.Level.NINE))) +// .compile +// .toVector +// .map(compressed => assert(compressed.length < uncompressed.length)) +// } test("deflate and inflate are reusable") { val bytesIn: Int = 1024 * 1024 @@ -254,389 +254,389 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F .last } yield assertEquals(first, second) } - - test("deflate |> inflate ~= id") { - forAllF { - ( - s: String, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode - ) => - Stream - .chunk(Chunk.array(s.getBytes)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].deflate( - DeflateParams( - bufferSize = 8192, - header = ZLibParams.Header.GZIP, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].inflate( - InflateParams( - bufferSize = 8192, - header = ZLibParams.Header.GZIP - ) - ) - ) - .compile - .toVector - .map { result => - assertEquals(result, s.getBytes.toVector) - () - } - } - } - - test("empty.gz |> gunzip") { - - val bytes = Array( - 0x1f, 0x8b, 0x08, 0x08, 0x0f, 0x85, 0xc7, 0x61, 0x00, 0x03, 0x65, 0x6d, 0x70, 0x74, 0x79, - 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - ).map(_.toByte) - val expectedBytes = Array.empty[Byte] - - val expectedFileName = Option(toEncodableFileName("empty")) - Stream - .chunk(Chunk.array(bytes)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gunzip(8192) - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - gunzipResult.content - } - .compile - .toVector - .assertEquals(expectedBytes.toVector) - } - - test("hello-compression.gz |> gunzip") { - - val bytes = Array( - 0x1f, 0x8b, 0x08, 0x08, 0x99, 0x8a, 0xc7, 0x61, 0x00, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, - 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x6a, 0x73, - 0x6f, 0x6e, 0x00, 0xab, 0x56, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x57, 0xb2, 0x52, 0x4a, 0xce, - 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0xce, 0xcc, 0xcf, 0x53, 0xaa, 0xe5, 0x02, 
0x00, 0x47, - 0x6f, 0xf6, 0xe9, 0x18, 0x00, 0x00, 0x00 - ).map(_.toByte) - val expectedBytes = - """{"hello":"compression"} - |""".stripMargin.getBytes - - val expectedFileName = Option(toEncodableFileName("hello-compression.json")) - Stream - .chunk(Chunk.array(bytes)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gunzip(8192) - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - gunzipResult.content - } - .compile - .toVector - .assertEquals(expectedBytes.toVector) - } - - test("gzip |> gunzip ~= id") { - forAllF { - ( - s: String, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode, - epochSeconds: Int - ) => - val expectedFileName = Option(toEncodableFileName(s)) - val expectedComment = Option(toEncodableComment(s)) - val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) - Stream - .chunk(Chunk.array(s.getBytes)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gzip( - fileName = Some(s), - modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), - comment = Some(s), - DeflateParams( - bufferSize = 8192, - header = ZLibParams.Header.GZIP, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gunzip(8192) - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - assertEquals(gunzipResult.comment, expectedComment) - if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) - gunzipResult.content - } - .compile - .toVector - .assertEquals(s.getBytes.toVector) - } - } - - test("gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)") { - forAllF { - ( - s: String, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode, - epochSeconds: Int - ) => - val expectedFileName = Option(toEncodableFileName(s)) - val expectedComment = Option(toEncodableComment(s)) - val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) - Stream - .chunk(Chunk.array(s.getBytes)) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gzip( - fileName = Some(s), - modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), - comment = Some(s), - DeflateParams( - bufferSize = 1031, - header = ZLibParams.Header.GZIP, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gunzip(509) - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - assertEquals(gunzipResult.comment, expectedComment) - if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) - gunzipResult.content - } - .compile - .toVector - .assertEquals(s.getBytes.toVector) - } - } - - test("gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)") { - forAllF { - ( - s: String, - level: DeflateParams.Level, - strategy: DeflateParams.Strategy, - flushMode: DeflateParams.FlushMode, - epochSeconds: Int - ) => - val expectedFileName = Option(toEncodableFileName(s)) - val expectedComment = Option(toEncodableComment(s)) - val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) - Stream - .chunk(Chunk.array(s.getBytes)) - .rechunkRandomlyWithSeed(0.1, 
2)(System.nanoTime()) - .through( - Compression[IO].gzip( - fileName = Some(s), - modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), - comment = Some(s), - DeflateParams( - bufferSize = 509, - header = ZLibParams.Header.GZIP, - level = level, - strategy = strategy, - flushMode = flushMode - ) - ) - ) - .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) - .through( - Compression[IO].gunzip(1031) - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - assertEquals(gunzipResult.comment, expectedComment) - if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) - gunzipResult.content - } - .compile - .toVector - .assertEquals(s.getBytes.toVector) - } - } - - test("gzip.compresses input") { - val uncompressed = - getBytes("""" - |"A type system is a tractable syntactic method for proving the absence - |of certain program behaviors by classifying phrases according to the - |kinds of values they compute." - |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") - Stream - .chunk[IO, Byte](Chunk.array(uncompressed)) - .through(Compression[IO].gzip(2048)) - .compile - .toVector - .map(compressed => assert(compressed.length < uncompressed.length)) - } - - test("gzip.compresses input, with FLG.FHCRC set") { - Stream - .chunk[IO, Byte](Chunk.array(getBytes("Foo"))) - .through( - Compression[IO].gzip( - fileName = None, - modificationTime = None, - comment = None, - deflateParams = DeflateParams.apply( - bufferSize = 1024 * 32, - header = ZLibParams.Header.GZIP, - level = DeflateParams.Level.DEFAULT, - strategy = DeflateParams.Strategy.DEFAULT, - flushMode = DeflateParams.FlushMode.DEFAULT, - fhCrcEnabled = true - ) - ) - ) - .compile - .toVector - .map { compressed => - val headerBytes = ByteVector(compressed.take(10)) - val crc32 = crc.crc32(headerBytes.toBitVector).toByteArray - val expectedCrc16 = crc32.reverse.take(2).toVector - val actualCrc16 = compressed.drop(10).take(2) - assertEquals(actualCrc16, expectedCrc16) - } - } - - test("gunzip limit fileName and comment length") { - val longString: String = - Array - .fill(1034 * 1024)("x") - .mkString( - "" - ) // max(classic.fileNameBytesSoftLimit, classic.fileCommentBytesSoftLimit) + 1 - val expectedFileName = Option(toEncodableFileName(longString)) - val expectedComment = Option(toEncodableComment(longString)) - - Stream - .chunk(Chunk.empty[Byte]) - .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) - .chunkLimit(512) - .unchunks // ensure chunk sizes are less than file name and comment size soft limits - .through(Compression[IO].gunzip(8192)) - .flatMap { gunzipResult => - assert( - gunzipResult.fileName - .map(_.length) - .getOrElse(0) < expectedFileName.map(_.length).getOrElse(0) - ) - assert( - gunzipResult.comment - .map(_.length) - .getOrElse(0) < expectedComment.map(_.length).getOrElse(0) - ) - gunzipResult.content - } - .compile - .last - .assertEquals(None) - } - - test("unix.gzip |> gunzip") { - val expectedContent = "fs2.compress implementing RFC 1952\n" - val expectedFileName = Option(toEncodableFileName("fs2.compress")) - val expectedComment = Option.empty[String] - val expectedMTime = Option(FiniteDuration(1580853602, TimeUnit.SECONDS)) // 2020-02-04T22:00:02Z - val compressed = Array(0x1f, 0x8b, 0x08, 0x08, 0x62, 0xe9, 0x39, 0x5e, 0x00, 0x03, 0x66, 0x73, - 0x32, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x00, 0x4b, 0x2b, 0x36, 0xd2, - 0x4b, 0xce, 0xcf, 0x2d, 0x28, 0x4a, 
0x2d, 0x2e, 0x56, 0xc8, 0xcc, 0x2d, 0xc8, 0x49, 0xcd, - 0x4d, 0xcd, 0x2b, 0xc9, 0xcc, 0x4b, 0x57, 0x08, 0x72, 0x73, 0x56, 0x30, 0xb4, 0x34, 0x35, - 0xe2, 0x02, 0x00, 0x57, 0xb3, 0x5e, 0x6d, 0x23, 0x00, 0x00, 0x00).map(_.toByte) - Stream - .chunk(Chunk.array(compressed)) - .through( - Compression[IO].gunzip() - ) - .flatMap { gunzipResult => - assertEquals(gunzipResult.fileName, expectedFileName) - assertEquals(gunzipResult.comment, expectedComment) - assertEquals(gunzipResult.modificationEpochTime, expectedMTime) - gunzipResult.content - } - .compile - .toVector - .map(vector => new String(vector.toArray, StandardCharsets.US_ASCII)) - .assertEquals(expectedContent) - } - - test("gzip and gunzip are reusable") { - val bytesIn: Int = 1024 * 1024 - val chunkSize = 1024 - val gzipStream = Compression[IO].gzip(bufferSize = chunkSize) - val gunzipStream = Compression[IO].gunzip(bufferSize = chunkSize) - val stream = Stream - .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) - .through(gzipStream) - .through(gunzipStream) - .flatMap(_.content) - for { - first <- - stream - .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } - .compile - .last - second <- - stream - .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } - .compile - .last - } yield assertEquals(first, second) - } - - group("maybeGunzip") { - def maybeGunzip[F[_]: Compression](s: Stream[F, Byte]): Stream[F, Byte] = - s.pull - .unconsN(2, allowFewer = true) - .flatMap { - case Some((hd, tl)) => - if (hd == Chunk[Byte](0x1f, 0x8b.toByte)) - Compression[F].gunzip(128)(tl.cons(hd)).flatMap(_.content).pull.echo - else tl.cons(hd).pull.echo - case None => Pull.done - } - .stream - - test("not gzip") { - forAllF { (s: Stream[Pure, Byte]) => - maybeGunzip[IO](s).compile.toList.assertEquals(s.toList) - } - } - - test("gzip") { - forAllF { (s: Stream[Pure, Byte]) => - maybeGunzip[IO](s.through(Compression[IO].gzip())).compile.toList.assertEquals(s.toList) - } - } - } +// +// test("deflate |> inflate ~= id") { +// forAllF { +// ( +// s: String, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode +// ) => +// Stream +// .chunk(Chunk.array(s.getBytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].deflate( +// DeflateParams( +// bufferSize = 8192, +// header = ZLibParams.Header.GZIP, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].inflate( +// InflateParams( +// bufferSize = 8192, +// header = ZLibParams.Header.GZIP +// ) +// ) +// ) +// .compile +// .toVector +// .map { result => +// assertEquals(result, s.getBytes.toVector) +// () +// } +// } +// } +// +// test("empty.gz |> gunzip") { +// +// val bytes = Array( +// 0x1f, 0x8b, 0x08, 0x08, 0x0f, 0x85, 0xc7, 0x61, 0x00, 0x03, 0x65, 0x6d, 0x70, 0x74, 0x79, +// 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 +// ).map(_.toByte) +// val expectedBytes = Array.empty[Byte] +// +// val expectedFileName = Option(toEncodableFileName("empty")) +// Stream +// .chunk(Chunk.array(bytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gunzip(8192) +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// gunzipResult.content +// } +// .compile +// .toVector +// .assertEquals(expectedBytes.toVector) +// } +// +// 
test("hello-compression.gz |> gunzip") { +// +// val bytes = Array( +// 0x1f, 0x8b, 0x08, 0x08, 0x99, 0x8a, 0xc7, 0x61, 0x00, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, +// 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x6a, 0x73, +// 0x6f, 0x6e, 0x00, 0xab, 0x56, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x57, 0xb2, 0x52, 0x4a, 0xce, +// 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0xce, 0xcc, 0xcf, 0x53, 0xaa, 0xe5, 0x02, 0x00, 0x47, +// 0x6f, 0xf6, 0xe9, 0x18, 0x00, 0x00, 0x00 +// ).map(_.toByte) +// val expectedBytes = +// """{"hello":"compression"} +// |""".stripMargin.getBytes +// +// val expectedFileName = Option(toEncodableFileName("hello-compression.json")) +// Stream +// .chunk(Chunk.array(bytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gunzip(8192) +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// gunzipResult.content +// } +// .compile +// .toVector +// .assertEquals(expectedBytes.toVector) +// } +// +// test("gzip |> gunzip ~= id") { +// forAllF { +// ( +// s: String, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode, +// epochSeconds: Int +// ) => +// val expectedFileName = Option(toEncodableFileName(s)) +// val expectedComment = Option(toEncodableComment(s)) +// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) +// Stream +// .chunk(Chunk.array(s.getBytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gzip( +// fileName = Some(s), +// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), +// comment = Some(s), +// DeflateParams( +// bufferSize = 8192, +// header = ZLibParams.Header.GZIP, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gunzip(8192) +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// assertEquals(gunzipResult.comment, expectedComment) +// if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) +// gunzipResult.content +// } +// .compile +// .toVector +// .assertEquals(s.getBytes.toVector) +// } +// } +// +// test("gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)") { +// forAllF { +// ( +// s: String, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode, +// epochSeconds: Int +// ) => +// val expectedFileName = Option(toEncodableFileName(s)) +// val expectedComment = Option(toEncodableComment(s)) +// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) +// Stream +// .chunk(Chunk.array(s.getBytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gzip( +// fileName = Some(s), +// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), +// comment = Some(s), +// DeflateParams( +// bufferSize = 1031, +// header = ZLibParams.Header.GZIP, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gunzip(509) +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// assertEquals(gunzipResult.comment, expectedComment) +// if (epochSeconds > 0) 
assertEquals(gunzipResult.modificationEpochTime, expectedMTime) +// gunzipResult.content +// } +// .compile +// .toVector +// .assertEquals(s.getBytes.toVector) +// } +// } +// +// test("gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)") { +// forAllF { +// ( +// s: String, +// level: DeflateParams.Level, +// strategy: DeflateParams.Strategy, +// flushMode: DeflateParams.FlushMode, +// epochSeconds: Int +// ) => +// val expectedFileName = Option(toEncodableFileName(s)) +// val expectedComment = Option(toEncodableComment(s)) +// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) +// Stream +// .chunk(Chunk.array(s.getBytes)) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gzip( +// fileName = Some(s), +// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), +// comment = Some(s), +// DeflateParams( +// bufferSize = 509, +// header = ZLibParams.Header.GZIP, +// level = level, +// strategy = strategy, +// flushMode = flushMode +// ) +// ) +// ) +// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) +// .through( +// Compression[IO].gunzip(1031) +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// assertEquals(gunzipResult.comment, expectedComment) +// if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) +// gunzipResult.content +// } +// .compile +// .toVector +// .assertEquals(s.getBytes.toVector) +// } +// } +// +// test("gzip.compresses input") { +// val uncompressed = +// getBytes("""" +// |"A type system is a tractable syntactic method for proving the absence +// |of certain program behaviors by classifying phrases according to the +// |kinds of values they compute." +// |-- Pierce, Benjamin C. (2002). 
Types and Programming Languages""") +// Stream +// .chunk[IO, Byte](Chunk.array(uncompressed)) +// .through(Compression[IO].gzip(2048)) +// .compile +// .toVector +// .map(compressed => assert(compressed.length < uncompressed.length)) +// } +// +// test("gzip.compresses input, with FLG.FHCRC set") { +// Stream +// .chunk[IO, Byte](Chunk.array(getBytes("Foo"))) +// .through( +// Compression[IO].gzip( +// fileName = None, +// modificationTime = None, +// comment = None, +// deflateParams = DeflateParams.apply( +// bufferSize = 1024 * 32, +// header = ZLibParams.Header.GZIP, +// level = DeflateParams.Level.DEFAULT, +// strategy = DeflateParams.Strategy.DEFAULT, +// flushMode = DeflateParams.FlushMode.DEFAULT, +// fhCrcEnabled = true +// ) +// ) +// ) +// .compile +// .toVector +// .map { compressed => +// val headerBytes = ByteVector(compressed.take(10)) +// val crc32 = crc.crc32(headerBytes.toBitVector).toByteArray +// val expectedCrc16 = crc32.reverse.take(2).toVector +// val actualCrc16 = compressed.drop(10).take(2) +// assertEquals(actualCrc16, expectedCrc16) +// } +// } +// +// test("gunzip limit fileName and comment length") { +// val longString: String = +// Array +// .fill(1034 * 1024)("x") +// .mkString( +// "" +// ) // max(classic.fileNameBytesSoftLimit, classic.fileCommentBytesSoftLimit) + 1 +// val expectedFileName = Option(toEncodableFileName(longString)) +// val expectedComment = Option(toEncodableComment(longString)) +// +// Stream +// .chunk(Chunk.empty[Byte]) +// .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) +// .chunkLimit(512) +// .unchunks // ensure chunk sizes are less than file name and comment size soft limits +// .through(Compression[IO].gunzip(8192)) +// .flatMap { gunzipResult => +// assert( +// gunzipResult.fileName +// .map(_.length) +// .getOrElse(0) < expectedFileName.map(_.length).getOrElse(0) +// ) +// assert( +// gunzipResult.comment +// .map(_.length) +// .getOrElse(0) < expectedComment.map(_.length).getOrElse(0) +// ) +// gunzipResult.content +// } +// .compile +// .last +// .assertEquals(None) +// } +// +// test("unix.gzip |> gunzip") { +// val expectedContent = "fs2.compress implementing RFC 1952\n" +// val expectedFileName = Option(toEncodableFileName("fs2.compress")) +// val expectedComment = Option.empty[String] +// val expectedMTime = Option(FiniteDuration(1580853602, TimeUnit.SECONDS)) // 2020-02-04T22:00:02Z +// val compressed = Array(0x1f, 0x8b, 0x08, 0x08, 0x62, 0xe9, 0x39, 0x5e, 0x00, 0x03, 0x66, 0x73, +// 0x32, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x00, 0x4b, 0x2b, 0x36, 0xd2, +// 0x4b, 0xce, 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0x56, 0xc8, 0xcc, 0x2d, 0xc8, 0x49, 0xcd, +// 0x4d, 0xcd, 0x2b, 0xc9, 0xcc, 0x4b, 0x57, 0x08, 0x72, 0x73, 0x56, 0x30, 0xb4, 0x34, 0x35, +// 0xe2, 0x02, 0x00, 0x57, 0xb3, 0x5e, 0x6d, 0x23, 0x00, 0x00, 0x00).map(_.toByte) +// Stream +// .chunk(Chunk.array(compressed)) +// .through( +// Compression[IO].gunzip() +// ) +// .flatMap { gunzipResult => +// assertEquals(gunzipResult.fileName, expectedFileName) +// assertEquals(gunzipResult.comment, expectedComment) +// assertEquals(gunzipResult.modificationEpochTime, expectedMTime) +// gunzipResult.content +// } +// .compile +// .toVector +// .map(vector => new String(vector.toArray, StandardCharsets.US_ASCII)) +// .assertEquals(expectedContent) +// } +// +// test("gzip and gunzip are reusable") { +// val bytesIn: Int = 1024 * 1024 +// val chunkSize = 1024 +// val gzipStream = Compression[IO].gzip(bufferSize = 
chunkSize) +// val gunzipStream = Compression[IO].gunzip(bufferSize = chunkSize) +// val stream = Stream +// .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) +// .through(gzipStream) +// .through(gunzipStream) +// .flatMap(_.content) +// for { +// first <- +// stream +// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } +// .compile +// .last +// second <- +// stream +// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } +// .compile +// .last +// } yield assertEquals(first, second) +// } +// +// group("maybeGunzip") { +// def maybeGunzip[F[_]: Compression](s: Stream[F, Byte]): Stream[F, Byte] = +// s.pull +// .unconsN(2, allowFewer = true) +// .flatMap { +// case Some((hd, tl)) => +// if (hd == Chunk[Byte](0x1f, 0x8b.toByte)) +// Compression[F].gunzip(128)(tl.cons(hd)).flatMap(_.content).pull.echo +// else tl.cons(hd).pull.echo +// case None => Pull.done +// } +// .stream +// +// test("not gzip") { +// forAllF { (s: Stream[Pure, Byte]) => +// maybeGunzip[IO](s).compile.toList.assertEquals(s.toList) +// } +// } +// +// test("gzip") { +// forAllF { (s: Stream[Pure, Byte]) => +// maybeGunzip[IO](s.through(Compression[IO].gzip())).compile.toList.assertEquals(s.toList) +// } +// } +// } def toEncodableFileName(fileName: String): String = new String( diff --git a/io/js/src/main/scala/fs2/io/compressionplatform.scala b/io/js/src/main/scala/fs2/io/compressionplatform.scala index a04a59867e..dde384be74 100644 --- a/io/js/src/main/scala/fs2/io/compressionplatform.scala +++ b/io/js/src/main/scala/fs2/io/compressionplatform.scala @@ -22,18 +22,128 @@ package fs2 package io -import cats.effect.{Async, Ref} +import cats.effect.Async import cats.syntax.all._ import fs2.compression._ -import fs2.internal.jsdeps.node.zlibMod +import fs2.internal.jsdeps.node.{nodeStrings, zlibMod} import fs2.io.internal.SuspendedStream -import fs2.compression.internal.CrcBuilder -import fs2.compression.internal.CountPipe -import fs2.compression.internal.CrcPipe +import fs2.compression.internal.{ChunkInflater, InflatePipe, MakeChunkInflater} +import fs2.internal.jsdeps.node.bufferMod.global.Buffer +import fs2.internal.jsdeps.node.streamMod.{Duplex, Readable} +import fs2.io.internal.ByteChunkOps._ + +import scala.annotation.nowarn import scala.concurrent.duration.FiniteDuration +import scala.scalajs.js +import scala.scalajs.js.| private[fs2] trait compressionplatform { + private implicit def makeChunkInflaterForAsync[F[_]](implicit + F: Async[F] + ): MakeChunkInflater[F] = new MakeChunkInflater[F] { + + def withChunkInflater( + inflateParams: InflateParams + )( + body: ChunkInflater[F] => Pull[F, Byte, Unit] + ): Pull[F, Byte, Unit] = + body(chunkInflater(inflateParams)) + + private def chunkInflater[F[_]]( + inflateParams: InflateParams + )(implicit F: Async[F]): ChunkInflater[F] = { + val options = zlibMod + .ZlibOptions() + .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) + + val writable = (inflateParams.header match { + case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) + case ZLibParams.Header.ZLIB => zlibMod.createInflate(options) + }).asInstanceOf[Duplex] + val readable = writable.asInstanceOf[Readable] + val inflate = writable.asInstanceOf[zlibMod.Zlib] + + new ChunkInflater[F] { + def inflateChunk( + bytesChunk: Chunk[Byte] + ): Pull[ + F, + INothing, + (Chunk[Byte], Int, Boolean) + ] = // (inflatedBuffer, inflatedBytes, remainingBytes, finished) + Pull.eval { + F.async_[(Chunk[Byte], Int, Boolean)] { cb => + println() + println() + 
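+              // Probe the zlib Duplex before writing: if read() returns a
+              // buffered chunk at this point, input written on an earlier call
+              // already produced inflated output, and that chunk is surfaced
+              // (with the whole input reported as remaining) before any write.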
println(s"got chunk to inflate: ${bytesChunk.size} bytes") + readable.read() match { + case null => + println(s" read before write: null") + + val writtenBefore = inflate.bytesWritten.toLong + val buffer = bytesChunk.toUint8Array + + def tryRead(finished: Boolean) = { + val writtenNow = inflate.bytesWritten.toLong + val bytesWriten = writtenNow - writtenBefore + println(s" bytes written: ${bytesWriten}") + val bytesRemaining = bytesChunk.size - bytesWriten + println(s" bytes remaining: $bytesRemaining bytes") + val out = readable.read() match { + case null => + println(s" read null") + (Chunk.empty[Byte], bytesRemaining.toInt, finished) + case notNull => + val buffer = notNull.asInstanceOf[Buffer] + val chunk = buffer.toChunk + println(s" read buffer: ${chunk.size} bytes") + (chunk, bytesRemaining.toInt, finished) + } + cb(out.asRight[Throwable]) + } + + val onError: js.Function1[Any, Unit] = e => println(s"readable.error: ${e}") + val onEnd: js.Function1[Any, Unit] = _ => { + println(s"!!! readable.end") + tryRead(true) + } + + val onReadable: js.Function1[Any, Unit] = _ => { + println(s"!!! readable.readable") + readable.off("error", onError) + readable.off("end", onEnd) + tryRead(false) + } + + readable.once("error", onError) + readable.once("end", onEnd) + readable.once("readable", onReadable) + + val written = writable.write( + buffer, + (e: js.UndefOr[js.Error | Null]) => println(s"callback: $e") + // cb( + // e.toLeft { + // + // }.leftMap(js.JavaScriptException) + // ) + ) + println(s"written: $written") + + case notNull => + val buffer = notNull.asInstanceOf[Buffer] + val chunk = buffer.toChunk + println(s" read buffer before write: ${chunk.size} bytes") + cb((chunk, bytesChunk.size, false).asRight) + } + + } + } + } + } + } + implicit def fs2ioCompressionForAsync[F[_]](implicit F: Async[F]): Compression[F] = new Compression.UnsealedCompression[F] { @@ -52,7 +162,7 @@ private[fs2] trait compressionplatform { (deflateParams.header match { case ZLibParams.Header.GZIP => zlibMod.createDeflateRaw(options) case ZLibParams.Header.ZLIB => zlibMod.createDeflate(options) - }).asInstanceOf[Duplex] + }).asInstanceOf[fs2.io.Duplex] }) .flatMap { case (deflate, out) => out @@ -63,89 +173,8 @@ private[fs2] trait compressionplatform { } } - override def inflate(inflateParams: InflateParams): Pipe[F, Byte, Byte] = in => - inflateAndTrailer(inflateParams, 0)(in).flatMap(_._1) - - private def inflateAndTrailer( - inflateParams: InflateParams, - trailerSize: Int - ): Stream[F, Byte] => Stream[ - F, - (Stream[F, Byte], Ref[F, Chunk[Byte]], Ref[F, Long], Ref[F, Long]) - ] = in => { - val options = zlibMod - .ZlibOptions() - .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) - - ( - Stream.resource(suspendReadableAndRead() { - (inflateParams.header match { - case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) - case ZLibParams.Header.ZLIB => zlibMod.createInflate(options) - }).asInstanceOf[Duplex] - }), - Stream.resource(SuspendedStream(in)), - Stream.eval(Ref.of[F, Chunk[Byte]](Chunk.empty)), - Stream.eval(Ref.of[F, Long](0)), - Stream.eval(Ref.of[F, Long](0)), - Stream.eval(Ref.of[F, Long](0)), - Stream.eval(Ref.of[F, Chunk[Byte]](Chunk.empty)) - ).tupled.map { - case ( - (inflate, out), - suspendedIn, - lastChunk, - bytesPiped, - bytesWritten, - crc32, - trailerChunk - ) => - val trackedStream = - suspendedIn.stream.chunks.evalTap { chunk => - bytesPiped.update(_ + chunk.size) >> lastChunk.set(chunk) - }.unchunks - - def onBytesWritten(bytesWritten: Long): F[Unit] = - 
(bytesPiped.get, lastChunk.get).tupled.flatMap { case (bytesPiped, lastChunk) => - val bytesAvailable = bytesPiped - bytesWritten - val headTrailerBytes = lastChunk.takeRight(bytesAvailable.toInt) - val bytesToPull = trailerSize - headTrailerBytes.size - val wholeTrailer = if (bytesToPull > 0) { - suspendedIn.stream - .take(bytesToPull.toLong) - .chunkAll - .compile - .lastOrError - .map(remainingBytes => headTrailerBytes ++ remainingBytes) - } else { - headTrailerBytes.take(trailerSize).pure[F] - } - (wholeTrailer >>= trailerChunk.set).void - } - - val crcBuilder = new CrcBuilder - val inflated = out - .concurrently( - trackedStream - .through(writeWritable[F](inflate.asInstanceOf[Writable].pure)) - ) - .onFinalize { - F.delay( - inflate.asInstanceOf[zlibMod.Zlib].bytesWritten.toLong - ).flatMap(onBytesWritten) >> - F.async_[Unit] { cb => - inflate.asInstanceOf[zlibMod.Zlib].close(() => cb(Right(()))) - } - } - .through(CountPipe(bytesWritten)) - .through(CrcPipe(crcBuilder)) ++ - Stream - .eval(crc32.set(crcBuilder.getValue)) - .flatMap(_ => Stream.empty) - - (inflated, trailerChunk, bytesWritten, crc32) - } - } + override def inflate(inflateParams: InflateParams): Pipe[F, Byte, Byte] = + InflatePipe.inflateChunks(inflateParams, none, none, none, trailerSize = 0) def gzip( fileName: Option[String], @@ -163,7 +192,10 @@ private[fs2] trait compressionplatform { ) def gunzip(inflateParams: InflateParams): Stream[F, Byte] => Stream[F, GunzipResult[F]] = - gzip.gunzip(inflateAndTrailer(inflateParams, gzip.gzipTrailerBytes), inflateParams) + gzip.gunzip( + InflatePipe.inflateAndTrailer(inflateParams, gzip.gzipTrailerBytes), + inflateParams + ) } } From 808596e0ed393be483fa34f23941cac15488d543 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Sun, 23 Jan 2022 01:10:42 +0200 Subject: [PATCH 2/6] remove cruft --- .../src/main/scala/fs2/compression/internal/InflatePipe.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala index 965b665a50..d437b42b53 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala @@ -78,7 +78,7 @@ object InflatePipe { // println(s"remainingBytes: $remainingBytes") // println(s"finished: $finished") if (track) crcBuilder.update(inflatedChunk) - Chunk.empty.takeRight(2) + Pull.output(inflatedChunk) >> { if (!finished) { if (remainingBytes > 0) { From 4f0d46baaf8be99660aa019d09f224ba9fd934ad Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Mon, 24 Jan 2022 14:47:06 +0200 Subject: [PATCH 3/6] ChunkInflater - return the array not chunk --- .../main/scala/fs2/compression/CompressionPlatform.scala | 5 +++-- .../main/scala/fs2/compression/internal/ChunkInflater.scala | 4 ++-- .../main/scala/fs2/compression/internal/InflatePipe.scala | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala index 72bbe6482e..a971d4183c 100644 --- a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -151,7 +151,7 @@ private[compression] trait CompressionCompanionPlatform { def inflateChunk( bytesChunk: Chunk.ArraySlice[Byte], offset: Int - ): Pull[F, INothing, (Chunk[Byte], Int, Boolean)] = { + ): 
Pull[F, INothing, (Array[Byte], Int, Int, Boolean)] = { inflater.setInput( bytesChunk.values, bytesChunk.offset + offset, @@ -160,7 +160,8 @@ private[compression] trait CompressionCompanionPlatform { val inflatedBytes = inflater.inflate(inflatedBuffer) Pull.pure( ( - copyAsChunkBytes(inflatedBuffer, inflatedBytes), + inflatedBuffer, + inflatedBytes, inflater.getRemaining, inflater.finished() ) diff --git a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala index ec0780855a..9e3224fe56 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala @@ -27,11 +27,11 @@ trait ChunkInflater[F[_]] { /** @param bytesChunk bytes to inflate * @param offset offset - * @return (inflatedChunk, remainingBytes, finished) + * @return (inflatedArray, inflatedLength, remainingBytes, finished) */ def inflateChunk( bytesChunk: Chunk.ArraySlice[Byte], offset: Int - ): Pull[F, INothing, (Chunk[Byte], Int, Boolean)] + ): Pull[F, INothing, (Array[Byte], Int, Int, Boolean)] } diff --git a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala index d437b42b53..6577827969 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala @@ -73,12 +73,12 @@ object InflatePipe { inflatedBytesSoFar: Long ): Pull[F, Byte, (Chunk[Byte], Long, Boolean)] = inflater.inflateChunk(bytesChunk, offset).flatMap { - case (inflatedChunk, remainingBytes, finished) => + case (inflatedArray, inflatedLength, remainingBytes, finished) => // println(s"inflatedBytes: ${inflatedChunk.size}") // println(s"remainingBytes: $remainingBytes") // println(s"finished: $finished") - if (track) crcBuilder.update(inflatedChunk) - + if (track) crcBuilder.update(inflatedArray, 0, inflatedLength) + val inflatedChunk = copyAsChunkBytes(inflatedArray, inflatedLength) Pull.output(inflatedChunk) >> { if (!finished) { if (remainingBytes > 0) { From dd4251d49d796cd6063c9a2ea707ba98db41b812 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Mon, 24 Jan 2022 20:45:18 +0200 Subject: [PATCH 4/6] JS implementation --- .../fs2/compression/CompressionPlatform.scala | 25 +- .../src/main/scala/fs2/compression/Gzip.scala | 6 +- .../compression/internal/ChunkInflater.scala | 7 +- .../compression/internal/InflatePipe.scala | 90 +++++-- .../src/test/scala/fs2/CompressionSuite.scala | 102 ++++---- .../scala/fs2/io/compressionplatform.scala | 220 +++++++++++------- 6 files changed, 290 insertions(+), 160 deletions(-) diff --git a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala index a971d4183c..41e5e21a46 100644 --- a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -137,7 +137,11 @@ private[compression] trait CompressionCompanionPlatform { ): Pull[F, Byte, Unit] = Pull .bracketCase[F, Byte, Inflater, Unit]( - Pull.eval(F.delay(new Inflater(inflateParams.header.juzDeflaterNoWrap))), + Pull.eval(F.delay { +// println(s"-" * 60) +// println("creating new inflater") + new Inflater(inflateParams.header.juzDeflaterNoWrap) + }), inflater => body(chunkInflater(inflateParams, inflater)), (inflater, _) => 
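// Patch 4 below starts returning the unconsumed input as a re-sliced
// Chunk.ArraySlice instead of a bare count. The slice must be taken relative to
// the slice's own offset; the first version drops that offset and patch 6 fixes
// exactly this. The corrected arithmetic, in isolation:
def remainingSlice(in: Chunk.ArraySlice[Byte], remaining: Int): Chunk.ArraySlice[Byte] =
  in.copy(
    offset = in.offset + (in.length - remaining), // skip what zlib consumed
    length = remaining
  )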
Pull.eval(F.delay(inflater.end())) ) @@ -148,21 +152,28 @@ private[compression] trait CompressionCompanionPlatform { ): ChunkInflater[F] = { val inflatedBuffer = new Array[Byte](inflateParams.bufferSizeOrMinimum) new ChunkInflater[F] { + def end: Pull[F, INothing, Unit] = Pull.pure { + inflater.end() + } + def inflateChunk( - bytesChunk: Chunk.ArraySlice[Byte], - offset: Int - ): Pull[F, INothing, (Array[Byte], Int, Int, Boolean)] = { + bytesChunk: Chunk.ArraySlice[Byte] + ): Pull[F, INothing, (Array[Byte], Int, Chunk.ArraySlice[Byte], Boolean)] = { inflater.setInput( bytesChunk.values, - bytesChunk.offset + offset, - bytesChunk.length - offset + bytesChunk.offset, + bytesChunk.length ) val inflatedBytes = inflater.inflate(inflatedBuffer) + val remaining = inflater.getRemaining Pull.pure( ( inflatedBuffer, inflatedBytes, - inflater.getRemaining, + bytesChunk.copy( + offset = bytesChunk.length - remaining, + length = remaining + ), inflater.finished() ) ) diff --git a/core/shared/src/main/scala/fs2/compression/Gzip.scala b/core/shared/src/main/scala/fs2/compression/Gzip.scala index a40e81a07b..2f0bce22ae 100644 --- a/core/shared/src/main/scala/fs2/compression/Gzip.scala +++ b/core/shared/src/main/scala/fs2/compression/Gzip.scala @@ -518,7 +518,11 @@ class Gzip[F[_]](implicit F: Sync[F]) { ) ) } else if (expectedInputSize != actualInputSize) { - F.raiseError[Unit](new ZipException("Content failed size validation")) + F.raiseError[Unit]( + new ZipException( + s"Content failed size validation: expectedInputSize $expectedInputSize != actualInputSize $actualInputSize" + ) + ) } else { F.unit } diff --git a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala index 9e3224fe56..5054ad16ff 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala @@ -25,13 +25,14 @@ package internal trait ChunkInflater[F[_]] { + def end: Pull[F, INothing, Unit] + /** @param bytesChunk bytes to inflate * @param offset offset * @return (inflatedArray, inflatedLength, remainingBytes, finished) */ def inflateChunk( - bytesChunk: Chunk.ArraySlice[Byte], - offset: Int - ): Pull[F, INothing, (Array[Byte], Int, Int, Boolean)] + bytesChunk: Chunk.ArraySlice[Byte] + ): Pull[F, INothing, (Array[Byte], Int, Chunk.ArraySlice[Byte], Boolean)] } diff --git a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala index 6577827969..f86550329d 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala @@ -28,6 +28,8 @@ import fs2.{Chunk, INothing, Pull, Stream} object InflatePipe { + private val emptySlice = Chunk.ArraySlice(Array.empty[Byte], 0, 0) + def inflateChunks[F[_]]( inflateParams: InflateParams, trailerChunkRef: Option[Ref[F, Chunk[Byte]]], @@ -69,42 +71,29 @@ object InflatePipe { def inflateChunk( bytesChunk: Chunk.ArraySlice[Byte], - offset: Int, inflatedBytesSoFar: Long ): Pull[F, Byte, (Chunk[Byte], Long, Boolean)] = - inflater.inflateChunk(bytesChunk, offset).flatMap { + inflater.inflateChunk(bytesChunk).flatMap { case (inflatedArray, inflatedLength, remainingBytes, finished) => -// println(s"inflatedBytes: ${inflatedChunk.size}") -// println(s"remainingBytes: $remainingBytes") +// println(s"inflatedBytes: ${inflatedLength}") +// 
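// The recursion in inflateChunk keeps re-feeding whatever input zlib left
// unconsumed until a call reports either an empty remainder or end of stream.
// The same shape over a hypothetical one-step function; `inflateOnce` is a
// stand-in for ChunkInflater.inflateChunk, not an API in this patch:
def inflateAll[F[_]](
    inflateOnce: Chunk[Byte] => Pull[F, Byte, (Chunk[Byte], Boolean)]
)(input: Chunk[Byte]): Pull[F, Byte, Unit] =
  inflateOnce(input).flatMap { case (remaining, finished) =>
    if (!finished && remaining.nonEmpty) inflateAll(inflateOnce)(remaining)
    else Pull.done
  }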
println(s"remainingBytes: ${remainingBytes.length}") // println(s"finished: $finished") if (track) crcBuilder.update(inflatedArray, 0, inflatedLength) val inflatedChunk = copyAsChunkBytes(inflatedArray, inflatedLength) Pull.output(inflatedChunk) >> { if (!finished) { - if (remainingBytes > 0) { + if (remainingBytes.nonEmpty) { inflateChunk( - bytesChunk, - bytesChunk.length - remainingBytes, + remainingBytes, inflatedBytesSoFar + inflatedChunk.size ) } else { Pull.pure((Chunk.empty, inflatedBytesSoFar + inflatedChunk.size, false)) } } else { - val remainingChunk = - if (remainingBytes > 0) { - Chunk - .array( - bytesChunk.values, - bytesChunk.offset + bytesChunk.length - remainingBytes, - remainingBytes - ) - } else { - Chunk.empty - } Pull.pure( ( - remainingChunk, + remainingBytes, inflatedBytesSoFar + inflatedChunk.size, true ) @@ -113,24 +102,77 @@ object InflatePipe { } } - def pull(bytesWritten: Long): Stream[F, Byte] => Pull[F, Byte, Unit] = in => + def drain(inflatedBytesSoFar: Long): Pull[F, Byte, (Chunk[Byte], Long, Boolean)] = +// println(s"[drain]... inflatedBytesSoFar: $inflatedBytesSoFar") + + inflater.inflateChunk(emptySlice).flatMap { + case (inflatedArray, inflatedLength, remainingBytes, finished) => +// println(s"[drain] inflatedBytes: ${inflatedLength}") +// println(s"[drain] remainingBytes: $remainingBytes") +// println(s"[drain] finished: $finished") + val p = + if (inflatedLength == 0) { + Pull.done + } else { + if (track) crcBuilder.update(inflatedArray, 0, inflatedLength) + val inflatedChunk = copyAsChunkBytes(inflatedArray, inflatedLength) + Pull.output(inflatedChunk) + } + + p >> { + if (!finished) { + drain( + inflatedBytesSoFar + inflatedLength + ) + } else { + Pull.pure( + ( + remainingBytes, + inflatedBytesSoFar + inflatedLength, + true + ) + ) + } + } + + } + + def pull(inflatedBytesSoFar: Long): Stream[F, Byte] => Pull[F, Byte, Unit] = in => in.pull.uncons.flatMap { - case None => Pull.done + case None => + inflater.end >> + drain(inflatedBytesSoFar).flatMap { + case ( + _, + _, + false // not finished + ) => + Pull.raiseError(new RuntimeException("drain did not ")) + case ( + remaining, + inflatedBytesTotal, + true // finished + ) => + if (track) + setTrailerChunk(remaining, inflatedBytesTotal)(Stream.empty) + else + Pull.done + } case Some((chunk, rest)) => - inflateChunk(chunk.toArraySlice, 0, 0).flatMap { + inflateChunk(chunk.toArraySlice, 0).flatMap { case ( remaining @ _, // remaining will be Chunk.empty chunkBytesWritten, false // not finished ) => - pull(bytesWritten + chunkBytesWritten)(rest) + pull(inflatedBytesSoFar + chunkBytesWritten)(rest) case ( remaining, chunkBytesWritten, true // finished ) => if (track) - setTrailerChunk(remaining, bytesWritten + chunkBytesWritten)(rest) + setTrailerChunk(remaining, inflatedBytesSoFar + chunkBytesWritten)(rest) else Pull.done } diff --git a/core/shared/src/test/scala/fs2/CompressionSuite.scala b/core/shared/src/test/scala/fs2/CompressionSuite.scala index 4d9c2f7cd4..db6ca2068d 100644 --- a/core/shared/src/test/scala/fs2/CompressionSuite.scala +++ b/core/shared/src/test/scala/fs2/CompressionSuite.scala @@ -215,7 +215,7 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F // .map(it => assert(it.sameElements(getBytes(s)))) // } // } - +// // test("deflate.compresses input") { // val uncompressed = // getBytes("""" @@ -231,29 +231,29 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F // .toVector // .map(compressed => assert(compressed.length < 
uncompressed.length)) // } - - test("deflate and inflate are reusable") { - val bytesIn: Int = 1024 * 1024 - val chunkSize = 1024 - val deflater = Compression[IO].deflate(DeflateParams(bufferSize = chunkSize)) - val inflater = Compression[IO].inflate(InflateParams(bufferSize = chunkSize)) - val stream = Stream - .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) - .through(deflater) - .through(inflater) - for { - first <- - stream - .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } - .compile - .last - second <- - stream - .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } - .compile - .last - } yield assertEquals(first, second) - } +// +// test("deflate and inflate are reusable") { +// val bytesIn: Int = 1024 * 1024 +// val chunkSize = 1024 +// val deflater = Compression[IO].deflate(DeflateParams(bufferSize = chunkSize)) +// val inflater = Compression[IO].inflate(InflateParams(bufferSize = chunkSize)) +// val stream = Stream +// .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) +// .through(deflater) +// .through(inflater) +// for { +// first <- +// stream +// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } +// .compile +// .last +// second <- +// stream +// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } +// .compile +// .last +// } yield assertEquals(first, second) +// } // // test("deflate |> inflate ~= id") { // forAllF { @@ -393,6 +393,8 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F // } // } // +// override def scalaCheckInitialSeed = "Xf5NPB2jQW5-wZXcdI9z80NtXBKC-JhVM2tacQEBvOD=" +// // test("gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)") { // forAllF { // ( @@ -402,6 +404,12 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F // flushMode: DeflateParams.FlushMode, // epochSeconds: Int // ) => +//// println("-" * 60) +//// println(s"test data: ${Chunk.array(s.getBytes).size} bytes") +//// println(s"test level: $level") +//// println(s"test strategy: $strategy") +//// println(s"test flushMode: $flushMode") +//// println(s"test epochSeconds: $epochSeconds") // val expectedFileName = Option(toEncodableFileName(s)) // val expectedComment = Option(toEncodableComment(s)) // val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) @@ -588,29 +596,29 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F // .assertEquals(expectedContent) // } // -// test("gzip and gunzip are reusable") { -// val bytesIn: Int = 1024 * 1024 -// val chunkSize = 1024 -// val gzipStream = Compression[IO].gzip(bufferSize = chunkSize) -// val gunzipStream = Compression[IO].gunzip(bufferSize = chunkSize) -// val stream = Stream -// .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) -// .through(gzipStream) -// .through(gunzipStream) -// .flatMap(_.content) -// for { -// first <- -// stream -// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } -// .compile -// .last -// second <- -// stream -// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } -// .compile -// .last -// } yield assertEquals(first, second) -// } + test("gzip and gunzip are reusable") { + val bytesIn: Int = 1024 * 1024 + val chunkSize = 1024 + val gzipStream = Compression[IO].gzip(bufferSize = chunkSize) + val gunzipStream = Compression[IO].gunzip(bufferSize = chunkSize) + val stream = Stream + .chunk[IO, 
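// "Reusable" in these tests means the same Pipe value may back two independent
// stream evaluations, which holds only if per-stream state (the zlib instance)
// is allocated inside the pipe, never when the Pipe value is created. A minimal
// sketch of the safe shape, assuming fs2.{Pipe, Stream} and cats.effect.Sync in
// scope; a real pipe would route the bytes through the inflater:
def reusablePipe[F[_]](implicit F: Sync[F]): Pipe[F, Byte, Byte] = in =>
  Stream
    .bracket(F.delay(new java.util.zip.Inflater()))(i => F.delay(i.end()))
    .flatMap(_ => in) // fresh inflater per evaluation, so the Pipe value is reusable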
Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) + .through(gzipStream) + .through(gunzipStream) + .flatMap(_.content) + for { + first <- + stream + .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } + .compile + .last + second <- + stream + .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } + .compile + .last + } yield assertEquals(first, second) + } // // group("maybeGunzip") { // def maybeGunzip[F[_]: Compression](s: Stream[F, Byte]): Stream[F, Byte] = diff --git a/io/js/src/main/scala/fs2/io/compressionplatform.scala b/io/js/src/main/scala/fs2/io/compressionplatform.scala index dde384be74..5623ce0638 100644 --- a/io/js/src/main/scala/fs2/io/compressionplatform.scala +++ b/io/js/src/main/scala/fs2/io/compressionplatform.scala @@ -29,13 +29,13 @@ import fs2.internal.jsdeps.node.{nodeStrings, zlibMod} import fs2.io.internal.SuspendedStream import fs2.compression.internal.{ChunkInflater, InflatePipe, MakeChunkInflater} import fs2.internal.jsdeps.node.bufferMod.global.Buffer -import fs2.internal.jsdeps.node.streamMod.{Duplex, Readable} +import fs2.internal.jsdeps.node.streamMod.{Duplex, Readable, Writable} import fs2.io.internal.ByteChunkOps._ import scala.annotation.nowarn import scala.concurrent.duration.FiniteDuration import scala.scalajs.js -import scala.scalajs.js.| +import scala.scalajs.js.{JavaScriptException, |} private[fs2] trait compressionplatform { @@ -48,97 +48,161 @@ private[fs2] trait compressionplatform { )( body: ChunkInflater[F] => Pull[F, Byte, Unit] ): Pull[F, Byte, Unit] = - body(chunkInflater(inflateParams)) + Pull.bracketCase[F, Byte, (Duplex, Readable, zlibMod.Zlib), Unit]( + Pull.pure { + val options = zlibMod + .ZlibOptions() + .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) + +// fs2.internal.jsdeps.node.nodeConsoleMod.global.console.log( +// "options", +// options +// ) + + val writable = (inflateParams.header match { + case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) + case ZLibParams.Header.ZLIB => zlibMod.createInflate(options) + }).asInstanceOf[Duplex] + val readable = writable.asInstanceOf[Readable] + val inflate = writable.asInstanceOf[zlibMod.Zlib] + (writable, readable, inflate) + }, + { case (writable, readable, inflate) => body(chunkInflater(writable, readable, inflate)) }, + (r, _) => Pull.pure(r._3.close()) + ) + + private val emptySlice = Chunk.ArraySlice(Array.empty[Byte], 0, 0) private def chunkInflater[F[_]]( - inflateParams: InflateParams + writable: Duplex, + readable: Readable, + inflate: zlibMod.Zlib )(implicit F: Async[F]): ChunkInflater[F] = { - val options = zlibMod - .ZlibOptions() - .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) + var error: Option[js.Error] = None + var ended: Boolean = false + val print = true - val writable = (inflateParams.header match { - case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) - case ZLibParams.Header.ZLIB => zlibMod.createInflate(options) - }).asInstanceOf[Duplex] - val readable = writable.asInstanceOf[Readable] - val inflate = writable.asInstanceOf[zlibMod.Zlib] + val onError: js.Function1[Any, Unit] = { e => + if (print) println(s" . readable.error: ${e}") + error = e.asInstanceOf[js.Error].some + } + val onEnd: js.Function1[Any, Unit] = { _ => + if (print) println(s" . readable.end") + ended = true + } + val onReadable: js.Function1[Any, Unit] = { _ => + if (print) println(s" . 
readable.readable") + } + + readable.on("error", onError) + readable.on("end", onEnd) + readable.on("readable", onReadable) + + var bytesSent = 0 +// var writtenBefore = 0L + + var latestChunks: Seq[Chunk[Byte]] = Seq.empty + + def chunkSent(chunk: Chunk[Byte]): Unit = { + bytesSent = bytesSent + chunk.size + if (print) println(s" bytes sent: (+${chunk.size}) $bytesSent") + val bytesWritten = inflate.bytesWritten.toLong + if (print) println(s" bytes written: $bytesWritten") + val bytesToKeep = bytesSent - bytesWritten + if (print) println(s" bytes to keep: $bytesToKeep") + if (bytesToKeep <= chunk.size) { + latestChunks = Seq(chunk) + } else { + latestChunks = latestChunks.inits.toSeq + .findLast(init => init.map(_.size).sum >= bytesToKeep - chunk.size) + .getOrElse(Seq.empty) :+ chunk + } + if (print) println(s" keeping chunks: ${latestChunks.size}") + } + + def remainingChunk(lastChunk: Chunk.ArraySlice[Byte]): Chunk.ArraySlice[Byte] = { + val bytesWritten = inflate.bytesWritten.toLong + if (print) println(s" [remaining] bytes written: $bytesWritten") + val bytesToKeep = bytesSent - bytesWritten + if (print) println(s" [remaining] bytes to keep: $bytesToKeep") + Chunk.concat(latestChunks :+ lastChunk).takeRight(bytesToKeep.toInt).toArraySlice + } new ChunkInflater[F] { + def end: Pull[F, INothing, Unit] = Pull.pure { + if (print) println(s"got end") + writable.end() + } + def inflateChunk( - bytesChunk: Chunk[Byte] + bytesChunk: Chunk.ArraySlice[Byte] ): Pull[ F, INothing, - (Chunk[Byte], Int, Boolean) - ] = // (inflatedBuffer, inflatedBytes, remainingBytes, finished) - Pull.eval { - F.async_[(Chunk[Byte], Int, Boolean)] { cb => - println() - println() - println(s"got chunk to inflate: ${bytesChunk.size} bytes") - readable.read() match { - case null => - println(s" read before write: null") - - val writtenBefore = inflate.bytesWritten.toLong - val buffer = bytesChunk.toUint8Array - - def tryRead(finished: Boolean) = { - val writtenNow = inflate.bytesWritten.toLong - val bytesWriten = writtenNow - writtenBefore - println(s" bytes written: ${bytesWriten}") - val bytesRemaining = bytesChunk.size - bytesWriten - println(s" bytes remaining: $bytesRemaining bytes") - val out = readable.read() match { - case null => - println(s" read null") - (Chunk.empty[Byte], bytesRemaining.toInt, finished) - case notNull => - val buffer = notNull.asInstanceOf[Buffer] - val chunk = buffer.toChunk - println(s" read buffer: ${chunk.size} bytes") - (chunk, bytesRemaining.toInt, finished) - } - cb(out.asRight[Throwable]) - } + ( + Array[Byte], + Int, + Chunk.ArraySlice[Byte], + Boolean + ) // (inflatedBuffer, inflatedBytes, remainingBytes, finished) + ] = + error match { + case Some(e) => Pull.raiseError(JavaScriptException(e)) + case None => + Pull.eval { + F.async_[(Array[Byte], Int, Chunk.ArraySlice[Byte], Boolean)] { cb => + if (print) + println( + s"got chunk to inflate: ${bytesChunk.size} bytes" + ) +// val writtenBefore = inflate.bytesWritten.toLong +// if (print) println(s" bytes written before: ${writtenBefore}") + readable.read() match { + case null => + if (print) println(s" read null; error: $error, ended: $ended") + val writtenNow = inflate.bytesWritten.toLong +// val bytesWriten = writtenNow - writtenBefore +// val bytesRemaining = bytesChunk.size - offset - bytesWriten +// writtenBefore = writtenNow - val onError: js.Function1[Any, Unit] = e => println(s"readable.error: ${e}") - val onEnd: js.Function1[Any, Unit] = _ => { - println(s"!!! 
readable.end") - tryRead(true) - } +// if (print) println(s" bytes consumed: ${bytesWriten}") + + if (ended) { + cb((Array.empty[Byte], 0, remainingChunk(bytesChunk), true).asRight) + } else { + if (bytesChunk.nonEmpty) { + val buffer = bytesChunk.toUint8Array + writable.write( + buffer, + e => + if (!js.isUndefined(e)) { + if (error.isEmpty) { + error = e.asInstanceOf[js.Error].some + } + } + ) + chunkSent(bytesChunk) + } + cb((Array.empty[Byte], 0, emptySlice, false).asRight) + } + + case notNull => + val buffer = notNull.asInstanceOf[Buffer] + val chunk = buffer.toChunk + if (print) println(s" read buffer: ${chunk.size} bytes") - val onReadable: js.Function1[Any, Unit] = _ => { - println(s"!!! readable.readable") - readable.off("error", onError) - readable.off("end", onEnd) - tryRead(false) +// val writtenNow = inflate.bytesWritten.toLong +// val bytesWriten = writtenNow - writtenBefore +// val bytesRemaining = bytesChunk.size - offset - bytesWriten +// writtenBefore = writtenNow +// if (print) println(s" bytes consumed: ${bytesWriten}") + + val slice = chunk.toArraySlice + cb((slice.values, slice.length, bytesChunk, false).asRight) } - readable.once("error", onError) - readable.once("end", onEnd) - readable.once("readable", onReadable) - - val written = writable.write( - buffer, - (e: js.UndefOr[js.Error | Null]) => println(s"callback: $e") - // cb( - // e.toLeft { - // - // }.leftMap(js.JavaScriptException) - // ) - ) - println(s"written: $written") - - case notNull => - val buffer = notNull.asInstanceOf[Buffer] - val chunk = buffer.toChunk - println(s" read buffer before write: ${chunk.size} bytes") - cb((chunk, bytesChunk.size, false).asRight) + } } - - } } } } From 12b5913824804b1ca73c8d17c56fb1b8ddddf57d Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Mon, 24 Jan 2022 20:50:43 +0200 Subject: [PATCH 5/6] minor tweaks --- .../fs2/compression/CompressionPlatform.scala | 4 +- .../scala/fs2/io/compressionplatform.scala | 38 +++++++++---------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala index 41e5e21a46..9ad204f501 100644 --- a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -138,8 +138,8 @@ private[compression] trait CompressionCompanionPlatform { Pull .bracketCase[F, Byte, Inflater, Unit]( Pull.eval(F.delay { -// println(s"-" * 60) -// println("creating new inflater") + println(s"-" * 60) + println("creating new inflater") new Inflater(inflateParams.header.juzDeflaterNoWrap) }), inflater => body(chunkInflater(inflateParams, inflater)), diff --git a/io/js/src/main/scala/fs2/io/compressionplatform.scala b/io/js/src/main/scala/fs2/io/compressionplatform.scala index 5623ce0638..4d1cc12984 100644 --- a/io/js/src/main/scala/fs2/io/compressionplatform.scala +++ b/io/js/src/main/scala/fs2/io/compressionplatform.scala @@ -49,26 +49,26 @@ private[fs2] trait compressionplatform { body: ChunkInflater[F] => Pull[F, Byte, Unit] ): Pull[F, Byte, Unit] = Pull.bracketCase[F, Byte, (Duplex, Readable, zlibMod.Zlib), Unit]( - Pull.pure { - val options = zlibMod - .ZlibOptions() - .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) - -// fs2.internal.jsdeps.node.nodeConsoleMod.global.console.log( -// "options", -// options -// ) - - val writable = (inflateParams.header match { - case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) - case 
ZLibParams.Header.ZLIB => zlibMod.createInflate(options) - }).asInstanceOf[Duplex] - val readable = writable.asInstanceOf[Readable] - val inflate = writable.asInstanceOf[zlibMod.Zlib] - (writable, readable, inflate) + Pull.eval { + F.delay { + println(s"-" * 60) + println("creating new inflater") + + val options = zlibMod + .ZlibOptions() + .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble) + + val writable = (inflateParams.header match { + case ZLibParams.Header.GZIP => zlibMod.createInflateRaw(options) + case ZLibParams.Header.ZLIB => zlibMod.createInflate(options) + }).asInstanceOf[Duplex] + val readable = writable.asInstanceOf[Readable] + val inflate = writable.asInstanceOf[zlibMod.Zlib] + (writable, readable, inflate) + } }, { case (writable, readable, inflate) => body(chunkInflater(writable, readable, inflate)) }, - (r, _) => Pull.pure(r._3.close()) + (r, _) => Pull.eval(F.delay(r._3.close())) ) private val emptySlice = Chunk.ArraySlice(Array.empty[Byte], 0, 0) @@ -80,7 +80,7 @@ private[fs2] trait compressionplatform { )(implicit F: Async[F]): ChunkInflater[F] = { var error: Option[js.Error] = None var ended: Boolean = false - val print = true + val print = false val onError: js.Function1[Any, Unit] = { e => if (print) println(s" . readable.error: ${e}") From cc4003b55a772723dc9e3dc24bc9cf1c0cf2b998 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Mon, 24 Jan 2022 22:07:27 +0200 Subject: [PATCH 6/6] fix jvm chunk inflater --- .../fs2/compression/CompressionPlatform.scala | 7 +- .../compression/internal/ChunkInflater.scala | 2 +- .../compression/internal/InflatePipe.scala | 43 +- .../src/test/scala/fs2/CompressionSuite.scala | 1072 ++++++++--------- .../scala/fs2/io/compressionplatform.scala | 6 +- 5 files changed, 567 insertions(+), 563 deletions(-) diff --git a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala index 9ad204f501..06167a3c90 100644 --- a/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -138,8 +138,6 @@ private[compression] trait CompressionCompanionPlatform { Pull .bracketCase[F, Byte, Inflater, Unit]( Pull.eval(F.delay { - println(s"-" * 60) - println("creating new inflater") new Inflater(inflateParams.header.juzDeflaterNoWrap) }), inflater => body(chunkInflater(inflateParams, inflater)), @@ -152,8 +150,9 @@ private[compression] trait CompressionCompanionPlatform { ): ChunkInflater[F] = { val inflatedBuffer = new Array[Byte](inflateParams.bufferSizeOrMinimum) new ChunkInflater[F] { - def end: Pull[F, INothing, Unit] = Pull.pure { + def end: Pull[F, INothing, Boolean] = Pull.pure { inflater.end() + false } def inflateChunk( @@ -171,7 +170,7 @@ private[compression] trait CompressionCompanionPlatform { inflatedBuffer, inflatedBytes, bytesChunk.copy( - offset = bytesChunk.length - remaining, + offset = bytesChunk.offset + (bytesChunk.length - remaining), length = remaining ), inflater.finished() diff --git a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala index 5054ad16ff..8f31a3920a 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/ChunkInflater.scala @@ -25,7 +25,7 @@ package internal trait ChunkInflater[F[_]] { - def end: Pull[F, INothing, Unit] + def end: Pull[F, INothing, Boolean] /** @param 
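// Patch 5 above swaps Pull.pure for Pull.eval(F.delay(...)) around both the
// construction and the close of the Node stream: allocating the Duplex is a
// side effect and must be suspended until the Pull runs, not performed while
// the Pull is being built. The same rule in miniature, for the JVM inflater:
def makeInflater[F[_]](implicit F: Sync[F]): Pull[F, INothing, java.util.zip.Inflater] =
  Pull.eval(F.delay(new java.util.zip.Inflater())) // suspended until interpretation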
bytesChunk bytes to inflate * @param offset offset diff --git a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala index f86550329d..249f9f99c4 100644 --- a/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala +++ b/core/shared/src/main/scala/fs2/compression/internal/InflatePipe.scala @@ -140,24 +140,31 @@ object InflatePipe { def pull(inflatedBytesSoFar: Long): Stream[F, Byte] => Pull[F, Byte, Unit] = in => in.pull.uncons.flatMap { case None => - inflater.end >> - drain(inflatedBytesSoFar).flatMap { - case ( - _, - _, - false // not finished - ) => - Pull.raiseError(new RuntimeException("drain did not ")) - case ( - remaining, - inflatedBytesTotal, - true // finished - ) => - if (track) - setTrailerChunk(remaining, inflatedBytesTotal)(Stream.empty) - else - Pull.done - } + inflater.end.flatMap { + case true => + drain(inflatedBytesSoFar).flatMap { + case ( + _, + _, + false // not finished + ) => + Pull.raiseError(new RuntimeException("drain did not ")) + case ( + remaining, + inflatedBytesTotal, + true // finished + ) => + if (track) + setTrailerChunk(remaining, inflatedBytesTotal)(Stream.empty) + else + Pull.done + } + case false => + if (track) + setTrailerChunk(Chunk.empty, inflatedBytesSoFar)(Stream.empty) + else + Pull.done + } case Some((chunk, rest)) => inflateChunk(chunk.toArraySlice, 0).flatMap { case ( diff --git a/core/shared/src/test/scala/fs2/CompressionSuite.scala b/core/shared/src/test/scala/fs2/CompressionSuite.scala index db6ca2068d..647786b06a 100644 --- a/core/shared/src/test/scala/fs2/CompressionSuite.scala +++ b/core/shared/src/test/scala/fs2/CompressionSuite.scala @@ -86,516 +86,516 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F ) ) -// test("deflate input") { -// forAllF { (s: String, level0: Int, strategy0: Int, nowrap: Boolean) => -// val level = (level0 % 10).abs -// val strategy = Array( -// DeflateParams.Strategy.DEFAULT.juzDeflaterStrategy, -// DeflateParams.Strategy.FILTERED.juzDeflaterStrategy, -// DeflateParams.Strategy.HUFFMAN_ONLY.juzDeflaterStrategy -// )( -// (strategy0 % 3).abs -// ) -// val expected = deflateStream(getBytes(s), level, strategy, nowrap).toVector -// Stream -// .chunk[IO, Byte](Chunk.array(getBytes(s))) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].deflate( -// DeflateParams( -// level = DeflateParams.Level(level), -// strategy = DeflateParams.Strategy(strategy), -// header = ZLibParams.Header(nowrap) -// ) -// ) -// ) -// .compile -// .toVector -// .assertEquals(expected) -// } -// } -// -// test("inflate input") { -// forAllF { -// ( -// s: String, -// nowrap: Boolean, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode -// ) => -// Stream -// .chunk[IO, Byte](Chunk.array(getBytes(s))) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].deflate( -// DeflateParams( -// bufferSize = 32 * 1024, -// header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .compile -// .to(Array) -// .flatMap { deflated => -// val expected = inflateStream(deflated, nowrap).toVector -// Stream -// .chunk[IO, Byte](Chunk.array(deflated)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through(Compression[IO].inflate(InflateParams(header = 
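// After patch 6, ChunkInflater.end answers "does the caller still have to drain
// buffered output?": the JVM inflater frees its native state and returns false,
// while the JS duplex has to be drained after writable.end() flushes
// asynchronously. The dispatch performed in pull(), reduced to its shape:
def finish[F[_]](needsDrain: Boolean, drain: Pull[F, Byte, Unit]): Pull[F, Byte, Unit] =
  if (needsDrain) drain else Pull.done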
ZLibParams.Header(nowrap)))) -// .compile -// .toVector -// .assertEquals(expected) -// } -// } -// } -// -// test("inflate input (deflated larger than inflated)") { -// Stream -// .chunk[IO, Byte]( -// Chunk.array( -// getBytes( -// "꒔諒ᇂ즆ᰃ遇ኼ㎐만咘똠ᯈ䕍쏮쿻ࣇ㦲䷱瘫椪⫐褽睌쨘꛹騏蕾☦余쒧꺠ܝ猸b뷈埣ꂓ琌ཬ隖㣰忢鐮橀쁚誅렌폓㖅ꋹ켗餪庺Đ懣㫍㫌굦뢲䅦苮Ѣқ闭䮚ū﫣༶漵>껆拦휬콯耙腒䔖돆圹Ⲷ曩ꀌ㒈" -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].deflate( -// DeflateParams( -// header = ZLibParams.Header.ZLIB -// ) -// ) -// ) -// .compile -// .to(Array) -// .flatMap { deflated => -// val expected = new String(inflateStream(deflated, nowrap = false)) -// Stream -// .chunk[IO, Byte](Chunk.array(deflated)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(false)))) -// .compile -// .to(Array) -// .map(new String(_)) -// .assertEquals(expected) -// } -// } -// -// test("deflate |> inflate ~= id") { -// forAllF { -// ( -// s: String, -// nowrap: Boolean, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode -// ) => -// Stream -// .chunk[IO, Byte](Chunk.array(getBytes(s))) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].deflate( -// DeflateParams( -// bufferSize = 32 * 1024, -// header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) -// .compile -// .to(Array) -// .map(it => assert(it.sameElements(getBytes(s)))) -// } -// } -// -// test("deflate.compresses input") { -// val uncompressed = -// getBytes("""" -// |"A type system is a tractable syntactic method for proving the absence -// |of certain program behaviors by classifying phrases according to the -// |kinds of values they compute." -// |-- Pierce, Benjamin C. (2002). 
Types and Programming Languages""") -// Stream -// .chunk[IO, Byte](Chunk.array(uncompressed)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through(Compression[IO].deflate(DeflateParams(level = DeflateParams.Level.NINE))) -// .compile -// .toVector -// .map(compressed => assert(compressed.length < uncompressed.length)) -// } -// -// test("deflate and inflate are reusable") { -// val bytesIn: Int = 1024 * 1024 -// val chunkSize = 1024 -// val deflater = Compression[IO].deflate(DeflateParams(bufferSize = chunkSize)) -// val inflater = Compression[IO].inflate(InflateParams(bufferSize = chunkSize)) -// val stream = Stream -// .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) -// .through(deflater) -// .through(inflater) -// for { -// first <- -// stream -// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } -// .compile -// .last -// second <- -// stream -// .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } -// .compile -// .last -// } yield assertEquals(first, second) -// } -// -// test("deflate |> inflate ~= id") { -// forAllF { -// ( -// s: String, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode -// ) => -// Stream -// .chunk(Chunk.array(s.getBytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].deflate( -// DeflateParams( -// bufferSize = 8192, -// header = ZLibParams.Header.GZIP, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].inflate( -// InflateParams( -// bufferSize = 8192, -// header = ZLibParams.Header.GZIP -// ) -// ) -// ) -// .compile -// .toVector -// .map { result => -// assertEquals(result, s.getBytes.toVector) -// () -// } -// } -// } -// -// test("empty.gz |> gunzip") { -// -// val bytes = Array( -// 0x1f, 0x8b, 0x08, 0x08, 0x0f, 0x85, 0xc7, 0x61, 0x00, 0x03, 0x65, 0x6d, 0x70, 0x74, 0x79, -// 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 -// ).map(_.toByte) -// val expectedBytes = Array.empty[Byte] -// -// val expectedFileName = Option(toEncodableFileName("empty")) -// Stream -// .chunk(Chunk.array(bytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gunzip(8192) -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// gunzipResult.content -// } -// .compile -// .toVector -// .assertEquals(expectedBytes.toVector) -// } -// -// test("hello-compression.gz |> gunzip") { -// -// val bytes = Array( -// 0x1f, 0x8b, 0x08, 0x08, 0x99, 0x8a, 0xc7, 0x61, 0x00, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, -// 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x6a, 0x73, -// 0x6f, 0x6e, 0x00, 0xab, 0x56, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x57, 0xb2, 0x52, 0x4a, 0xce, -// 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0xce, 0xcc, 0xcf, 0x53, 0xaa, 0xe5, 0x02, 0x00, 0x47, -// 0x6f, 0xf6, 0xe9, 0x18, 0x00, 0x00, 0x00 -// ).map(_.toByte) -// val expectedBytes = -// """{"hello":"compression"} -// |""".stripMargin.getBytes -// -// val expectedFileName = Option(toEncodableFileName("hello-compression.json")) -// Stream -// .chunk(Chunk.array(bytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gunzip(8192) -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// gunzipResult.content -// } -// .compile -// 
.toVector -// .assertEquals(expectedBytes.toVector) -// } -// -// test("gzip |> gunzip ~= id") { -// forAllF { -// ( -// s: String, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode, -// epochSeconds: Int -// ) => -// val expectedFileName = Option(toEncodableFileName(s)) -// val expectedComment = Option(toEncodableComment(s)) -// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) -// Stream -// .chunk(Chunk.array(s.getBytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gzip( -// fileName = Some(s), -// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), -// comment = Some(s), -// DeflateParams( -// bufferSize = 8192, -// header = ZLibParams.Header.GZIP, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gunzip(8192) -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// assertEquals(gunzipResult.comment, expectedComment) -// if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) -// gunzipResult.content -// } -// .compile -// .toVector -// .assertEquals(s.getBytes.toVector) -// } -// } -// -// override def scalaCheckInitialSeed = "Xf5NPB2jQW5-wZXcdI9z80NtXBKC-JhVM2tacQEBvOD=" -// -// test("gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)") { -// forAllF { -// ( -// s: String, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode, -// epochSeconds: Int -// ) => -//// println("-" * 60) -//// println(s"test data: ${Chunk.array(s.getBytes).size} bytes") -//// println(s"test level: $level") -//// println(s"test strategy: $strategy") -//// println(s"test flushMode: $flushMode") -//// println(s"test epochSeconds: $epochSeconds") -// val expectedFileName = Option(toEncodableFileName(s)) -// val expectedComment = Option(toEncodableComment(s)) -// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) -// Stream -// .chunk(Chunk.array(s.getBytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gzip( -// fileName = Some(s), -// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), -// comment = Some(s), -// DeflateParams( -// bufferSize = 1031, -// header = ZLibParams.Header.GZIP, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gunzip(509) -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// assertEquals(gunzipResult.comment, expectedComment) -// if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) -// gunzipResult.content -// } -// .compile -// .toVector -// .assertEquals(s.getBytes.toVector) -// } -// } -// -// test("gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)") { -// forAllF { -// ( -// s: String, -// level: DeflateParams.Level, -// strategy: DeflateParams.Strategy, -// flushMode: DeflateParams.FlushMode, -// epochSeconds: Int -// ) => -// val expectedFileName = Option(toEncodableFileName(s)) -// val expectedComment = Option(toEncodableComment(s)) -// val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) -// 
Stream -// .chunk(Chunk.array(s.getBytes)) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gzip( -// fileName = Some(s), -// modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), -// comment = Some(s), -// DeflateParams( -// bufferSize = 509, -// header = ZLibParams.Header.GZIP, -// level = level, -// strategy = strategy, -// flushMode = flushMode -// ) -// ) -// ) -// .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) -// .through( -// Compression[IO].gunzip(1031) -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// assertEquals(gunzipResult.comment, expectedComment) -// if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) -// gunzipResult.content -// } -// .compile -// .toVector -// .assertEquals(s.getBytes.toVector) -// } -// } -// -// test("gzip.compresses input") { -// val uncompressed = -// getBytes("""" -// |"A type system is a tractable syntactic method for proving the absence -// |of certain program behaviors by classifying phrases according to the -// |kinds of values they compute." -// |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") -// Stream -// .chunk[IO, Byte](Chunk.array(uncompressed)) -// .through(Compression[IO].gzip(2048)) -// .compile -// .toVector -// .map(compressed => assert(compressed.length < uncompressed.length)) -// } -// -// test("gzip.compresses input, with FLG.FHCRC set") { -// Stream -// .chunk[IO, Byte](Chunk.array(getBytes("Foo"))) -// .through( -// Compression[IO].gzip( -// fileName = None, -// modificationTime = None, -// comment = None, -// deflateParams = DeflateParams.apply( -// bufferSize = 1024 * 32, -// header = ZLibParams.Header.GZIP, -// level = DeflateParams.Level.DEFAULT, -// strategy = DeflateParams.Strategy.DEFAULT, -// flushMode = DeflateParams.FlushMode.DEFAULT, -// fhCrcEnabled = true -// ) -// ) -// ) -// .compile -// .toVector -// .map { compressed => -// val headerBytes = ByteVector(compressed.take(10)) -// val crc32 = crc.crc32(headerBytes.toBitVector).toByteArray -// val expectedCrc16 = crc32.reverse.take(2).toVector -// val actualCrc16 = compressed.drop(10).take(2) -// assertEquals(actualCrc16, expectedCrc16) -// } -// } -// -// test("gunzip limit fileName and comment length") { -// val longString: String = -// Array -// .fill(1034 * 1024)("x") -// .mkString( -// "" -// ) // max(classic.fileNameBytesSoftLimit, classic.fileCommentBytesSoftLimit) + 1 -// val expectedFileName = Option(toEncodableFileName(longString)) -// val expectedComment = Option(toEncodableComment(longString)) -// -// Stream -// .chunk(Chunk.empty[Byte]) -// .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) -// .chunkLimit(512) -// .unchunks // ensure chunk sizes are less than file name and comment size soft limits -// .through(Compression[IO].gunzip(8192)) -// .flatMap { gunzipResult => -// assert( -// gunzipResult.fileName -// .map(_.length) -// .getOrElse(0) < expectedFileName.map(_.length).getOrElse(0) -// ) -// assert( -// gunzipResult.comment -// .map(_.length) -// .getOrElse(0) < expectedComment.map(_.length).getOrElse(0) -// ) -// gunzipResult.content -// } -// .compile -// .last -// .assertEquals(None) -// } -// -// test("unix.gzip |> gunzip") { -// val expectedContent = "fs2.compress implementing RFC 1952\n" -// val expectedFileName = Option(toEncodableFileName("fs2.compress")) -// val expectedComment = Option.empty[String] -// val expectedMTime = 
Option(FiniteDuration(1580853602, TimeUnit.SECONDS)) // 2020-02-04T22:00:02Z -// val compressed = Array(0x1f, 0x8b, 0x08, 0x08, 0x62, 0xe9, 0x39, 0x5e, 0x00, 0x03, 0x66, 0x73, -// 0x32, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x00, 0x4b, 0x2b, 0x36, 0xd2, -// 0x4b, 0xce, 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0x56, 0xc8, 0xcc, 0x2d, 0xc8, 0x49, 0xcd, -// 0x4d, 0xcd, 0x2b, 0xc9, 0xcc, 0x4b, 0x57, 0x08, 0x72, 0x73, 0x56, 0x30, 0xb4, 0x34, 0x35, -// 0xe2, 0x02, 0x00, 0x57, 0xb3, 0x5e, 0x6d, 0x23, 0x00, 0x00, 0x00).map(_.toByte) -// Stream -// .chunk(Chunk.array(compressed)) -// .through( -// Compression[IO].gunzip() -// ) -// .flatMap { gunzipResult => -// assertEquals(gunzipResult.fileName, expectedFileName) -// assertEquals(gunzipResult.comment, expectedComment) -// assertEquals(gunzipResult.modificationEpochTime, expectedMTime) -// gunzipResult.content -// } -// .compile -// .toVector -// .map(vector => new String(vector.toArray, StandardCharsets.US_ASCII)) -// .assertEquals(expectedContent) -// } -// + test("deflate input") { + forAllF { (s: String, level0: Int, strategy0: Int, nowrap: Boolean) => + val level = (level0 % 10).abs + val strategy = Array( + DeflateParams.Strategy.DEFAULT.juzDeflaterStrategy, + DeflateParams.Strategy.FILTERED.juzDeflaterStrategy, + DeflateParams.Strategy.HUFFMAN_ONLY.juzDeflaterStrategy + )( + (strategy0 % 3).abs + ) + val expected = deflateStream(getBytes(s), level, strategy, nowrap).toVector + Stream + .chunk[IO, Byte](Chunk.array(getBytes(s))) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].deflate( + DeflateParams( + level = DeflateParams.Level(level), + strategy = DeflateParams.Strategy(strategy), + header = ZLibParams.Header(nowrap) + ) + ) + ) + .compile + .toVector + .assertEquals(expected) + } + } + + test("inflate input") { + forAllF { + ( + s: String, + nowrap: Boolean, + level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode + ) => + Stream + .chunk[IO, Byte](Chunk.array(getBytes(s))) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].deflate( + DeflateParams( + bufferSize = 32 * 1024, + header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .compile + .to(Array) + .flatMap { deflated => + val expected = inflateStream(deflated, nowrap).toVector + Stream + .chunk[IO, Byte](Chunk.array(deflated)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) + .compile + .toVector + .assertEquals(expected) + } + } + } + + test("inflate input (deflated larger than inflated)") { + Stream + .chunk[IO, Byte]( + Chunk.array( + getBytes( + "꒔諒ᇂ즆ᰃ遇ኼ㎐만咘똠ᯈ䕍쏮쿻ࣇ㦲䷱瘫椪⫐褽睌쨘꛹騏蕾☦余쒧꺠ܝ猸b뷈埣ꂓ琌ཬ隖㣰忢鐮橀쁚誅렌폓㖅ꋹ켗餪庺Đ懣㫍㫌굦뢲䅦苮Ѣқ闭䮚ū﫣༶漵>껆拦휬콯耙腒䔖돆圹Ⲷ曩ꀌ㒈" + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].deflate( + DeflateParams( + header = ZLibParams.Header.ZLIB + ) + ) + ) + .compile + .to(Array) + .flatMap { deflated => + val expected = new String(inflateStream(deflated, nowrap = false)) + Stream + .chunk[IO, Byte](Chunk.array(deflated)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(false)))) + .compile + .to(Array) + .map(new String(_)) + .assertEquals(expected) + } + } + + test("deflate |> inflate ~= id") { + forAllF { + ( + s: String, + nowrap: Boolean, 
+ level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode + ) => + Stream + .chunk[IO, Byte](Chunk.array(getBytes(s))) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].deflate( + DeflateParams( + bufferSize = 32 * 1024, + header = if (nowrap) ZLibParams.Header.GZIP else ZLibParams.Header.ZLIB, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through(Compression[IO].inflate(InflateParams(header = ZLibParams.Header(nowrap)))) + .compile + .to(Array) + .map(it => assert(it.sameElements(getBytes(s)))) + } + } + + test("deflate.compresses input") { + val uncompressed = + getBytes("""" + |"A type system is a tractable syntactic method for proving the absence + |of certain program behaviors by classifying phrases according to the + |kinds of values they compute." + |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") + Stream + .chunk[IO, Byte](Chunk.array(uncompressed)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through(Compression[IO].deflate(DeflateParams(level = DeflateParams.Level.NINE))) + .compile + .toVector + .map(compressed => assert(compressed.length < uncompressed.length)) + } + + test("deflate and inflate are reusable") { + val bytesIn: Int = 1024 * 1024 + val chunkSize = 1024 + val deflater = Compression[IO].deflate(DeflateParams(bufferSize = chunkSize)) + val inflater = Compression[IO].inflate(InflateParams(bufferSize = chunkSize)) + val stream = Stream + .chunk[IO, Byte](Chunk.array(1.to(bytesIn).map(_.toByte).toArray)) + .through(deflater) + .through(inflater) + for { + first <- + stream + .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } + .compile + .last + second <- + stream + .fold(Vector.empty[Byte]) { case (vector, byte) => vector :+ byte } + .compile + .last + } yield assertEquals(first, second) + } + + test("deflate |> inflate ~= id") { + forAllF { + ( + s: String, + level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode + ) => + Stream + .chunk(Chunk.array(s.getBytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].deflate( + DeflateParams( + bufferSize = 8192, + header = ZLibParams.Header.GZIP, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].inflate( + InflateParams( + bufferSize = 8192, + header = ZLibParams.Header.GZIP + ) + ) + ) + .compile + .toVector + .map { result => + assertEquals(result, s.getBytes.toVector) + () + } + } + } + + test("empty.gz |> gunzip") { + + val bytes = Array( + 0x1f, 0x8b, 0x08, 0x08, 0x0f, 0x85, 0xc7, 0x61, 0x00, 0x03, 0x65, 0x6d, 0x70, 0x74, 0x79, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + ).map(_.toByte) + val expectedBytes = Array.empty[Byte] + + val expectedFileName = Option(toEncodableFileName("empty")) + Stream + .chunk(Chunk.array(bytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gunzip(8192) + ) + .flatMap { gunzipResult => + assertEquals(gunzipResult.fileName, expectedFileName) + gunzipResult.content + } + .compile + .toVector + .assertEquals(expectedBytes.toVector) + } + + test("hello-compression.gz |> gunzip") { + + val bytes = Array( + 0x1f, 0x8b, 0x08, 0x08, 0x99, 0x8a, 0xc7, 0x61, 0x00, 0x03, 0x68, 0x65, 0x6c, 0x6c, 0x6f, + 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x72, 
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x2e, 0x6a, 0x73, + 0x6f, 0x6e, 0x00, 0xab, 0x56, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x57, 0xb2, 0x52, 0x4a, 0xce, + 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0xce, 0xcc, 0xcf, 0x53, 0xaa, 0xe5, 0x02, 0x00, 0x47, + 0x6f, 0xf6, 0xe9, 0x18, 0x00, 0x00, 0x00 + ).map(_.toByte) + val expectedBytes = + """{"hello":"compression"} + |""".stripMargin.getBytes + + val expectedFileName = Option(toEncodableFileName("hello-compression.json")) + Stream + .chunk(Chunk.array(bytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gunzip(8192) + ) + .flatMap { gunzipResult => + assertEquals(gunzipResult.fileName, expectedFileName) + gunzipResult.content + } + .compile + .toVector + .assertEquals(expectedBytes.toVector) + } + + test("gzip |> gunzip ~= id") { + forAllF { + ( + s: String, + level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode, + epochSeconds: Int + ) => + val expectedFileName = Option(toEncodableFileName(s)) + val expectedComment = Option(toEncodableComment(s)) + val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) + Stream + .chunk(Chunk.array(s.getBytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gzip( + fileName = Some(s), + modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), + comment = Some(s), + DeflateParams( + bufferSize = 8192, + header = ZLibParams.Header.GZIP, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gunzip(8192) + ) + .flatMap { gunzipResult => + assertEquals(gunzipResult.fileName, expectedFileName) + assertEquals(gunzipResult.comment, expectedComment) + if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) + gunzipResult.content + } + .compile + .toVector + .assertEquals(s.getBytes.toVector) + } + } + + override def scalaCheckInitialSeed = "Xf5NPB2jQW5-wZXcdI9z80NtXBKC-JhVM2tacQEBvOD=" + + test("gzip |> gunzip ~= id (mutually prime chunk sizes, compression larger)") { + forAllF { + ( + s: String, + level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode, + epochSeconds: Int + ) => +// println("-" * 60) +// println(s"test data: ${Chunk.array(s.getBytes).size} bytes") +// println(s"test level: $level") +// println(s"test strategy: $strategy") +// println(s"test flushMode: $flushMode") +// println(s"test epochSeconds: $epochSeconds") + val expectedFileName = Option(toEncodableFileName(s)) + val expectedComment = Option(toEncodableComment(s)) + val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) + Stream + .chunk(Chunk.array(s.getBytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gzip( + fileName = Some(s), + modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), + comment = Some(s), + DeflateParams( + bufferSize = 1031, + header = ZLibParams.Header.GZIP, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gunzip(509) + ) + .flatMap { gunzipResult => + assertEquals(gunzipResult.fileName, expectedFileName) + assertEquals(gunzipResult.comment, expectedComment) + if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) + gunzipResult.content + } + .compile + 
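// The buffer sizes 1031 and 509 in the test above are both prime, so the
// deflate and gunzip sides share no common factor and their chunk boundaries
// never stay aligned, which is exactly the partial-chunk code path these
// "mutually prime" tests want to exercise. Coprimality is easy to confirm:
@annotation.tailrec
def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
// gcd(1031, 509) == 1, so boundaries drift on every refill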
.toVector + .assertEquals(s.getBytes.toVector) + } + } + + test("gzip |> gunzip ~= id (mutually prime chunk sizes, decompression larger)") { + forAllF { + ( + s: String, + level: DeflateParams.Level, + strategy: DeflateParams.Strategy, + flushMode: DeflateParams.FlushMode, + epochSeconds: Int + ) => + val expectedFileName = Option(toEncodableFileName(s)) + val expectedComment = Option(toEncodableComment(s)) + val expectedMTime = Option(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)) + Stream + .chunk(Chunk.array(s.getBytes)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gzip( + fileName = Some(s), + modificationTime = Some(FiniteDuration(epochSeconds.toLong, TimeUnit.SECONDS)), + comment = Some(s), + DeflateParams( + bufferSize = 509, + header = ZLibParams.Header.GZIP, + level = level, + strategy = strategy, + flushMode = flushMode + ) + ) + ) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through( + Compression[IO].gunzip(1031) + ) + .flatMap { gunzipResult => + assertEquals(gunzipResult.fileName, expectedFileName) + assertEquals(gunzipResult.comment, expectedComment) + if (epochSeconds > 0) assertEquals(gunzipResult.modificationEpochTime, expectedMTime) + gunzipResult.content + } + .compile + .toVector + .assertEquals(s.getBytes.toVector) + } + } + + test("gzip.compresses input") { + val uncompressed = + getBytes("""" + |"A type system is a tractable syntactic method for proving the absence + |of certain program behaviors by classifying phrases according to the + |kinds of values they compute." + |-- Pierce, Benjamin C. (2002). Types and Programming Languages""") + Stream + .chunk[IO, Byte](Chunk.array(uncompressed)) + .through(Compression[IO].gzip(2048)) + .compile + .toVector + .map(compressed => assert(compressed.length < uncompressed.length)) + } + + test("gzip.compresses input, with FLG.FHCRC set") { + Stream + .chunk[IO, Byte](Chunk.array(getBytes("Foo"))) + .through( + Compression[IO].gzip( + fileName = None, + modificationTime = None, + comment = None, + deflateParams = DeflateParams.apply( + bufferSize = 1024 * 32, + header = ZLibParams.Header.GZIP, + level = DeflateParams.Level.DEFAULT, + strategy = DeflateParams.Strategy.DEFAULT, + flushMode = DeflateParams.FlushMode.DEFAULT, + fhCrcEnabled = true + ) + ) + ) + .compile + .toVector + .map { compressed => + val headerBytes = ByteVector(compressed.take(10)) + val crc32 = crc.crc32(headerBytes.toBitVector).toByteArray + val expectedCrc16 = crc32.reverse.take(2).toVector + val actualCrc16 = compressed.drop(10).take(2) + assertEquals(actualCrc16, expectedCrc16) + } + } + + test("gunzip limit fileName and comment length") { + val longString: String = + Array + .fill(1034 * 1024)("x") + .mkString( + "" + ) // max(classic.fileNameBytesSoftLimit, classic.fileCommentBytesSoftLimit) + 1 + val expectedFileName = Option(toEncodableFileName(longString)) + val expectedComment = Option(toEncodableComment(longString)) + + Stream + .chunk(Chunk.empty[Byte]) + .through(Compression[IO].gzip(8192, fileName = Some(longString), comment = Some(longString))) + .chunkLimit(512) + .unchunks // ensure chunk sizes are less than file name and comment size soft limits + .through(Compression[IO].gunzip(8192)) + .flatMap { gunzipResult => + assert( + gunzipResult.fileName + .map(_.length) + .getOrElse(0) < expectedFileName.map(_.length).getOrElse(0) + ) + assert( + gunzipResult.comment + .map(_.length) + .getOrElse(0) < expectedComment.map(_.length).getOrElse(0) + ) + gunzipResult.content + } + .compile + 
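// Per RFC 1952, FLG.FHCRC stores the low 16 bits of the CRC32 of the 10-byte
// fixed header immediately after that header, little-endian; the FHCRC test
// above recomputes it with scodec's crc. The same expected value computed with
// java.util.zip.CRC32 (an equivalent, not the test's own helper):
def headerCrc16(header: Array[Byte]): Int = {
  val crc = new java.util.zip.CRC32()
  crc.update(header, 0, header.length)
  (crc.getValue & 0xffffL).toInt // low 16 bits of the header CRC32
}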
+
+  test("unix.gzip |> gunzip") {
+    val expectedContent = "fs2.compress implementing RFC 1952\n"
+    val expectedFileName = Option(toEncodableFileName("fs2.compress"))
+    val expectedComment = Option.empty[String]
+    val expectedMTime = Option(FiniteDuration(1580853602, TimeUnit.SECONDS)) // 2020-02-04T22:00:02Z
+    val compressed = Array(0x1f, 0x8b, 0x08, 0x08, 0x62, 0xe9, 0x39, 0x5e, 0x00, 0x03, 0x66, 0x73,
+      0x32, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x00, 0x4b, 0x2b, 0x36, 0xd2,
+      0x4b, 0xce, 0xcf, 0x2d, 0x28, 0x4a, 0x2d, 0x2e, 0x56, 0xc8, 0xcc, 0x2d, 0xc8, 0x49, 0xcd,
+      0x4d, 0xcd, 0x2b, 0xc9, 0xcc, 0x4b, 0x57, 0x08, 0x72, 0x73, 0x56, 0x30, 0xb4, 0x34, 0x35,
+      0xe2, 0x02, 0x00, 0x57, 0xb3, 0x5e, 0x6d, 0x23, 0x00, 0x00, 0x00).map(_.toByte)
+    Stream
+      .chunk(Chunk.array(compressed))
+      .through(
+        Compression[IO].gunzip()
+      )
+      .flatMap { gunzipResult =>
+        assertEquals(gunzipResult.fileName, expectedFileName)
+        assertEquals(gunzipResult.comment, expectedComment)
+        assertEquals(gunzipResult.modificationEpochTime, expectedMTime)
+        gunzipResult.content
+      }
+      .compile
+      .toVector
+      .map(vector => new String(vector.toArray, StandardCharsets.US_ASCII))
+      .assertEquals(expectedContent)
+  }
+
   test("gzip and gunzip are reusable") {
     val bytesIn: Int = 1024 * 1024
     val chunkSize = 1024
@@ -619,32 +619,32 @@ abstract class CompressionSuite(implicit compression: Compression[IO]) extends F
         .last
     } yield assertEquals(first, second)
   }
-//
-//  group("maybeGunzip") {
-//    def maybeGunzip[F[_]: Compression](s: Stream[F, Byte]): Stream[F, Byte] =
-//      s.pull
-//        .unconsN(2, allowFewer = true)
-//        .flatMap {
-//          case Some((hd, tl)) =>
-//            if (hd == Chunk[Byte](0x1f, 0x8b.toByte))
-//              Compression[F].gunzip(128)(tl.cons(hd)).flatMap(_.content).pull.echo
-//            else tl.cons(hd).pull.echo
-//          case None => Pull.done
-//        }
-//        .stream
-//
-//    test("not gzip") {
-//      forAllF { (s: Stream[Pure, Byte]) =>
-//        maybeGunzip[IO](s).compile.toList.assertEquals(s.toList)
-//      }
-//    }
-//
-//    test("gzip") {
-//      forAllF { (s: Stream[Pure, Byte]) =>
-//        maybeGunzip[IO](s.through(Compression[IO].gzip())).compile.toList.assertEquals(s.toList)
-//      }
-//    }
-//  }
+
+  group("maybeGunzip") {
+    def maybeGunzip[F[_]: Compression](s: Stream[F, Byte]): Stream[F, Byte] =
+      s.pull
+        .unconsN(2, allowFewer = true)
+        .flatMap {
+          case Some((hd, tl)) =>
+            if (hd == Chunk[Byte](0x1f, 0x8b.toByte))
+              Compression[F].gunzip(128)(tl.cons(hd)).flatMap(_.content).pull.echo
+            else tl.cons(hd).pull.echo
+          case None => Pull.done
+        }
+        .stream
+
+    test("not gzip") {
+      forAllF { (s: Stream[Pure, Byte]) =>
+        maybeGunzip[IO](s).compile.toList.assertEquals(s.toList)
+      }
+    }
+
+    test("gzip") {
+      forAllF { (s: Stream[Pure, Byte]) =>
+        maybeGunzip[IO](s.through(Compression[IO].gzip())).compile.toList.assertEquals(s.toList)
+      }
+    }
+  }
 
   def toEncodableFileName(fileName: String): String =
     new String(
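One note on the unix.gzip |> gunzip fixture above: the expected MTIME is not arbitrary but is encoded in header bytes 4 through 7 (0x62, 0xe9, 0x39, 0x5e), which RFC 1952 defines as the modification time in seconds since the Unix epoch, stored little-endian. A standalone sketch (variable names are illustrative only) recovering the asserted 1580853602:

    // Decode MTIME from gzip header bytes 4-7 (little-endian epoch seconds).
    val header: Array[Byte] =
      Array(0x1f, 0x8b, 0x08, 0x08, 0x62, 0xe9, 0x39, 0x5e).map(_.toByte)
    val mtime: Long = (0 until 4).foldLeft(0L) { (acc, i) =>
      acc | ((header(4 + i) & 0xffL) << (8 * i))
    }
    // mtime == 1580853602L, i.e. 2020-02-04T22:00:02Z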
diff --git a/io/js/src/main/scala/fs2/io/compressionplatform.scala b/io/js/src/main/scala/fs2/io/compressionplatform.scala
index 4d1cc12984..baf81f8d6e 100644
--- a/io/js/src/main/scala/fs2/io/compressionplatform.scala
+++ b/io/js/src/main/scala/fs2/io/compressionplatform.scala
@@ -51,9 +51,6 @@ private[fs2] trait compressionplatform {
       Pull.bracketCase[F, Byte, (Duplex, Readable, zlibMod.Zlib), Unit](
         Pull.eval {
           F.delay {
-            println(s"-" * 60)
-            println("creating new inflater")
-
             val options = zlibMod
               .ZlibOptions()
               .setChunkSize(inflateParams.bufferSizeOrMinimum.toDouble)
@@ -129,9 +126,10 @@ private[fs2] trait compressionplatform {
          }

          new ChunkInflater[F] {
-            def end: Pull[F, INothing, Unit] = Pull.pure {
+            def end: Pull[F, INothing, Boolean] = Pull.pure {
              if (print) println(s"got end")
              writable.end()
+              true
            }

            def inflateChunk(
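For context on the end/inflateChunk contract being adjusted above: on the JVM, the analogous per-call bookkeeping comes directly from java.util.zip.Inflater, which reports both how many supplied input bytes remain unconsumed and whether the end of the deflate stream has been reached. A self-contained round-trip sketch of those two signals (independent of this patch; payload and buffer sizes are arbitrary):

    import java.util.zip.{Deflater, Inflater}

    // Compress a small payload so there is something to inflate.
    val deflater = new Deflater()
    deflater.setInput("fs2.compress implementing RFC 1952".getBytes("UTF-8"))
    deflater.finish()
    val compressed = new Array[Byte](128)
    val compressedLength = deflater.deflate(compressed)
    deflater.end()

    // Inflate it and observe the per-call signals a chunked inflater needs.
    val inflater = new Inflater()
    inflater.setInput(compressed, 0, compressedLength)
    val inflated = new Array[Byte](128)
    val produced = inflater.inflate(inflated) // bytes written into `inflated`
    val remaining = inflater.getRemaining     // input bytes not yet consumed
    val finished = inflater.finished()        // true once stream end was seen
    inflater.end()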