diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala index bed6fef7d9..af7191b0d7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala @@ -45,6 +45,8 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu case object AllDataAllowed extends Exception with NoStackTrace protected def corruptInputCheck: Boolean = true + protected def autoFlush: Boolean = true + def extraTests(): Unit = {} s"The $codecName codec" should { @@ -145,6 +147,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu } "be able to decode chunk-by-chunk (depending on input chunks)" in { + assume(autoFlush) val minLength = 100 val maxLength = 1000 val numElements = 1000 diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala new file mode 100644 index 0000000000..284aaa3ae1 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.io.compression + +import org.apache.pekko.stream.scaladsl.{ Compression, Flow } +import org.apache.pekko.util.ByteString + +import java.util.zip.Deflater + +class DeflateAutoFlushSpec extends DeflateSpec { + override protected val encoderFlow: Flow[ByteString, ByteString, Any] = + Compression.deflate(Deflater.BEST_COMPRESSION, nowrap = false, autoFlush = false) + override protected val autoFlush: Boolean = false +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala new file mode 100644 index 0000000000..031c19428b --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.io.compression + +import org.apache.pekko.stream.scaladsl.{ Compression, Flow } +import org.apache.pekko.util.ByteString + +import java.util.zip.Deflater + +class GzipAutoFlushSpec extends GzipSpec { + override protected val encoderFlow: Flow[ByteString, ByteString, Any] = + Compression.gzip(Deflater.BEST_COMPRESSION, autoFlush = false) + override protected val autoFlush: Boolean = false +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala index 7ce44d962c..d6150f48f3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala @@ -28,7 +28,8 @@ import pekko.util.ByteString /** * Creates a flow from a compressor constructor. 
*/ - def compressorFlow(newCompressor: () => Compressor): Flow[ByteString, ByteString, NotUsed] = + def compressorFlow(newCompressor: () => Compressor, autoFlush: Boolean = true) + : Flow[ByteString, ByteString, NotUsed] = Flow.fromGraph { new SimpleLinearGraphStage[ByteString] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -36,7 +37,12 @@ import pekko.util.ByteString val compressor = newCompressor() override def onPush(): Unit = { - val data = compressor.compressAndFlush(grab(in)) + val grabbed = grab(in) + val data = if (autoFlush) + compressor.compressAndFlush(grabbed) + else + compressor.compress(grabbed) + if (data.nonEmpty) push(out, data) else pull(in) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala index b5c4c94a8e..f143528da0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala @@ -47,10 +47,10 @@ object Compression { scaladsl.Compression.inflate(maxBytesPerChunk, nowrap).asJava /** - * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor - * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]] - * coming out of the flow can be fully decompressed without waiting for additional data. This may - * come at a compression performance cost for very small chunks. + * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will + * flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may come at + * a compression performance cost for very small chunks. 
*/ def gzip: Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.gzip.asJava @@ -64,10 +64,21 @@ object Compression { scaladsl.Compression.gzip(level).asJava /** - * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor - * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]] - * coming out of the flow can be fully decompressed without waiting for additional data. This may - * come at a compression performance cost for very small chunks. + * Same as [[gzip]] with a custom level and configurable flush mode. + * + * @param level Compression level (0-9) + * @param autoFlush If true will automatically flush after every single element in the stream. + * + * @since 1.3.0 + */ + def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.gzip(level, autoFlush).asJava + + /** + * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will + * flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may come at + * a compression performance cost for very small chunks. */ def deflate: Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.deflate.asJava @@ -81,4 +92,16 @@ object Compression { def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.deflate(level, nowrap).asJava + /** + * Same as [[deflate]] with configurable level, nowrap and autoFlush. + * + * @param level Compression level (0-9) + * @param nowrap if true then use GZIP compatible compression + * @param autoFlush If true will automatically flush after every single element in the stream. 
+ * + * @since 1.3.0 + */ + def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava + } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala index c8f889dcd2..ceddb7f70f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala @@ -24,12 +24,10 @@ object Compression { final val MaxBytesPerChunkDefault = 64 * 1024 /** - * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor - * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]] - * coming out of the flow can be fully decompressed without waiting for additional data. This may - * come at a compression performance cost for very small chunks. - * - * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will + * flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may come at + * a compression performance cost for very small chunks. */ def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION) @@ -41,6 +39,17 @@ object Compression { def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] = CompressionUtils.compressorFlow(() => new GzipCompressor(level)) + /** + * Same as [[gzip]] with a custom level and configurable flush mode. + * + * @param level Compression level (0-9) + * @param autoFlush If true will automatically flush after every single element in the stream. 
+ * + * @since 1.3.0 + */ + def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() => new GzipCompressor(level), autoFlush) + /** * Creates a Flow that decompresses a gzip-compressed stream of data. * @@ -51,14 +60,12 @@ object Compression { Flow[ByteString].via(new GzipDecompressor(maxBytesPerChunk)).named("gzipDecompress") /** - * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor - * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]] - * coming out of the flow can be fully decompressed without waiting for additional data. This may - * come at a compression performance cost for very small chunks. - * - * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will + * flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may come at + * a compression performance cost for very small chunks. */ - def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false) + def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, nowrap = false) /** * Same as [[deflate]] with configurable level and nowrap * @@ -69,6 +76,18 @@ object Compression { def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] = CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap)) + /** + * Same as [[deflate]] with configurable level, nowrap and autoFlush. + * + * @param level Compression level (0-9) + * @param nowrap if true then use GZIP compatible compression + * @param autoFlush If true will automatically flush after every single element in the stream. 
+ * + * @since 1.3.0 + */ + def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap), autoFlush) + /** * Creates a Flow that decompresses a deflate-compressed stream of data. *