Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
// Sentinel exception presumably thrown by corrupt-input checks to signal that the decoder
// accepted all data; NoStackTrace keeps test failure logs clean — TODO confirm against full file.
case object AllDataAllowed extends Exception with NoStackTrace
// Whether this codec's spec should also verify behavior on corrupted input.
protected def corruptInputCheck: Boolean = true

protected def autoFlush: Boolean = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that need flush interval too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope — having an interval-based flush combined with compression would yield wildly unpredictable results


def extraTests(): Unit = {}

s"The $codecName codec" should {
Expand Down Expand Up @@ -145,6 +147,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
}

"be able to decode chunk-by-chunk (depending on input chunks)" in {
assume(autoFlush)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is only valid when you flush on each element of the stream, as it's testing that each individual element can be decompressed on its own — and that is not the case when you have autoFlush disabled.

val minLength = 100
val maxLength = 1000
val numElements = 1000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.io.compression

import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
import org.apache.pekko.util.ByteString

import java.util.zip.Deflater

/**
 * Reruns the [[DeflateSpec]] suite against a deflate encoder created with
 * `autoFlush = false`, i.e. without flushing the compressor after every
 * stream element. Base-suite tests that require per-element flushing are
 * skipped via the overridden `autoFlush` flag.
 */
class DeflateAutoFlushSpec extends DeflateSpec {
  // Encoder under test: best compression, zlib wrapping (nowrap = false), no per-element flush.
  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
    Compression.deflate(Deflater.BEST_COMPRESSION, nowrap = false, autoFlush = false)
  // Tells the base suite that individual emitted chunks are not independently decompressible.
  override protected val autoFlush: Boolean = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.io.compression

import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
import org.apache.pekko.util.ByteString

import java.util.zip.Deflater

/**
 * Reruns the [[GzipSpec]] suite against a gzip encoder created with
 * `autoFlush = false`, i.e. without flushing the compressor after every
 * stream element. Base-suite tests that require per-element flushing are
 * skipped via the overridden `autoFlush` flag.
 */
class GzipAutoFlushSpec extends GzipSpec {
  // Encoder under test: best compression, no per-element flush.
  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
    Compression.gzip(Deflater.BEST_COMPRESSION, autoFlush = false)
  // Tells the base suite that individual emitted chunks are not independently decompressible.
  override protected val autoFlush: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ import pekko.util.ByteString
/**
* Creates a flow from a compressor constructor.
*/
def compressorFlow(newCompressor: () => Compressor): Flow[ByteString, ByteString, NotUsed] =
def compressorFlow(newCompressor: () => Compressor, autoFlush: Boolean = true)
: Flow[ByteString, ByteString, NotUsed] =
Flow.fromGraph {
new SimpleLinearGraphStage[ByteString] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
val compressor = newCompressor()

override def onPush(): Unit = {
val data = compressor.compressAndFlush(grab(in))
val grabbed = grab(in)
val data = if (autoFlush)
compressor.compressAndFlush(grabbed)
else
compressor.compress(grabbed)

if (data.nonEmpty) push(out, data)
else pull(in)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ object Compression {
scaladsl.Compression.inflate(maxBytesPerChunk, nowrap).asJava

/**
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
* will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
* coming out of the flow can be fully decompressed without waiting for additional data. This may
* come at a compression performance cost for very small chunks.
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
* flush after every single element in stream so that it is guaranteed that every [[pekko.util.ByteString]]
* coming out of the flow can be fully decompressed without waiting for additional data. This may come at
* a compression performance cost for very small chunks.
*/
def gzip: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gzip.asJava
Expand All @@ -64,10 +64,21 @@ object Compression {
scaladsl.Compression.gzip(level).asJava

/**
 * Same as [[gzip]] with a custom compression level and a configurable flush mode.
 *
 * @param level Compression level (0-9, as accepted by `java.util.zip.Deflater`)
 * @param autoFlush If true will automatically flush after every single element in the stream,
 *                  guaranteeing each emitted [[pekko.util.ByteString]] is independently
 *                  decompressible; if false, favors compression ratio over latency.
 *
 * @since 1.3.0
 */
def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gzip(level, autoFlush).asJava

/**
 * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will
 * flush after every single element in the stream so that it is guaranteed that every
 * [[pekko.util.ByteString]] coming out of the flow can be fully decompressed without waiting
 * for additional data. This may come at a compression performance cost for very small chunks.
 */
def deflate: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate.asJava
Expand All @@ -81,4 +92,16 @@ object Compression {
// Delegates to the Scala DSL deflate with configurable level and nowrap
// (nowrap = true produces GZIP-compatible raw deflate output).
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate(level, nowrap).asJava

/**
 * Same as [[deflate]] with configurable level, nowrap and autoFlush.
 *
 * @param level Compression level (0-9)
 * @param nowrap if true then use GZIP compatible compression
 * @param autoFlush If true will automatically flush after every single element in the stream,
 *                  guaranteeing each emitted [[pekko.util.ByteString]] is independently
 *                  decompressible; if false, favors compression ratio over latency.
 *
 * @since 1.3.0
 */
def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ object Compression {
final val MaxBytesPerChunkDefault = 64 * 1024

/**
 * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
 * flush after every single element in the stream so that it is guaranteed that every
 * [[pekko.util.ByteString]] coming out of the flow can be fully decompressed without waiting
 * for additional data. This may come at a compression performance cost for very small chunks.
 *
 * Use the `gzip(level, autoFlush)` overload to disable the per-element flush when
 * compression ratio matters more than per-element decompressibility.
 */
def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION)

Expand All @@ -41,6 +39,17 @@ object Compression {
def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
  // Delegate to the autoFlush-aware overload (compressorFlow defaults autoFlush to true)
  // so the flush behavior is defined in exactly one place.
  gzip(level, autoFlush = true)

/**
 * Same as [[gzip]] with a custom level and configurable flush mode.
 *
 * @param level Compression level (0-9)
 * @param autoFlush If true will automatically flush after every single element in the stream,
 *                  guaranteeing each emitted [[pekko.util.ByteString]] is independently
 *                  decompressible; if false, favors compression ratio over latency.
 *
 * @since 1.3.0
 */
def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() => new GzipCompressor(level), autoFlush)

/**
* Creates a Flow that decompresses a gzip-compressed stream of data.
*
Expand All @@ -51,14 +60,12 @@ object Compression {
Flow[ByteString].via(new GzipDecompressor(maxBytesPerChunk)).named("gzipDecompress")

/**
 * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will
 * flush after every single element in the stream so that it is guaranteed that every
 * [[pekko.util.ByteString]] coming out of the flow can be fully decompressed without waiting
 * for additional data. This may come at a compression performance cost for very small chunks.
 */
// NOTE(review): the two lines below are the pre- and post-change renderings of the same
// definition from the diff view; only the named-parameter form belongs in the file.
def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false)
def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, nowrap = false)

/**
* Same as [[deflate]] with configurable level and nowrap
Expand All @@ -69,6 +76,18 @@ object Compression {
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
  // Delegate to the autoFlush-aware overload (compressorFlow defaults autoFlush to true)
  // so the flush behavior is defined in exactly one place.
  deflate(level, nowrap, autoFlush = true)

/**
 * Same as [[deflate]] with configurable level, nowrap and autoFlush.
 *
 * @param level Compression level (0-9)
 * @param nowrap if true then use GZIP compatible compression
 * @param autoFlush If true will automatically flush after every single element in the stream,
 *                  guaranteeing each emitted [[pekko.util.ByteString]] is independently
 *                  decompressible; if false, favors compression ratio over latency.
 *
 * @since 1.3.0
 */
def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap), autoFlush)

/**
* Creates a Flow that decompresses a deflate-compressed stream of data.
*
Expand Down