From 921d5aa2826dda4b1298782ee411f5c8bd49f6ba Mon Sep 17 00:00:00 2001 From: Jonathan Wu Date: Wed, 23 Jul 2025 15:10:04 -0400 Subject: [PATCH 1/3] Add support for message compression Signed-off-by: Jonathan Wu --- CMakeLists.txt | 4 +- README.md | 14 +- dockerfiles/dev.Dockerfile | 5 +- examples/rmqperftest/rmqperftest_runner.cpp | 2 +- licenses-binary/LICENSE-zstd.txt | 30 +++ licenses/LICENSE-zstd.txt | 30 +++ src/CMakeLists.txt | 3 + src/rmq/CMakeLists.txt | 15 +- src/rmq/rmqa/CMakeLists.txt | 10 +- src/rmq/rmqa/rmqa_compressiontransformer.cpp | 58 +++++ src/rmq/rmqa/rmqa_compressiontransformer.h | 98 +++++++++ .../rmqa/rmqa_compressiontransformerimpl.cpp | 168 +++++++++++++++ .../rmqa/rmqa_compressiontransformerimpl.h | 67 ++++++ src/rmq/rmqa/rmqa_connectionimpl.cpp | 8 +- src/rmq/rmqa/rmqa_consumerimpl.cpp | 69 +++++- src/rmq/rmqa/rmqa_consumerimpl.h | 30 ++- src/rmq/rmqa/rmqa_producer.cpp | 6 + src/rmq/rmqa/rmqa_producer.h | 10 + src/rmq/rmqa/rmqa_producerimpl.cpp | 63 +++++- src/rmq/rmqa/rmqa_producerimpl.h | 11 + src/rmq/rmqa/rmqa_tracingconsumerimpl.cpp | 7 +- src/rmq/rmqa/rmqa_tracingconsumerimpl.h | 9 +- src/rmq/rmqamqp/rmqamqp_channelfactory.h | 2 +- src/rmq/rmqio/rmqio_timer.h | 2 +- src/rmq/rmqp/CMakeLists.txt | 1 + src/rmq/rmqp/rmqp_messagetransformer.cpp | 21 ++ src/rmq/rmqp/rmqp_messagetransformer.h | 61 ++++++ src/rmq/rmqp/rmqp_producer.h | 16 +- src/rmq/rmqt/rmqt_consumerconfig.cpp | 1 + src/rmq/rmqt/rmqt_consumerconfig.h | 24 +++ src/rmq/rmqt/rmqt_credentials.h | 2 +- src/rmq/rmqt/rmqt_endpoint.h | 2 +- src/rmqtestmocks/rmqtestmocks_mockproducer.h | 5 + src/tests/rmqa/CMakeLists.txt | 5 + src/tests/rmqa/rmqa_consumerimpl.t.cpp | 31 ++- src/tests/rmqa/rmqa_messagetransformer.t.cpp | 200 ++++++++++++++++++ .../rmqtestutil/rmqtestutil_testsuite.t.h | 2 +- vcpkg.json | 3 +- 38 files changed, 1035 insertions(+), 60 deletions(-) create mode 100644 licenses-binary/LICENSE-zstd.txt create mode 100644 licenses/LICENSE-zstd.txt create mode 100644 src/rmq/rmqa/rmqa_compressiontransformer.cpp create mode 100644 src/rmq/rmqa/rmqa_compressiontransformer.h create mode 100644 src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp create mode 100644 src/rmq/rmqa/rmqa_compressiontransformerimpl.h create mode 100644 src/rmq/rmqp/rmqp_messagetransformer.cpp create mode 100644 src/rmq/rmqp/rmqp_messagetransformer.h create mode 100644 src/tests/rmqa/rmqa_messagetransformer.t.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 33db7caf..01006667 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,8 @@ -cmake_minimum_required(VERSION 3.25) +cmake_minimum_required(VERSION 3.25) project(rmq LANGUAGES C CXX) +option(ENABLE_COMPRESSION "Enable zstd compression support" ON) + set(CMAKE_EXPORT_COMPILE_COMMANDS 1) enable_testing() diff --git a/README.md b/README.md index dabb87c1..e7c47b0f 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ This library has been built from experience learned while supporting other Rabbi + `rmqcpp` always declares topology when creating consumers & producers, as per RabbitMQ best practices. 4. ✉ Reliable Message Delivery 'on' by default + Publisher confirmations. This ensures clients are aware when messages are owned by RabbitMQ, and avoids messages being silently black holed. - + Consumer acknowledgements. Switching these on manually helps avoid messages being silently dropped during restart/outages, as would be the case with 'autoack'. + + Consumer acknowledgements. Switching these on manually helps avoid messages being silently dropped during restart/outages, as would be the case with 'autoack'. + Durable queues and persistent delivery mode ensure messages always persist during broker restarts and total datacenter shutdowns. + [Mandatory flag](https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory) is defaulted to 'true' for all messages to ensure none are silently dropped due to missed bindings. + All of the above properties are used by default. Publisher confirms and consumer acknowledgements are required. @@ -58,7 +58,7 @@ graph TD; rmqp-->rmqt; ``` -**Library** | **Purpose** | **Examples** +**Library** | **Purpose** | **Examples** :--- | :---- | :---- [rmqp](src/rmq/rmqp) | RabbitMQ library interface (protocol) | Interfaces used to allow testing/mocking of `rmqcpp` applications [rmqa](src/rmq/rmqa) | RabbitMQ Library interface implementation | The main concrete objects used by applications @@ -70,7 +70,7 @@ graph TD; ## Quick Start The quickest way to get started is to take a look at our integration tests and sample 'hello world' program, which is possible by following the Docker [Build](#building) steps and then: from the interactive shell window running `./build/examples/helloworld/rmqhelloworld_producer` - + ## Usage @@ -195,7 +195,7 @@ consumer->cancelAndDrain(); ## Documentation Doxygen generated API documentation can be found [here](https://bloomberg.github.io/rmqcpp/index.html) - + ## Building ### Prerequisites @@ -206,6 +206,8 @@ Doxygen generated API documentation can be found [here](https://bloomberg.github There are build configuration options which can be specified using the environment variable `CMAKE_PRESET` (choose from configurations in `CMakePresets.json`) - eg. `export CMAKE_PRESET=macos-arm64-vcpkg`. +`zstd` compression support is enabled by default, and requires the `zstd` library to be installed. To disable this, pass the `-DENABLE_COMPRESSION=OFF` option to cmake. + ### Build Steps Once the prerequisites are configured: @@ -221,8 +223,8 @@ We also provide Dockerfiles for building and running this in an isolated environment. If you don't wish to get vcpkg set up on your build machine, this can be an alternative quick way to get started. -1. `make docker-setup` - Build required base images and setup vcpkg, prerequisite for running commands below -2. `make docker-build` - Build rmqcpp in the container using vcpkg +1. `make docker-setup` - Build required base images and setup vcpkg, prerequisite for running commands below +2. `make docker-build` - Build rmqcpp in the container using vcpkg 3. `make docker-unit` - Build rmqcpp and run unit tests in the container 4. `make docker-shell` - Get an interactive shell within the build environment container diff --git a/dockerfiles/dev.Dockerfile b/dockerfiles/dev.Dockerfile index d16354aa..b96befee 100644 --- a/dockerfiles/dev.Dockerfile +++ b/dockerfiles/dev.Dockerfile @@ -5,11 +5,12 @@ RUN apt-get update && apt-get install -y \ clang-format \ cmake \ curl \ - gcc \ + gcc \ gdb \ git \ libboost-dev \ libssl-dev \ + libzstd-dev \ net-tools \ netcat-traditional \ ninja-build \ @@ -24,7 +25,7 @@ RUN apt-get update && apt-get install -y \ zip \ && rm -rf /var/lib/apt/lists/* -ENV VCPKG_FORCE_SYSTEM_BINARIES=1 +ENV VCPKG_FORCE_SYSTEM_BINARIES=1 # clone and install vcpkg RUN git clone https://github.com/Microsoft/vcpkg.git /build/vcpkg && \ diff --git a/examples/rmqperftest/rmqperftest_runner.cpp b/examples/rmqperftest/rmqperftest_runner.cpp index 9ef89ddc..951e92f0 100644 --- a/examples/rmqperftest/rmqperftest_runner.cpp +++ b/examples/rmqperftest/rmqperftest_runner.cpp @@ -86,7 +86,7 @@ class ConfirmCallback { void operator()(const rmqt::Message&, const bsl::string&, - const rmqt::ConfirmResponse&){}; + const rmqt::ConfirmResponse&) {}; }; bsl::string_view queueNameOrRoutingKey(bsl::string_view routingKey, diff --git a/licenses-binary/LICENSE-zstd.txt b/licenses-binary/LICENSE-zstd.txt new file mode 100644 index 00000000..75800288 --- /dev/null +++ b/licenses-binary/LICENSE-zstd.txt @@ -0,0 +1,30 @@ +BSD License + +For Zstandard software + +Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook, nor Meta, nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/licenses/LICENSE-zstd.txt b/licenses/LICENSE-zstd.txt new file mode 100644 index 00000000..75800288 --- /dev/null +++ b/licenses/LICENSE-zstd.txt @@ -0,0 +1,30 @@ +BSD License + +For Zstandard software + +Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook, nor Meta, nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f0518207..262b14d7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,8 @@ find_package(Threads REQUIRED) # CMake 3.26-rc3 Bug https://gitlab.kitware.com/cmake/cmake/-/issues/24505 find_package(ZLIB REQUIRED) +if (ENABLE_COMPRESSION) + find_package(zstd CONFIG REQUIRED) +endif() find_package(Boost REQUIRED) set(OPENSSL_USE_STATIC_LIBS TRUE) find_package(OpenSSL REQUIRED) diff --git a/src/rmq/CMakeLists.txt b/src/rmq/CMakeLists.txt index e3ef66ab..4407d7fe 100644 --- a/src/rmq/CMakeLists.txt +++ b/src/rmq/CMakeLists.txt @@ -15,6 +15,9 @@ add_library(rmq STATIC ) target_link_libraries(rmq PUBLIC bsl bdl bal ZLIB::ZLIB OpenSSL::Crypto OpenSSL::SSL) +if (ENABLE_COMPRESSION) + target_link_libraries(rmq PUBLIC zstd::libzstd_static) +endif() get_target_property(OPENSSL_TARGET_TYPE OpenSSL::SSL TYPE) if(OPENSSL_CRYPTO_LIBRARY MATCHES "\\.a$") @@ -42,19 +45,19 @@ install( COMPONENT librmq-dev ) -install(EXPORT rmqcppTargets - FILE rmqcppTargets.cmake - DESTINATION share/rmqcpp +install(EXPORT rmqcppTargets + FILE rmqcppTargets.cmake + DESTINATION share/rmqcpp NAMESPACE rmqcpp::) include(CMakePackageConfigHelpers) -configure_package_config_file(${CMAKE_CURRENT_SOURCE_DIR}/Config.cmake.in +configure_package_config_file(${CMAKE_CURRENT_SOURCE_DIR}/Config.cmake.in "${CMAKE_CURRENT_BINARY_DIR}/rmqcppConfig.cmake" - INSTALL_DESTINATION share/rmqcpp) + INSTALL_DESTINATION share/rmqcpp) install(FILES "${CMAKE_CURRENT_BINARY_DIR}/rmqcppConfig.cmake" DESTINATION share/rmqcpp) # Emit some metadata required internally -set(RMQ_PC_DEP_NAMES bsl bdl bal openssl) +set(RMQ_PC_DEP_NAMES bsl bdl bal openssl zstd) find_package(GenBDEMetadata QUIET) if (GenBDEMetadata_FOUND) gen_bde_metadata(PACKAGE_GROUP rmq INSTALL_COMPONENT librmq-dev DEPS "${RMQ_PC_DEP_NAMES}") diff --git a/src/rmq/rmqa/CMakeLists.txt b/src/rmq/rmqa/CMakeLists.txt index 26971d2c..bf61b103 100644 --- a/src/rmq/rmqa/CMakeLists.txt +++ b/src/rmq/rmqa/CMakeLists.txt @@ -1,4 +1,6 @@ -add_library(rmqa OBJECT +add_library(rmqa OBJECT + rmqa_compressiontransformer.cpp + rmqa_compressiontransformerimpl.cpp rmqa_consumer.cpp rmqa_consumerimpl.cpp rmqa_connectionimpl.cpp @@ -25,12 +27,16 @@ target_link_libraries(rmqa PUBLIC bsl bdl bal - rmqt + rmqt rmqp rmqamqp rmqio rmqamqpt ) +if (ENABLE_COMPRESSION) + target_link_libraries(rmqa PUBLIC zstd::libzstd_static) + target_compile_definitions(rmqa PRIVATE RMQCPP_ENABLE_COMPRESSION) +endif() if( "${CMAKE_CXX_COMPILER_ID}" STREQUAL "SunPro" ) # _RWSTD_ALLOCATOR tells the solaris header to define a std::allocator # which conforms better to the C++ standard, which is expected by Boost. Without diff --git a/src/rmq/rmqa/rmqa_compressiontransformer.cpp b/src/rmq/rmqa/rmqa_compressiontransformer.cpp new file mode 100644 index 00000000..7e830f70 --- /dev/null +++ b/src/rmq/rmqa/rmqa_compressiontransformer.cpp @@ -0,0 +1,58 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace BloombergLP { +namespace rmqa { + +rmqt::Result CompressionTransformer::create() +{ + return rmqt::Result( + bsl::make_shared()); +} + +CompressionTransformer::CompressionTransformer( + bslma::ManagedPtr& impl) +: d_impl(impl) +{ +} + +CompressionTransformer::~CompressionTransformer() {} + +rmqt::Result +CompressionTransformer::transform(bsl::shared_ptr >& data, + rmqt::Properties& props) +{ + return d_impl->transform(data, props); +} + +rmqt::Result<> CompressionTransformer::inverseTransform( + bsl::shared_ptr >& data, + rmqt::Properties& props) +{ + return d_impl->inverseTransform(data, props); +} + +bsl::string CompressionTransformer::name() const { return d_impl->name(); } + +} // namespace rmqa +} // namespace BloombergLP diff --git a/src/rmq/rmqa/rmqa_compressiontransformer.h b/src/rmq/rmqa/rmqa_compressiontransformer.h new file mode 100644 index 00000000..862517bb --- /dev/null +++ b/src/rmq/rmqa/rmqa_compressiontransformer.h @@ -0,0 +1,98 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_RMQA_COMPRESSIONTRANSFORMER +#define INCLUDED_RMQA_COMPRESSIONTRANSFORMER + +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace BloombergLP { +namespace rmqa { + +/// \class CompressionTransformer +/// \brief A message transformer that compresses messages using zstd +/// +/// CompressionTransformers are typically bound to Producers and +/// Consumers, to process messages before sending and after receiving them. +class CompressionTransformer { + public: + // CREATORS + /// Create an instance of a CompressionTransformer that can be used + /// by a single thread to manage message compression. + /// The function may fail if insufficient memory is available to allocate + /// a compression context. + static rmqt::Result create(); + + explicit CompressionTransformer( + bslma::ManagedPtr& impl); + + ~CompressionTransformer(); + + // MANIPULATORS + /// Transform the given `data` and `props` in-place into compressed form, + /// if possible. The resulting data will never be larger than the original + /// data, and the properties will be updated to indicate that the message + /// has been compressed. + /// + /// \param data The data to be compressed + /// \param props The message properties + /// + /// \return true Returned if the data was successfully compressed + /// \return false Returned if the data is deemed to be not compressible. + /// In this case, the data and properties will remain + /// unchanged + /// \return error Returned if the compression failed for any other + /// reason. The data and properties will remain + /// unchanged + rmqt::Result transform(bsl::shared_ptr >& data, + rmqt::Properties& props); + + /// Decompresses the given `data` and `props` in-place, if they were + /// compressed. If the data is not compressed, it will remain unchanged. + /// + /// \param data The data to be compressed + /// \param props The message properties + /// + /// \return true Returned if the data was successfully decompressed + /// \return false Returned if the decompression failed + rmqt::Result<> + inverseTransform(bsl::shared_ptr >& data, + rmqt::Properties& props); + + /// Returns the header name used by this transformer to indicate + /// that the message has been compressed. + bsl::string name() const; + + private: + CompressionTransformer(const CompressionTransformer&) BSLS_KEYWORD_DELETED; + CompressionTransformer& + operator=(const CompressionTransformer&) BSLS_KEYWORD_DELETED; + + private: + bslma::ManagedPtr d_impl; +}; + +} // namespace rmqa +} // namespace BloombergLP + +#endif diff --git a/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp b/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp new file mode 100644 index 00000000..f8953e18 --- /dev/null +++ b/src/rmq/rmqa/rmqa_compressiontransformerimpl.cpp @@ -0,0 +1,168 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifdef RMQCPP_ENABLE_COMPRESSION + +#include + +#include +#include +#include + +#include + +#include +#include + +#include +#include +#include + +namespace BloombergLP { +namespace rmqa { + +namespace { +BALL_LOG_SET_CLASS_CATEGORY("RMQA.COMPRESSIONTRANSFORMERIMPL"); +} + +CompressionTransformerImpl::CompressionTransformerImpl() +: zctx(ZSTD_createCCtx()) +{ + if (!zctx) { + BALL_LOG_WARN << "Failed to allocate ZSTD context. Compression will " + "not be enabled."; + } +} + +CompressionTransformerImpl::~CompressionTransformerImpl() +{ + ZSTD_freeCCtx(zctx); + zctx = NULL; +} + +rmqt::Result CompressionTransformerImpl::transform( + bsl::shared_ptr >& data, + rmqt::Properties& props) +{ + if (!zctx || data->size() < 8 * 1024) { + // Do not compress small messages + return rmqt::Result(bsl::make_shared(false)); + } + + // Allocate space for compressed data + bsl::shared_ptr > compressedData = + bsl::make_shared >(); + compressedData->resize(data->size()); + + // Perform the compression + size_t compressedSize = ZSTD_compressCCtx(zctx, + compressedData->data(), + compressedData->size(), + data->data(), + data->size(), + 1); + + if (ZSTD_isError(compressedSize)) { + ZSTD_ErrorCode errorCode = ZSTD_getErrorCode(compressedSize); + if (errorCode == ZSTD_error_dstSize_tooSmall) { + // The data is not compressible + return rmqt::Result(bsl::make_shared(false)); + } + return rmqt::Result(ZSTD_getErrorName(errorCode)); + } + + // Update headers + if (!props.headers) { + props.headers = bsl::make_shared(); + } + props.headers->emplace("sdk.transform.compression.alg", + rmqt::FieldValue(bsl::string("zstd"))); + props.headers->emplace( + "sdk.transform.compression.size", + rmqt::FieldValue(static_cast(data->size()))); + + // Destroy the old data and replace it with compressed data + data = compressedData; + data->resize(compressedSize); + return rmqt::Result(bsl::make_shared(true)); +} + +rmqt::Result<> CompressionTransformerImpl::decompressZstd( + bsl::shared_ptr >& data, + size_t originalSize) +{ + bsl::shared_ptr > decompressedData = + bsl::make_shared >(originalSize); + size_t decompressedSize = ZSTD_decompress( + decompressedData->data(), originalSize, data->data(), data->size()); + if (ZSTD_isError(decompressedSize) || + (size_t)decompressedSize != originalSize) { + BALL_LOG_ERROR << "Decompression failed: " + << ZSTD_getErrorName( + ZSTD_getErrorCode(decompressedSize)); + return rmqt::Result<>( + bsl::string("Decompression failed: ") + + ZSTD_getErrorName(ZSTD_getErrorCode(decompressedSize))); + } + + // Replace the data with decompressed data + data = decompressedData; + return rmqt::Result<>(); +} + +rmqt::Result<> CompressionTransformerImpl::inverseTransform( + bsl::shared_ptr >& data, + rmqt::Properties& props) +{ + if (!props.headers) { + return rmqt::Result<>("Malformed message"); + } + int64_t originalSize = + (*props.headers)["sdk.transform.compression.size"].the(); + if (originalSize <= 0) { + BALL_LOG_ERROR << "Invalid original size for decompression: " + << originalSize; + return rmqt::Result<>("Invalid original size for decompression: " + + bsl::to_string(originalSize)); + } + bsl::string compressionAlg = + (*props.headers)["sdk.transform.compression.alg"].the(); + + rmqt::Result<> result; + if (compressionAlg == "zstd") { + result = decompressZstd(data, originalSize); + } + else { + // Unsupported compression algorithm + BALL_LOG_ERROR << "Unsupported compression algorithm: " + << compressionAlg; + return rmqt::Result<>("Unsupported compression algorithm: " + + compressionAlg); + } + + if (!result) { + return result; // Propagate error from decompression + } + + // Remove compression headers + props.headers->erase("sdk.transform.compression.alg"); + props.headers->erase("sdk.transform.compression.size"); + return rmqt::Result<>(); +} + +} // namespace rmqa +} // namespace BloombergLP + +#endif // RMQCPP_ENABLE_COMPRESSION diff --git a/src/rmq/rmqa/rmqa_compressiontransformerimpl.h b/src/rmq/rmqa/rmqa_compressiontransformerimpl.h new file mode 100644 index 00000000..0de6698f --- /dev/null +++ b/src/rmq/rmqa/rmqa_compressiontransformerimpl.h @@ -0,0 +1,67 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_RMQA_COMPRESSIONTRANSFORMERIMPL +#define INCLUDED_RMQA_COMPRESSIONTRANSFORMERIMPL + +#include +#include +#include + +#include +#include +#include +#include + +struct ZSTD_CCtx_s; +typedef struct ZSTD_CCtx_s ZSTD_CCtx; + +namespace BloombergLP { +namespace rmqa { + +class CompressionTransformerImpl : public rmqp::MessageTransformer { + public: + CompressionTransformerImpl(); + + ~CompressionTransformerImpl() BSLS_KEYWORD_OVERRIDE; + + rmqt::Result transform(bsl::shared_ptr >& data, + rmqt::Properties& props) BSLS_KEYWORD_OVERRIDE; + + rmqt::Result<> + inverseTransform(bsl::shared_ptr >& data, + rmqt::Properties& props) BSLS_KEYWORD_OVERRIDE; + + bsl::string name() const BSLS_KEYWORD_OVERRIDE + { + return bsl::string("compression"); + } + + private: + ZSTD_CCtx* zctx; + + rmqt::Result<> decompressZstd(bsl::shared_ptr >& data, + size_t originalSize); + + CompressionTransformerImpl(const CompressionTransformerImpl&) + BSLS_KEYWORD_DELETED; + CompressionTransformerImpl& + operator=(const CompressionTransformerImpl&) BSLS_KEYWORD_DELETED; +}; + +} // namespace rmqa +} // namespace BloombergLP + +#endif diff --git a/src/rmq/rmqa/rmqa_connectionimpl.cpp b/src/rmq/rmqa/rmqa_connectionimpl.cpp index e663c01a..d9e203aa 100644 --- a/src/rmq/rmqa/rmqa_connectionimpl.cpp +++ b/src/rmq/rmqa/rmqa_connectionimpl.cpp @@ -28,6 +28,8 @@ #include #include +#include + #include #include #include @@ -41,6 +43,7 @@ #include #include #include +#include namespace BloombergLP { namespace rmqa { @@ -75,6 +78,7 @@ rmqt::Result setupConsumer( bdlmt::ThreadPool& threadPool, const bsl::shared_ptr& ackQueue, const bsl::shared_ptr& consumerFactory, + const bsl::vector >& transformers, const rmqt::Result& receiveChannel) { if (receiveChannel) { @@ -85,7 +89,8 @@ rmqt::Result setupConsumer( consumerTag, bsl::ref(threadPool), bsl::ref(eventLoop), - ackQueue)); + ackQueue, + transformers)); rmqt::Result<> result = consumer->start(); return result ? rmqt::Result(consumer) : rmqt::Result(result.error(), @@ -345,6 +350,7 @@ rmqt::Future ConnectionImpl::createConsumerAsync( : bsl::ref(d_threadPool), ackQueue, d_consumerFactory, + consumerConfig.transformers(), bdlf::PlaceHolders::_1)); } diff --git a/src/rmq/rmqa/rmqa_consumerimpl.cpp b/src/rmq/rmqa/rmqa_consumerimpl.cpp index 698ddd9b..6ccb7e89 100644 --- a/src/rmq/rmqa/rmqa_consumerimpl.cpp +++ b/src/rmq/rmqa/rmqa_consumerimpl.cpp @@ -20,18 +20,21 @@ #include #include #include +#include #include +#include #include #include #include #include -#include -#include -#include +#include #include #include +#include +#include +#include namespace BloombergLP { namespace rmqa { @@ -49,7 +52,9 @@ bsl::shared_ptr ConsumerImpl::Factory::create( const bsl::string& consumerTag, bdlmt::ThreadPool& threadPool, rmqio::EventLoop& eventLoop, - const bsl::shared_ptr& ackQueue) const + const bsl::shared_ptr& ackQueue, + const bsl::vector >& transformers) + const { return bsl::shared_ptr( new ConsumerImpl(channel, @@ -59,7 +64,8 @@ bsl::shared_ptr ConsumerImpl::Factory::create( threadPool, eventLoop, ackQueue, - bsl::make_shared())); + bsl::make_shared(), + transformers)); } ConsumerImpl::ConsumerImpl( @@ -70,7 +76,8 @@ ConsumerImpl::ConsumerImpl( bdlmt::ThreadPool& threadPool, rmqio::EventLoop& eventLoop, const bsl::shared_ptr& ackQueue, - const bsl::shared_ptr& guardFactory) + const bsl::shared_ptr& guardFactory, + const bsl::vector >& transformers) : d_consumerTag(consumerTag) , d_queue(queue) , d_onMessage(onMessage) @@ -81,6 +88,7 @@ ConsumerImpl::ConsumerImpl( , d_ackMessageMutex() , d_channel(channel) , d_guardFactory(guardFactory) +, d_transformers(transformers) , d_onNewAckBatch( bdlf::BindUtil::bind(&rmqamqp::ReceiveChannel::consumeAckBatchFromQueue, d_channel)) @@ -182,6 +190,42 @@ void ConsumerImpl::ackMessage(const rmqt::ConsumerAck& ack) } } +bool ConsumerImpl::unpackTransformations(rmqt::Message& dstMessage, + const rmqt::Message& srcMessage) +{ + // Unpack source message + bsl::shared_ptr > rawData = + bsl::make_shared >(srcMessage.payload(), + srcMessage.payload() + + srcMessage.payloadSize()); + rmqt::Properties properties = srcMessage.properties(); + + // Undo all transformations + for (bsl::vector< + bsl::shared_ptr >::reverse_iterator it = + d_transformers.rbegin(); + it != d_transformers.rend(); + ++it) { + bsl::string headerName = "sdk.transform." + (*it)->name(); + if (!properties.headers || + properties.headers->find(headerName) == properties.headers->end()) { + BALL_LOG_DEBUG << "No transformation header found for " + << (*it)->name(); + continue; // No transformation header, skip + } + rmqt::Result<> r = (*it)->inverseTransform(rawData, properties); + if (!r) { + BALL_LOG_ERROR << "Inverse transformation failed: " << r.error(); + return false; + } + properties.headers->erase(headerName); // Remove transformation header + } + + // Pack into destination message + dstMessage = rmqt::Message(rawData, properties); + return true; +} + void ConsumerImpl::threadPoolHandleMessage( const bsl::weak_ptr& consumerWeakPtr, const rmqt::Message& message, @@ -195,11 +239,22 @@ void ConsumerImpl::threadPoolHandleMessage( return; } + rmqt::Message untransformedMsg; + if (consumer->d_transformers.size() > 0) { + if (!consumer->unpackTransformations(untransformedMsg, message)) { + BALL_LOG_ERROR << "Failed to undo transformations to message " + << message.guid(); + return; + } + } + const rmqt::Message& realMsg = + consumer->d_transformers.size() > 0 ? untransformedMsg : message; + using bdlf::PlaceHolders::_1; bslma::ManagedPtr guard( consumer->d_guardFactory->create( - message, envelope, consumer->d_messageGuardCb, consumer.ptr())); + realMsg, envelope, consumer->d_messageGuardCb, consumer.ptr())); BALL_LOG_DEBUG << "Delivering: " << *guard << " to client"; diff --git a/src/rmq/rmqa/rmqa_consumerimpl.h b/src/rmq/rmqa/rmqa_consumerimpl.h index 5fe7249a..ee42dbfc 100644 --- a/src/rmq/rmqa/rmqa_consumerimpl.h +++ b/src/rmq/rmqa/rmqa_consumerimpl.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include #include #include +#include //@PURPOSE: Provide a RabbitMQ Async Consumer API // @@ -62,19 +64,23 @@ class ConsumerImpl : public rmqp::Consumer, const bsl::string& consumerTag, bdlmt::ThreadPool& threadPool, rmqio::EventLoop& eventLoop, - const bsl::shared_ptr& ackQueue) const; + const bsl::shared_ptr& ackQueue, + const bsl::vector >& + transformers) const; }; // CREATORS - ConsumerImpl(const bsl::shared_ptr& channel, - rmqt::QueueHandle queue, - const bsl::shared_ptr& onMessage, - const bsl::string& consumerTag, - bdlmt::ThreadPool& threadPool, - rmqio::EventLoop& eventLoop, - const bsl::shared_ptr& ackQueue, - const bsl::shared_ptr& - messageGuardFactory); + ConsumerImpl( + const bsl::shared_ptr& channel, + rmqt::QueueHandle queue, + const bsl::shared_ptr& onMessage, + const bsl::string& consumerTag, + bdlmt::ThreadPool& threadPool, + rmqio::EventLoop& eventLoop, + const bsl::shared_ptr& ackQueue, + const bsl::shared_ptr& messageGuardFactory, + const bsl::vector >& + transformers); /// Destructor stops the consumer ~ConsumerImpl(); @@ -100,6 +106,9 @@ class ConsumerImpl : public rmqp::Consumer, void ackMessage(const rmqt::ConsumerAck& ack); + bool unpackTransformations(rmqt::Message& dstMessage, + const rmqt::Message& srcMessage); + /// Called from the event loop thread with a received message /// This class dispatches this call onto a threadpool thread static void @@ -128,6 +137,7 @@ class ConsumerImpl : public rmqp::Consumer, bsl::shared_ptr d_channel; bsl::shared_ptr d_guardFactory; + bsl::vector > d_transformers; bsl::function d_onNewAckBatch; bsl::function d_messageGuardCb; diff --git a/src/rmq/rmqa/rmqa_producer.cpp b/src/rmq/rmqa/rmqa_producer.cpp index 7c60190c..0ba18bc1 100644 --- a/src/rmq/rmqa/rmqa_producer.cpp +++ b/src/rmq/rmqa/rmqa_producer.cpp @@ -25,6 +25,12 @@ Producer::Producer(bslma::ManagedPtr& impl) Producer::~Producer() {} +void Producer::addTransformer( + const bsl::shared_ptr& transformer) +{ + d_impl->addTransformer(transformer); +} + rmqp::Producer::SendStatus Producer::send(const rmqt::Message& message, const bsl::string& routingKey, diff --git a/src/rmq/rmqa/rmqa_producer.h b/src/rmq/rmqa/rmqa_producer.h index e50569dc..baf00f84 100644 --- a/src/rmq/rmqa/rmqa_producer.h +++ b/src/rmq/rmqa/rmqa_producer.h @@ -18,6 +18,7 @@ #define INCLUDED_RMQA_PRODUCER #include +#include #include #include #include @@ -25,6 +26,7 @@ #include #include +#include #include #include #include @@ -88,6 +90,14 @@ class Producer { const rmqp::Producer::ConfirmationCallback& confirmCallback, const bsls::TimeInterval& timeout = bsls::TimeInterval()); + /// Registers the tranformation function to be applied to all outgoing + /// messages before being sent to the broker. + /// + /// \param transformer The transformation function to be applied to all + /// messages processed by this producer + void addTransformer( + const bsl::shared_ptr& transformer); + /// Send a message with the given `routingKey` to the exchange this /// Producer targets, with the specified mandatory flag. /// Use the simpler Producer::send() unless you understand & intend to set diff --git a/src/rmq/rmqa/rmqa_producerimpl.cpp b/src/rmq/rmqa/rmqa_producerimpl.cpp index 239ba224..878a888b 100644 --- a/src/rmq/rmqa/rmqa_producerimpl.cpp +++ b/src/rmq/rmqa/rmqa_producerimpl.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include namespace BloombergLP { namespace rmqa { @@ -163,6 +165,12 @@ bool ProducerImpl::registerUniqueCallback( return true; } +void ProducerImpl::addTransformer( + const bsl::shared_ptr& transformer) +{ + d_transformers.push_back(transformer); +} + rmqp::Producer::SendStatus ProducerImpl::send(const rmqt::Message& message, const bsl::string& routingKey, @@ -189,6 +197,46 @@ ProducerImpl::send(const rmqt::Message& message, message, routingKey, mandatoryFlag, confirmCallback, timeout); } +bool ProducerImpl::applyTransformations(rmqt::Message& dstMessage, + const rmqt::Message& srcMessage) +{ + // Unpack source message + bsl::shared_ptr > rawData = + bsl::make_shared >( + srcMessage.payload(), + srcMessage.payload() + srcMessage.payloadSize()); + rmqt::Properties props = srcMessage.properties(); + + // Apply all transformations + for (bsl::vector >::iterator it = + d_transformers.begin(); + it != d_transformers.end(); + ++it) { + rmqt::Result r = (*it)->transform(rawData, props); + if (!r) { + BALL_LOG_ERROR << "Transformation failed"; + return false; + } + else if (*r.value()) { + if (!props.headers + ->emplace("sdk.transform." + (*it)->name(), + rmqt::FieldValue(bsl::string("ok"))) + .second) { + BALL_LOG_ERROR << "Reserved header 'sdk.transform." + << (*it)->name() << "' already exists"; + return false; + } + } + else { + BALL_LOG_DEBUG << "Transformation ignored"; + } + } + + // Pack into destination message + dstMessage = rmqt::Message(rawData, props); + return true; +} + rmqp::Producer::SendStatus ProducerImpl::sendImpl( const rmqt::Message& message, const bsl::string& routingKey, @@ -196,9 +244,20 @@ rmqp::Producer::SendStatus ProducerImpl::sendImpl( const rmqp::Producer::ConfirmationCallback& confirmCallback, const bsls::TimeInterval& timeout) { + rmqt::Message transformedMsg; + if (d_transformers.size() > 0) { + if (!applyTransformations(transformedMsg, message)) { + BALL_LOG_ERROR << "Failed to apply transformations to message " + << message.guid(); + return rmqp::Producer::TRANSFORM_ERROR; + } + } + const rmqt::Message& realMsg = + d_transformers.size() > 0 ? transformedMsg : message; + BALL_LOG_TRACE << "Waiting on send(exchange) outstanding message limit for message " - << message; + << realMsg; if (timeout.totalNanoseconds()) { if (d_sharedState->outstandingMessagesCap.timedWait( @@ -210,7 +269,7 @@ rmqp::Producer::SendStatus ProducerImpl::sendImpl( d_sharedState->outstandingMessagesCap.wait(); } - return doSend(message, routingKey, mandatoryFlag, confirmCallback); + return doSend(realMsg, routingKey, mandatoryFlag, confirmCallback); } rmqp::Producer::SendStatus ProducerImpl::trySend( diff --git a/src/rmq/rmqa/rmqa_producerimpl.h b/src/rmq/rmqa/rmqa_producerimpl.h index 6a84e477..a789e978 100644 --- a/src/rmq/rmqa/rmqa_producerimpl.h +++ b/src/rmq/rmqa/rmqa_producerimpl.h @@ -16,6 +16,7 @@ #ifndef INCLUDED_RMQA_PRODUCERIMPL #define INCLUDED_RMQA_PRODUCERIMPL +#include #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include //@PURPOSE: Implements the rmqa::Producer interface // @@ -70,6 +72,10 @@ class ProducerImpl : public rmqp::Producer { ~ProducerImpl() BSLS_KEYWORD_OVERRIDE; + void + addTransformer(const bsl::shared_ptr& transformer) + BSLS_KEYWORD_OVERRIDE; + SendStatus send(const rmqt::Message& message, const bsl::string& routingKey, const rmqp::Producer::ConfirmationCallback& confirmCallback, @@ -142,12 +148,17 @@ class ProducerImpl : public rmqp::Producer { const rmqp::Producer::ConfirmationCallback& confirmCallback, const bsls::TimeInterval& timeout); + bool applyTransformations(rmqt::Message& dstMessage, + const rmqt::Message& srcMessage); + rmqio::EventLoop& d_eventLoop; bsl::shared_ptr d_channel; bsl::shared_ptr d_sharedState; + bsl::vector > d_transformers; + }; // class Producer } // namespace rmqa diff --git a/src/rmq/rmqa/rmqa_tracingconsumerimpl.cpp b/src/rmq/rmqa/rmqa_tracingconsumerimpl.cpp index d531c322..109f7fd6 100644 --- a/src/rmq/rmqa/rmqa_tracingconsumerimpl.cpp +++ b/src/rmq/rmqa/rmqa_tracingconsumerimpl.cpp @@ -36,7 +36,9 @@ bsl::shared_ptr TracingConsumerImpl::Factory::create( const bsl::string& consumerTag, bdlmt::ThreadPool& threadPool, rmqio::EventLoop& eventLoop, - const bsl::shared_ptr& ackQueue) const + const bsl::shared_ptr& ackQueue, + const bsl::vector >& transformers) + const { return bsl::shared_ptr( new ConsumerImpl(channel, @@ -47,7 +49,8 @@ bsl::shared_ptr TracingConsumerImpl::Factory::create( eventLoop, ackQueue, bsl::make_shared( - queue, d_endpoint, d_tracing))); + queue, d_endpoint, d_tracing), + transformers)); } } // namespace rmqa diff --git a/src/rmq/rmqa/rmqa_tracingconsumerimpl.h b/src/rmq/rmqa/rmqa_tracingconsumerimpl.h index b4828d4e..f474abd1 100644 --- a/src/rmq/rmqa/rmqa_tracingconsumerimpl.h +++ b/src/rmq/rmqa/rmqa_tracingconsumerimpl.h @@ -19,6 +19,7 @@ #include #include +#include #include //@PURPOSE: Provide a RabbitMQ Async Tracing Consumer API @@ -36,7 +37,8 @@ class ReceiveChannel; } namespace rmqp { class ConsumerTracing; -} +class MessageTransformer; +} // namespace rmqp namespace rmqa { @@ -54,8 +56,9 @@ class TracingConsumerImpl { const bsl::string& consumerTag, bdlmt::ThreadPool& threadPool, rmqio::EventLoop& eventLoop, - const bsl::shared_ptr& ackQueue) const - BSLS_KEYWORD_OVERRIDE; + const bsl::shared_ptr& ackQueue, + const bsl::vector >& + transformers) const BSLS_KEYWORD_OVERRIDE; private: bsl::shared_ptr d_endpoint; diff --git a/src/rmq/rmqamqp/rmqamqp_channelfactory.h b/src/rmq/rmqamqp/rmqamqp_channelfactory.h index 49a2358b..d0ea3ed7 100644 --- a/src/rmq/rmqamqp/rmqamqp_channelfactory.h +++ b/src/rmq/rmqamqp/rmqamqp_channelfactory.h @@ -36,7 +36,7 @@ namespace rmqamqp { class ChannelFactory { public: - virtual ~ChannelFactory(){}; + virtual ~ChannelFactory() {}; virtual bsl::shared_ptr createReceiveChannel( const rmqt::Topology& topology, diff --git a/src/rmq/rmqio/rmqio_timer.h b/src/rmq/rmqio/rmqio_timer.h index f31e7952..327db63c 100644 --- a/src/rmq/rmqio/rmqio_timer.h +++ b/src/rmq/rmqio/rmqio_timer.h @@ -72,7 +72,7 @@ class Timer { class TimerFactory { public: - virtual ~TimerFactory(){}; + virtual ~TimerFactory() {}; /// Creates a timer and initializes its timeout. The timer should be /// started by calling start(). diff --git a/src/rmq/rmqp/CMakeLists.txt b/src/rmq/rmqp/CMakeLists.txt index 0ccdba98..8d858ab5 100644 --- a/src/rmq/rmqp/CMakeLists.txt +++ b/src/rmq/rmqp/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(rmqp OBJECT rmqp_consumertracing.cpp rmqp_connection.cpp rmqp_messageguard.cpp + rmqp_messagetransformer.cpp rmqp_metricpublisher.cpp rmqp_producer.cpp rmqp_producertracing.cpp diff --git a/src/rmq/rmqp/rmqp_messagetransformer.cpp b/src/rmq/rmqp/rmqp_messagetransformer.cpp new file mode 100644 index 00000000..49cf857e --- /dev/null +++ b/src/rmq/rmqp/rmqp_messagetransformer.cpp @@ -0,0 +1,21 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace BloombergLP { +namespace rmqp { +} // namespace rmqp +} // namespace BloombergLP diff --git a/src/rmq/rmqp/rmqp_messagetransformer.h b/src/rmq/rmqp/rmqp_messagetransformer.h new file mode 100644 index 00000000..0ddddf5e --- /dev/null +++ b/src/rmq/rmqp/rmqp_messagetransformer.h @@ -0,0 +1,61 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_RMQP_MESSAGETRANSFORMER +#define INCLUDED_RMQP_MESSAGETRANSFORMER + +#include +#include + +#include +#include +#include + +namespace BloombergLP { +namespace rmqp { + +/// \brief Abstract class for message transformations. +class MessageTransformer { + public: + virtual ~MessageTransformer() {}; + + /// \brief Transform the data and properties of a message. + /// + /// The transform method can modify the data and properties of a message + /// (ideally in place). + /// It should return `true` if the transformation was successful and should + /// be reversed upon receipt of this message, `false` if the transformation + /// was not applicable or should not be reversed, and an error message if + /// the transformation failed due to an error. + virtual rmqt::Result + transform(bsl::shared_ptr >& data, + rmqt::Properties& props) = 0; + + /// \brief Inverse transform the data and properties of a message. + /// + /// This method should reverse the effects of the `transform` method. + /// It should return an empty Result on success, or an error message + /// on failure. + virtual rmqt::Result<> + inverseTransform(bsl::shared_ptr >& data, + rmqt::Properties& props) = 0; + + virtual bsl::string name() const = 0; +}; + +} // namespace rmqp +} // namespace BloombergLP + +#endif diff --git a/src/rmq/rmqp/rmqp_producer.h b/src/rmq/rmqp/rmqp_producer.h index d0becb99..71715a3c 100644 --- a/src/rmq/rmqp/rmqp_producer.h +++ b/src/rmq/rmqp/rmqp_producer.h @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include +#include #include #include @@ -45,7 +47,13 @@ class Producer { public: // TYPES /// \brief Possible results of rmqp::Producer#send. - enum SendStatus { SENDING, DUPLICATE, TIMEOUT, INFLIGHT_LIMIT }; + enum SendStatus { + SENDING, + DUPLICATE, + TIMEOUT, + INFLIGHT_LIMIT, + TRANSFORM_ERROR + }; /// \brief Invoked on receipt of message confirmation. /// @@ -67,6 +75,12 @@ class Producer { // MANIPULATORS + /// \brief Adds a transformation function to be run on all messages + /// + /// \param transformer The transformation function + virtual void addTransformer( + const bsl::shared_ptr& transformer) = 0; + /// \brief Send a message with the given `routingKey` to the exchange /// targeted by the producer. /// diff --git a/src/rmq/rmqt/rmqt_consumerconfig.cpp b/src/rmq/rmqt/rmqt_consumerconfig.cpp index 4bb38962..d083ef1a 100644 --- a/src/rmq/rmqt/rmqt_consumerconfig.cpp +++ b/src/rmq/rmqt/rmqt_consumerconfig.cpp @@ -33,6 +33,7 @@ ConsumerConfig::ConsumerConfig( , d_threadpool(threadpool) , d_exclusiveFlag(exclusiveFlag) , d_consumerPriority(consumerPriority) +, d_transformers(bsl::vector >()) { } diff --git a/src/rmq/rmqt/rmqt_consumerconfig.h b/src/rmq/rmqt/rmqt_consumerconfig.h index 3c4e2ab9..b3ef999a 100644 --- a/src/rmq/rmqt/rmqt_consumerconfig.h +++ b/src/rmq/rmqt/rmqt_consumerconfig.h @@ -24,9 +24,15 @@ #include #include +#include #include +#include namespace BloombergLP { +namespace rmqp { +class MessageTransformer; // forward declaration +} + namespace rmqt { /// \brief Class for passing arguments to Consumer @@ -82,6 +88,12 @@ class ConsumerConfig { return d_consumerPriority; } + const bsl::vector >& + transformers() const + { + return d_transformers; + } + // Setters /// \param consumerTag A label for the consumer which is displayed on the /// RabbitMQ Management UI. It is useful to give this a meaningful @@ -140,12 +152,24 @@ class ConsumerConfig { return *this; } + /// \param transform A message transformer to be undone for each + /// message received by the consumer. Multiple transformers will be + /// called in the inverse order, i.e. the last transformer in + /// added is applied first. + ConsumerConfig& + addTransformer(const bsl::shared_ptr& transformer) + { + d_transformers.push_back(transformer); + return *this; + } + private: bsl::string d_consumerTag; uint16_t d_prefetchCount; bdlmt::ThreadPool* d_threadpool; rmqt::Exclusive::Value d_exclusiveFlag; bsl::optional d_consumerPriority; + bsl::vector > d_transformers; }; } // namespace rmqt diff --git a/src/rmq/rmqt/rmqt_credentials.h b/src/rmq/rmqt/rmqt_credentials.h index a736c706..346f3d5c 100644 --- a/src/rmq/rmqt/rmqt_credentials.h +++ b/src/rmq/rmqt/rmqt_credentials.h @@ -25,7 +25,7 @@ namespace rmqt { class Credentials { public: - virtual ~Credentials(){}; + virtual ~Credentials() {}; virtual bsl::string formatCredentials() = 0; virtual bsl::string authenticationMechanism() = 0; }; diff --git a/src/rmq/rmqt/rmqt_endpoint.h b/src/rmq/rmqt/rmqt_endpoint.h index dbc13efb..77052735 100644 --- a/src/rmq/rmqt/rmqt_endpoint.h +++ b/src/rmq/rmqt/rmqt_endpoint.h @@ -29,7 +29,7 @@ class SecurityParameters; class Endpoint { public: - virtual ~Endpoint(){}; + virtual ~Endpoint() {}; virtual bsl::string formatAddress() const = 0; virtual bsl::string hostname() const = 0; virtual bsl::string vhost() const = 0; diff --git a/src/rmqtestmocks/rmqtestmocks_mockproducer.h b/src/rmqtestmocks/rmqtestmocks_mockproducer.h index 52c3f1d6..9b6a9881 100644 --- a/src/rmqtestmocks/rmqtestmocks_mockproducer.h +++ b/src/rmqtestmocks/rmqtestmocks_mockproducer.h @@ -17,6 +17,7 @@ #define INCLUDED_RMQTESTMOCKS_MOCKPRODUCER #include +#include #include #include #include @@ -48,6 +49,10 @@ class MockProducer : public rmqp::Producer { static rmqt::Future timeoutAsync(); static rmqt::Future errorAsync(); + MOCK_METHOD1( + addTransformer, + void(const bsl::shared_ptr& transformer)); + MOCK_METHOD4( send, rmqp::Producer::SendStatus( diff --git a/src/tests/rmqa/CMakeLists.txt b/src/tests/rmqa/CMakeLists.txt index 307e2257..07286ace 100644 --- a/src/tests/rmqa/CMakeLists.txt +++ b/src/tests/rmqa/CMakeLists.txt @@ -3,6 +3,7 @@ add_executable(rmqa_tests rmqa_consumerimpl.t.cpp rmqa_connectionimpl.t.cpp rmqa_connectionstring.t.cpp + rmqa_messagetransformer.t.cpp rmqa_messageguard.t.cpp rmqa_producerimpl.t.cpp rmqa_rabbitcontextimpl.t.cpp @@ -25,4 +26,8 @@ target_link_libraries(rmqa_tests PUBLIC GTest::gmock ) +if (ENABLE_COMPRESSION) + target_compile_definitions(rmqa_tests PRIVATE RMQCPP_ENABLE_COMPRESSION) +endif() + add_test(NAME rmqa_tests COMMAND rmqa_tests) diff --git a/src/tests/rmqa/rmqa_consumerimpl.t.cpp b/src/tests/rmqa/rmqa_consumerimpl.t.cpp index 6b54d2e5..55fbdb2e 100644 --- a/src/tests/rmqa/rmqa_consumerimpl.t.cpp +++ b/src/tests/rmqa/rmqa_consumerimpl.t.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -128,6 +129,7 @@ class ConsumerImplTests : public TestWithParam { bsl::shared_ptr d_callback; bsl::shared_ptr d_tracing; bsl::shared_ptr d_factory; + bsl::vector > d_transformers; ConsumerImplTests() : d_queue(bsl::make_shared("test")) @@ -143,7 +145,7 @@ class ConsumerImplTests : public TestWithParam { _1))) , d_tracing(bsl::make_shared()) , d_factory(paramPicker(GetParam())) - + , d_transformers(bsl::vector >()) { d_threadPool.start(); ON_CALL(d_eventLoop, postImpl(_)).WillByDefault(ExecuteItem()); @@ -171,7 +173,8 @@ TEST_P(ConsumerImplTests, ItsAlive) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); } TEST_P(ConsumerImplTests, MessageTriggersClientCallback) @@ -187,7 +190,8 @@ TEST_P(ConsumerImplTests, MessageTriggersClientCallback) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); consumer->start(); bsl::shared_ptr > data = @@ -227,7 +231,8 @@ TEST_P(ConsumerImplTests, MessageTriggersChannelAck) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); consumer->start(); bsl::shared_ptr > data = @@ -265,7 +270,8 @@ TEST_P(ConsumerImplTests, Cancel) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); EXPECT_CALL(*d_channel, cancel()) .WillOnce(Return(fakeyCancelFuture.second)); @@ -289,7 +295,8 @@ TEST_P(ConsumerImplTests, Drain) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); EXPECT_CALL(*d_channel, drain()).WillOnce(Return(fakeyDrainFuture.second)); rmqt::Future<> future = consumer->drain(); @@ -313,7 +320,8 @@ TEST_P(ConsumerImplTests, UpdateCallback) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); bsl::shared_ptr exchangePtr = bsl::make_shared("exchange"); @@ -350,7 +358,8 @@ TEST_P(ConsumerImplTests, BindUnbind) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); bsl::shared_ptr exchangePtr = bsl::make_shared("exchange"); @@ -396,7 +405,8 @@ TEST_P(ConsumerImplTests, DestructionInitiatesChannelClose) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); consumer->start(); EXPECT_CALL(*d_channel, gracefulClose()); @@ -418,7 +428,8 @@ TEST_P(ConsumerImplTests, UpdateCallbackFromTwoThreadsAtOnce) d_consumerTag, bsl::ref(d_threadPool), bsl::ref(d_eventLoop), - d_ackQueue); + d_ackQueue, + d_transformers); bsl::shared_ptr exchangePtr = bsl::make_shared("exchange"); diff --git a/src/tests/rmqa/rmqa_messagetransformer.t.cpp b/src/tests/rmqa/rmqa_messagetransformer.t.cpp new file mode 100644 index 00000000..ee31847b --- /dev/null +++ b/src/tests/rmqa/rmqa_messagetransformer.t.cpp @@ -0,0 +1,200 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +using namespace BloombergLP; +using namespace rmqa; +using namespace ::testing; + +namespace { +class TransformerTester { + public: + enum ExpectedResult { SUCCESS, IGNORED, FAILURE }; + + static void test(rmqp::MessageTransformer& transformer, + const rmqt::Message& message, + ExpectedResult expectedResult) + { + bsl::shared_ptr > data = + bsl::make_shared >( + message.payload(), message.payload() + message.payloadSize()); + rmqt::Properties props = message.properties(); + rmqt::Result result = transformer.transform(data, props); + if (expectedResult == FAILURE) { + EXPECT_FALSE(result); // Expect failure + return; + } + ASSERT_TRUE(result); // Expect success or ignored + if (expectedResult == IGNORED) { + EXPECT_FALSE(*result.value()); // Task ignored + return; + } + ASSERT_TRUE(*result.value()); // Task succeeded + + rmqt::Result<> inverseResult = + transformer.inverseTransform(data, props); + ASSERT_TRUE(inverseResult); + + bsl::vector originalData( + message.payload(), message.payload() + message.payloadSize()); + EXPECT_EQ(*data, originalData); + if (!message.properties().headers) { + // original headers empty --> new headers should be empty too + EXPECT_TRUE(!props.headers || props.headers->empty()); + } + else { + // otherwise they better exist + ASSERT_TRUE(props.headers); + EXPECT_EQ(props, message.properties()); + } + } +}; + +// Basic transform that reverse the payload to test for message integrity +// and correctness of the transformation. +class BasicMessageTransform : public rmqp::MessageTransformer { + public: + rmqt::Result transform(bsl::shared_ptr >& data, + rmqt::Properties&) BSLS_KEYWORD_OVERRIDE + { + for (size_t i = 0; i < data->size() / 2; ++i) { + uint8_t tmp = (*data)[i]; + (*data)[i] = (*data)[data->size() - 1 - i]; + (*data)[data->size() - 1 - i] = tmp; + } + return rmqt::Result(bsl::make_shared(true)); + } + + rmqt::Result<> + inverseTransform(bsl::shared_ptr >& data, + rmqt::Properties& props) BSLS_KEYWORD_OVERRIDE + { + transform(data, props); + return rmqt::Result<>(); + } + + bsl::string name() const BSLS_KEYWORD_OVERRIDE + { + return "BasicMessageTransform"; + } +}; +} // namespace + +TEST(MessageBuilderTests, BasicTransformIsValid) +{ + // Build the message + bsl::string s = "[1, 2, 3]"; + rmqt::Message message( + bsl::make_shared >(s.cbegin(), s.cend())); + + // Test + BasicMessageTransform transformer; + TransformerTester::test(transformer, message, TransformerTester::SUCCESS); +} + +#ifdef RMQCPP_ENABLE_COMPRESSION +TEST(MessageBuilderTests, CompressionTransformIsValid) +{ + // Build the messages + bsl::string s1 = "[1, 2, 3]"; + rmqt::Message msg1( + bsl::make_shared >(s1.cbegin(), s1.cend())); + + bsl::string s2 = "lorem ipsum dolor sit amet" + bsl::string(16 * 1024, 'x'); + rmqt::Message msg2( + bsl::make_shared >(s2.cbegin(), s2.cend())); + + rmqt::Result transformer = + rmqa::CompressionTransformer::create(); + + // Small messages should not be compressed + TransformerTester::test( + *transformer.value(), msg1, TransformerTester::IGNORED); + + // While larger ones should + TransformerTester::test( + *transformer.value(), msg2, TransformerTester::SUCCESS); +} + +TEST(MessageBuilderTests, ChainedCompressionIsValid) +{ + // Setup + bsl::string s = "lorem ipsum dolor sit amet" + bsl::string(16 * 1024, 'x'); + bsl::shared_ptr > data = + bsl::make_shared >(s.cbegin(), s.cend()); + rmqt::Properties props = rmqt::Message(data).properties(); + + // Build message + BasicMessageTransform t1; + rmqt::Result t2 = + rmqa::CompressionTransformer::create(); + ASSERT_TRUE(t2); + + t1.transform(data, props); + EXPECT_THAT(data->size(), Eq(s.size())); + EXPECT_THAT((*data)[0], Eq('x')); + t2.value()->transform(data, props); + EXPECT_THAT(data->size(), Lt(s.size())); + + // Unbuild message + t2.value()->inverseTransform(data, props); + EXPECT_THAT(data->size(), Eq(s.size())); + EXPECT_THAT((*data)[0], Eq('x')); + t1.inverseTransform(data, props); + EXPECT_THAT(data->size(), Eq(s.size())); + for (uint64_t i = 0; i < data->size(); ++i) { + EXPECT_THAT((*data)[i], Eq(s[i])); + } +} +#endif // RMQCPP_ENABLE_COMPRESSION + +TEST(MessageBuilderTests, ChainedTransformIsValid) +{ + // Setup + bsl::string s = "lorem ipsum dolor sit amet" + bsl::string(16 * 1024, 'x'); + bsl::shared_ptr > data = + bsl::make_shared >(s.cbegin(), s.cend()); + rmqt::Properties props = rmqt::Message(data).properties(); + + // Build message + BasicMessageTransform t1; + + t1.transform(data, props); + t1.transform(data, props); + EXPECT_THAT(data->size(), Eq(s.size())); + for (uint64_t i = 0; i < data->size(); ++i) { + EXPECT_THAT((*data)[i], Eq(s[i])); + } + + // Unbuild message + t1.inverseTransform(data, props); + t1.inverseTransform(data, props); + EXPECT_THAT(data->size(), Eq(s.size())); + for (uint64_t i = 0; i < data->size(); ++i) { + EXPECT_THAT((*data)[i], Eq(s[i])); + } +} diff --git a/src/tests/rmqtestutil/rmqtestutil_testsuite.t.h b/src/tests/rmqtestutil/rmqtestutil_testsuite.t.h index d87e224b..ad1aa334 100644 --- a/src/tests/rmqtestutil/rmqtestutil_testsuite.t.h +++ b/src/tests/rmqtestutil/rmqtestutil_testsuite.t.h @@ -12,4 +12,4 @@ #else #define RMQTESTUTIL_TESTSUITE_P INSTANTIATE_TEST_SUITE_P #endif -#endif // RMQTESTUTIL_TESTSUITE_T_H \ No newline at end of file +#endif // RMQTESTUTIL_TESTSUITE_T_H diff --git a/vcpkg.json b/vcpkg.json index cbb159bc..0faba156 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -6,6 +6,7 @@ "boost-asio", "openssl", "gtest", - "bde" + "bde", + "zstd" ] } From b754d488783703893ebe157f5ba23af5f7149e81 Mon Sep 17 00:00:00 2001 From: Will <2185386+willhoy@users.noreply.github.com> Date: Mon, 6 Oct 2025 14:42:59 +0100 Subject: [PATCH 2/3] Update requirements.txt add telnetlib3 Signed-off-by: Will <2185386+willhoy@users.noreply.github.com> --- src/tests/integration/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tests/integration/requirements.txt b/src/tests/integration/requirements.txt index 2001f3e2..7d3dcd02 100644 --- a/src/tests/integration/requirements.txt +++ b/src/tests/integration/requirements.txt @@ -1,2 +1,3 @@ requests pytest +telnetlib3 From 4fa195f2b0d9b760c52620f8d1ee769fe3bff72d Mon Sep 17 00:00:00 2001 From: Will <2185386+willhoy@users.noreply.github.com> Date: Mon, 6 Oct 2025 14:49:59 +0100 Subject: [PATCH 3/3] Update fixtures.py Signed-off-by: Will <2185386+willhoy@users.noreply.github.com> --- src/tests/integration/rmqapitests/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/integration/rmqapitests/fixtures.py b/src/tests/integration/rmqapitests/fixtures.py index d0d8123a..f205d9be 100644 --- a/src/tests/integration/rmqapitests/fixtures.py +++ b/src/tests/integration/rmqapitests/fixtures.py @@ -19,7 +19,7 @@ import requests from requests.auth import HTTPBasicAuth from typing import Any -from telnetlib import Telnet +from telnetlib3 import Telnet from contextlib import contextmanager import logging import re