diff --git a/CMakeLists.txt b/CMakeLists.txt index dba77d38be..5f9aa76c28 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -137,6 +137,7 @@ openpmd_option(MPI "Parallel, Multi-Node I/O for clusters" AUTO) openpmd_option(HDF5 "HDF5 backend (.h5 files)" AUTO) openpmd_option(ADIOS2 "ADIOS2 backend (.bp files)" AUTO) openpmd_option(PYTHON "Enable Python bindings" AUTO) +openpmd_option(AWS "Enable AWS/S3 storage" AUTO) option(openPMD_INSTALL "Add installation targets" ON) option(openPMD_INSTALL_RPATH "Add RPATHs to installed binaries" ON) @@ -385,9 +386,22 @@ else() endif() unset(openPMD_REQUIRED_ADIOS2_COMPONENTS) -# external library: pybind11 (optional) -include(${openPMD_SOURCE_DIR}/cmake/dependencies/pybind11.cmake) +if(openPMD_USE_AWS STREQUAL AUTO) + find_package(AWSSDK COMPONENTS s3) + if(AWSSDK_FOUND) + set(openPMD_HAVE_AWS TRUE) + else() + set(openPMD_HAVE_AWS FALSE) + endif() +elseif(openPMD_USE_AWS) + find_package(AWSSDK REQUIRED COMPONENTS s3) + set(openPMD_HAVE_AWS TRUE) +else() + set(openPMD_HAVE_AWS FALSE) +endif() +#external library : pybind11(optional) +include(${openPMD_SOURCE_DIR}/cmake/dependencies/pybind11.cmake) # Targets ##################################################################### # @@ -434,7 +448,12 @@ set(CORE_SOURCE src/snapshots/IteratorTraits.cpp src/snapshots/RandomAccessIterator.cpp src/snapshots/Snapshots.cpp - src/snapshots/StatefulIterator.cpp) + src/snapshots/StatefulIterator.cpp + src/toolkit/ExternalBlockStorage.cpp + src/toolkit/AwsBuilder.cpp + src/toolkit/Aws.cpp + src/toolkit/StdioBuilder.cpp + src/toolkit/Stdio.cpp) set(IO_SOURCE src/IO/AbstractIOHandler.cpp src/IO/AbstractIOHandlerImpl.cpp @@ -562,7 +581,11 @@ if(openPMD_HAVE_ADIOS2) endif() endif() -# Runtime parameter and API status checks ("asserts") +if(openPMD_HAVE_AWS) + target_link_libraries(openPMD PUBLIC ${AWSSDK_LIBRARIES}) +endif() + +#Runtime parameter and API status checks("asserts") if(openPMD_USE_VERIFY) target_compile_definitions(openPMD PRIVATE openPMD_USE_VERIFY=1) else() @@ -704,6 +727,7 @@ set(openPMD_TEST_NAMES set(openPMD_CLI_TOOL_NAMES ls convert-toml-json + merge-json ) set(openPMD_PYTHON_CLI_TOOL_NAMES pipe diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index 6df0c60ced..e2ed747e3a 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -29,9 +29,11 @@ #include "openPMD/auxiliary/JSON_internal.hpp" #include "openPMD/backend/Variant_internal.hpp" #include "openPMD/config.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" #include #include +#include #if openPMD_HAVE_MPI #include #endif @@ -153,8 +155,72 @@ void from_json(const nlohmann::json &j, std::complex &p) } } // namespace std +namespace openPMD::internal +{ +auto jsonDatatypeToString(Datatype dt) -> std::string; + +struct JsonDatatypeHandling +{ + template + static auto encodeDatatype(nlohmann::json &j) -> bool + { + auto const &needed_datatype = + jsonDatatypeToString(determineDatatype()); + if (auto it = j.find("datatype"); it != j.end()) + { + return it.value().get() == needed_datatype; + } + else + { + j["datatype"] = needed_datatype; + return true; + } + } + + template + static auto checkDatatype(nlohmann::json const &j) -> bool + { + auto const &needed_datatype = + jsonDatatypeToString(determineDatatype()); + if (auto it = j.find("datatype"); it != j.end()) + { + return it.value().get() == needed_datatype; + } + else + { + return false; + } + } + + template + static auto decodeDatatype(nlohmann::json const &j, Args &&...args) -> bool + { + if (auto it = j.find("datatype"); it != j.end()) + { + switchDatasetType( + stringToDatatype(it.value().get()), + std::forward(args)...); + return true; + } + else + { + return false; + } + } +}; +} // namespace openPMD::internal + namespace openPMD { +namespace dataset_mode_types +{ + struct Dataset_t + {}; + struct Template_t + {}; + using External_t = std::shared_ptr; +} // namespace dataset_mode_types + class JSONIOHandlerImpl : public AbstractIOHandlerImpl { using json = nlohmann::json; @@ -241,43 +307,9 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl void touch(Writable *, Parameter const &) override; - std::future flush(); - -private: -#if openPMD_HAVE_MPI - std::optional m_communicator; -#endif - - using FILEHANDLE = std::fstream; - - // map each Writable to its associated file - // contains only the filename, without the OS path - std::unordered_map m_files; - - std::unordered_map> m_jsonVals; - - // files that have logically, but not physically been written to - std::unordered_set m_dirty; - - /* - * Is set by constructor. - */ - FileFormat m_fileFormat{}; + void advance(Writable *, Parameter &) override; - /* - * Under which key do we find the backend configuration? - * -> "json" for the JSON backend, "toml" for the TOML backend. - */ - std::string backendConfigKey() const; - - /* - * First return value: The location of the JSON value (either "json" or - * "toml") Second return value: The value that was maybe found at this place - */ - std::pair> - getBackendConfig(openPMD::json::TracingJSON &) const; - - std::string m_originalExtension; + std::future flush(); /* * Was the config value explicitly user-chosen, or are we still working with @@ -293,17 +325,36 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl // Dataset IO mode // ///////////////////// - enum class DatasetMode + struct DatasetMode + : std::variant< + dataset_mode_types::Dataset_t, + dataset_mode_types::Template_t, + dataset_mode_types::External_t> { - Dataset, - Template + using Dataset_t = dataset_mode_types::Dataset_t; + using Template_t = dataset_mode_types::Template_t; + using External_t = dataset_mode_types::External_t; + constexpr static Dataset_t Dataset{}; + constexpr static Template_t Template{}; + + using variant_t = std::variant< + dataset_mode_types::Dataset_t, + dataset_mode_types::Template_t, + External_t>; + using variant_t ::operator=; + + // casts needed because of + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=90943 + inline auto as_base() const -> variant_t const & + { + return *this; + } + inline auto as_base() -> variant_t & + { + return *this; + } }; - // IOMode m_mode{}; - // SpecificationVia m_IOModeSpecificationVia = - // SpecificationVia::DefaultValue; bool m_printedSkippedWriteWarningAlready - // = false; - struct DatasetMode_s { // Initialized in init() @@ -317,9 +368,20 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl return std::tuple{ m_mode, m_specificationVia, m_skipWarnings}; } + + template + auto mapExternalStorage(F &&functor) + { + std::visit( + auxiliary::overloaded{ + [&functor](DatasetMode::External_t &externalStorage) { + return static_cast(functor)( + externalStorage); + }, + [](auto &&) {}}, + m_mode.as_base()); + } }; - DatasetMode_s m_datasetMode; - DatasetMode_s retrieveDatasetMode(openPMD::json::TracingJSON &config) const; /////////////////////// // Attribute IO mode // @@ -338,8 +400,58 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl AttributeMode m_mode{}; SpecificationVia m_specificationVia = SpecificationVia::DefaultValue; }; - AttributeMode_s m_attributeMode; +private: +#if openPMD_HAVE_MPI + std::optional m_communicator; +#endif + + using FILEHANDLE = std::fstream; + + // map each Writable to its associated file + // contains only the filename, without the OS path + std::unordered_map m_files; + + std::unordered_map> m_jsonVals; + + // files that have logically, but not physically been written to + std::unordered_set m_dirty; + + /* + * Is set by constructor. + */ + FileFormat m_fileFormat{}; + + /* + * Under which key do we find the backend configuration? + * -> "json" for the JSON backend, "toml" for the TOML backend. + */ + std::string backendConfigKey() const; + + /* + * First return value: The location of the JSON value (either "json" or + * "toml") Second return value: The value that was maybe found at this place + */ + std::pair> + getBackendConfig(openPMD::json::TracingJSON &) const; + static std::pair> + getBackendConfig( + openPMD::json::TracingJSON &, std::string const &configLocation); + + std::string m_originalExtension; + + /* + * In read mode, we can only open the external block storage backend upon + * opening the JSON file, because it contains meta information relevant + * for configuring the backend. + */ + std::optional + m_deferredExternalBlockstorageConfig; + DatasetMode_s m_datasetMode; + DatasetMode_s + retrieveDatasetMode(openPMD::json::TracingJSON &config, bool do_init); + + AttributeMode_s m_attributeMode; AttributeMode_s retrieveAttributeMode(openPMD::json::TracingJSON &config) const; @@ -389,7 +501,8 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl // essentially: m_i = \prod_{j=0}^{i-1} extent_j static Extent getMultiplicators(Extent const &extent); - static std::pair getExtent(nlohmann::json &j); + static std::pair + getExtent(nlohmann::json &j, DatasetMode const &baseMode); // remove single '/' in the beginning and end of a string static std::string removeSlashes(std::string); diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 603e540c2b..43f24822f3 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -36,6 +36,10 @@ #include "openPMD/snapshots/Snapshots.hpp" #include "openPMD/version.hpp" +#if openPMD_HAVE_AWS +#include +#endif + #if openPMD_HAVE_MPI #include #endif @@ -239,6 +243,10 @@ namespace internal std::optional> m_deferred_initialization = std::nullopt; +#if openPMD_HAVE_AWS + std::optional m_manageAwsAPI = std::nullopt; +#endif + void close(); #if openPMD_HAVE_MPI diff --git a/include/openPMD/cli/convert-toml-json.hpp b/include/openPMD/cli/convert-toml-json.hpp new file mode 100644 index 0000000000..48e4eceb25 --- /dev/null +++ b/include/openPMD/cli/convert-toml-json.hpp @@ -0,0 +1,202 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace from_format_to_format +{ +namespace json = openPMD::json; +struct ID +{ + template + static auto call(nlohmann::json const &&val) + // template <> + // auto call(nlohmann::json const &val) -> + // nlohmann::json const& + { + if constexpr (originallySpecifiedAs == json::SupportedLanguages::JSON) + { + return val; + } + else + { + return json::jsonToToml(val); + } + } +}; + +struct switch_ +{ + template + struct other_type; + template + static auto call(nlohmann::json const &&val) + { + return ID::call::value>( + std::move(val)); + } +}; +template <> +struct switch_::other_type +{ + static constexpr json::SupportedLanguages value = + json::SupportedLanguages::TOML; +}; +template <> +struct switch_::other_type +{ + static constexpr json::SupportedLanguages value = + json::SupportedLanguages::JSON; +}; +} // namespace from_format_to_format + +template +class convert_json_toml +{ + static void print(toml::value &val) + { + namespace json = openPMD::json; + std::cout << json::format_toml(val); + } + static void print(nlohmann::json const &val) + { + std::cout << val << '\n'; + } + static void + with_parsed_cmdline_args(openPMD::json::ParsedConfig parsed_config) + { + namespace json = openPMD::json; + auto [config, originallySpecifiedAs] = std::move(parsed_config); + switch (originallySpecifiedAs) + { + using SL = json::SupportedLanguages; + case SL::JSON: { + auto for_print = + FromFormatToFormat::template call(std::move(config)); + print(for_print); + } + break; + case SL::TOML: { + auto for_print = + FromFormatToFormat::template call(std::move(config)); + print(for_print); + } + break; + } + } + + struct ByLine : std::string + { + friend auto operator>>(std::istream &i, ByLine &l) -> std::istream & + { + decltype(auto) res = std::getline(i, l); + if (res) + { + l.insert(0, 1, '@'); + } + return res; + } + }; + using ByLineIterator = std::istream_iterator; + + template + static auto merge(It begin, It end) -> openPMD::json::ParsedConfig + { + namespace json = openPMD::json; + if (begin == end) + { + throw std::runtime_error( + "merge: need at least one JSON/TOML file."); + } + auto config = json::parseOptions( + *begin, + /* considerFiles = */ true, + /* convertLowercase = */ false); + for (++begin; begin != end; ++begin) + { + auto [next, _] = json::parseOptions( + *begin, + /* considerFiles = */ true, + /* convertLowercase = */ false); + json::merge_internal(config.config, next, /* do_prune = */ false); + } + return config; + } + +public: + enum class UseStdinAs : std::uint8_t + { + InlineJson, + ListOfJson + }; + + static void run_application( + int argc, + char const **argv, + UseStdinAs stdinconfig, + void (*print_help_message)(char const *)) + { + std::string jsonOrToml; + switch (argc) + { + case 0: + case 1: + switch (stdinconfig) + { + case UseStdinAs::InlineJson: { + // Just read the whole stream into memory + // Not very elegant, but we'll hold the entire JSON/TOML dataset + // in memory at some point anyway, so it doesn't really matter + std::stringbuf readEverything; + std::cin >> &readEverything; + jsonOrToml = readEverything.str(); + break; + } + case UseStdinAs::ListOfJson: { + auto parsed_config = + merge(ByLineIterator(std::cin), ByLineIterator{}); + with_parsed_cmdline_args(std::move(parsed_config)); + break; + } + } + break; + default: + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) + { + print_help_message(argv[1]); + exit(0); + } + auto parsed_config = merge(argv + 1, argv + argc); + with_parsed_cmdline_args(std::move(parsed_config)); + break; + } + } +}; diff --git a/include/openPMD/config.hpp.in b/include/openPMD/config.hpp.in index 8df5dae9de..9335e93836 100644 --- a/include/openPMD/config.hpp.in +++ b/include/openPMD/config.hpp.in @@ -1,4 +1,4 @@ -/* Copyright 2019-2021 Axel Huebl +/* Copyright 2019-2026 Axel Huebl, Franz Poeschel, Junmin Gu * * This file is part of openPMD-api. * @@ -45,3 +45,7 @@ #ifndef openPMD_HAVE_CUDA_EXAMPLES #cmakedefine01 openPMD_HAVE_CUDA_EXAMPLES #endif + +#ifndef openPMD_HAVE_AWS +#cmakedefine01 openPMD_HAVE_AWS +#endif diff --git a/include/openPMD/toolkit/Aws.hpp b/include/openPMD/toolkit/Aws.hpp new file mode 100644 index 0000000000..e62666d81f --- /dev/null +++ b/include/openPMD/toolkit/Aws.hpp @@ -0,0 +1,91 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/config.hpp" + +#if openPMD_HAVE_AWS + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#include + +#include + +namespace openPMD::internal +{ +struct AwsAsyncCounter +{ + std::mutex mutex; + std::condition_variable event; + std::size_t request_counter = 0; + // Upon C++20, we can use a std::atomic for this and ditch the + // condition_variable + mutex approach + std::size_t completion_counter = 0; + + void wait(); + void add_task(); + void add_and_notify_result(); + + ~AwsAsyncCounter(); +}; + +struct AwsAsyncHandler +{ + // We can defer std::unique_ptr operations longer than std::shared_ptr + // operations, since no one else has the memory, so use two counters. TODO: + // Add some form of restriction on how long the std::unique_ptr queue may + // become. Currently it can theoretically be spammed ad libitum. Either + // restrict the queue to a configurable length, or add a syncEverything() + // call. + AwsAsyncCounter shared_ptr_operations, unique_ptr_operations; +}; + +struct ExternalBlockStorageAws : ExternalBlockStorageBackend +{ +private: + Aws::S3::S3Client m_client; + std::string m_bucketName; + std::optional m_endpoint; + std::optional m_async; + +public: + ExternalBlockStorageAws( + Aws::S3::S3Client, + std::string bucketName, + std::optional endpoint, + bool async); + auto + put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len) + -> std::string override; + void + get(std::string const &external_ref, + std::shared_ptr data, + size_t len) override; + [[nodiscard]] auto externalStorageLocation() const + -> nlohmann::json override; + void syncMandatoryOperations() override; + void syncAllOperations() override; + + ~ExternalBlockStorageAws() override; +}; +} // namespace openPMD::internal +#endif diff --git a/include/openPMD/toolkit/AwsBuilder.hpp b/include/openPMD/toolkit/AwsBuilder.hpp new file mode 100644 index 0000000000..fda657fff9 --- /dev/null +++ b/include/openPMD/toolkit/AwsBuilder.hpp @@ -0,0 +1,71 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/config.hpp" + +#include +#include +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct AwsBuilder +{ + AwsBuilder( + std::string bucketName, std::string accessKeyId, std::string secretKey); + + enum class Scheme : uint8_t + { + HTTP, + HTTPS + }; + std::string m_bucketName; + std::string m_accessKeyId; + std::string m_secretKey; + std::optional m_sessionToken; + std::initializer_list m_credentials; + std::optional m_endpointOverride; + std::optional m_region; + std::optional m_scheme; + std::optional m_verifySSL; + std::optional m_useAsyncIO; + + auto setBucketName(std::string bucketName) -> AwsBuilder &; + auto setCredentials(std::string accessKeyId, std::string secretKey) + -> AwsBuilder &; + auto setSessionToken(std::string sessionToken) -> AwsBuilder &; + auto setEndpointOverride(std::string endpoint) -> AwsBuilder &; + auto setRegion(std::string regionName) -> AwsBuilder &; + auto setScheme(Scheme s) -> AwsBuilder &; + auto setVerifySSL(bool verify) -> AwsBuilder &; + auto setAsyncIO(bool useAsyncIO) -> AwsBuilder &; + + operator ::openPMD::ExternalBlockStorage(); + auto build() -> ::openPMD::ExternalBlockStorage; +}; +} // namespace openPMD::internal diff --git a/include/openPMD/toolkit/ExternalBlockStorage.hpp b/include/openPMD/toolkit/ExternalBlockStorage.hpp new file mode 100644 index 0000000000..8d7e734e1a --- /dev/null +++ b/include/openPMD/toolkit/ExternalBlockStorage.hpp @@ -0,0 +1,143 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/Dataset.hpp" +#include "openPMD/auxiliary/Memory.hpp" +#include "openPMD/toolkit/AwsBuilder.hpp" +#include "openPMD/toolkit/StdioBuilder.hpp" + +#include + +#include +#include +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct ExternalBlockStorageBackend +{ + virtual auto + put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len) + -> std::string = 0; + virtual void + get(std::string const &external_ref, + std::shared_ptr data, + size_t len) = 0; + [[nodiscard]] virtual auto externalStorageLocation() const + -> nlohmann::json = 0; + + virtual void syncMandatoryOperations(); + virtual void syncAllOperations(); + + virtual ~ExternalBlockStorageBackend(); +}; +} // namespace openPMD::internal + +namespace openPMD +{ +// used nowhere, just shows the signatures +// TODO: replace this with a concept upon switching to C++20 +struct DatatypeHandling_Interface +{ + /* + * Returns false if the same JSON location was previously encoded as + * another datatype. + */ + template + static auto encodeDatatype(nlohmann::json &) -> bool; + + /* + * Returns false if the encoded datatype does not match T_required + * or if no datatype has been encoded. + */ + template + static auto checkDatatype(nlohmann::json const &j) -> bool; + + /* + * Returns false if no encoded datatype could be found + */ + template + static auto decodeDatatype(nlohmann::json const &j, Args &&...args) -> bool; +}; + +class ExternalBlockStorage +{ +private: + std::unique_ptr m_worker; + ExternalBlockStorage( + std::unique_ptr); + + friend struct internal::StdioBuilder; + friend struct internal::AwsBuilder; + +public: + explicit ExternalBlockStorage(); + + static auto makeStdioSession(std::string directory) + -> internal::StdioBuilder; + static auto makeAwsSession( + std::string bucketName, std::string accessKeyId, std::string secretKey) + -> internal::AwsBuilder; + + // returns created JSON key + template + auto store( + Extent const &globalExtent, + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::optional infix, // e.g. for distinguishing MPI ranks + auxiliary::WriteBuffer data) -> std::string; + + template + void read( + std::string const &identifier, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::shared_ptr &data); + + template + void read( + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::shared_ptr &data); + + void syncMandatoryOperations(); + void syncAllOperations(); + + [[nodiscard]] auto externalStorageLocation() const -> nlohmann::json; + + static void sanitizeString(std::string &s); +}; + +// Implementations + +} // namespace openPMD diff --git a/include/openPMD/toolkit/Stdio.hpp b/include/openPMD/toolkit/Stdio.hpp new file mode 100644 index 0000000000..7d07a708f1 --- /dev/null +++ b/include/openPMD/toolkit/Stdio.hpp @@ -0,0 +1,46 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +namespace openPMD::internal +{ +struct ExternalBlockStorageStdio : ExternalBlockStorageBackend +{ +private: + std::string m_directory; + std::string m_openMode; + +public: + ExternalBlockStorageStdio(std::string directory, std::string openMode); + auto + put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len) + -> std::string override; + void + get(std::string const &external_ref, + std::shared_ptr data, + size_t len) override; + [[nodiscard]] auto externalStorageLocation() const + -> nlohmann::json override; + ~ExternalBlockStorageStdio() override; +}; +} // namespace openPMD::internal diff --git a/include/openPMD/toolkit/StdioBuilder.hpp b/include/openPMD/toolkit/StdioBuilder.hpp new file mode 100644 index 0000000000..74d2ca3d6b --- /dev/null +++ b/include/openPMD/toolkit/StdioBuilder.hpp @@ -0,0 +1,44 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include +#include + +namespace openPMD +{ +class ExternalBlockStorage; +} + +namespace openPMD::internal +{ +struct StdioBuilder +{ + std::string m_directory; + std::optional m_openMode = std::nullopt; + + auto setDirectory(std::string directory) -> StdioBuilder &; + auto setOpenMode(std::string openMode) -> StdioBuilder &; + + operator ::openPMD::ExternalBlockStorage(); + auto build() -> ::openPMD::ExternalBlockStorage; +}; +} // namespace openPMD::internal diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 59541c1e30..3ca987aa38 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -24,6 +24,7 @@ #include "openPMD/Error.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/AbstractIOHandlerImpl.hpp" +#include "openPMD/IO/Access.hpp" #include "openPMD/ThrowError.hpp" #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/auxiliary/JSONMatcher.hpp" @@ -31,10 +32,16 @@ #include "openPMD/auxiliary/Memory.hpp" #include "openPMD/auxiliary/StringManip.hpp" #include "openPMD/auxiliary/TypeTraits.hpp" +#include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attribute.hpp" #include "openPMD/backend/Writable.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" +#if openPMD_USE_FILESYSTEM_HEADER +#include +#endif #include +#include #include #include @@ -42,6 +49,7 @@ #include #include #include +#include namespace openPMD { @@ -140,11 +148,30 @@ namespace return *accum_ptr; } - void warnUnusedJson(openPMD::json::TracingJSON const &jsonConfig) + auto prepend_to_json(nlohmann::json j) -> nlohmann::json + { + return j; + } + + template + auto prepend_to_json(nlohmann::json j, Arg &&arg, Args &&...args) + -> nlohmann::json + { + return nlohmann::json{ + {std::forward(arg), + prepend_to_json(std::move(j), std::forward(args)...)}}; + } + + template + void warnUnusedJson( + openPMD::json::TracingJSON const &jsonConfig, + Args &&...extra_json_hierarchy) { auto shadow = jsonConfig.invertShadow(); if (shadow.size() > 0) { + shadow = prepend_to_json( + std::move(shadow), std::forward(extra_json_hierarchy)...); switch (jsonConfig.originallySpecifiedAs) { case openPMD::json::SupportedLanguages::JSON: @@ -162,7 +189,10 @@ namespace } } } +} // namespace +namespace internal +{ // Does the same as datatypeToString(), but this makes sure that we don't // accidentally change the JSON schema by modifying datatypeToString() std::string jsonDatatypeToString(Datatype dt) @@ -251,17 +281,227 @@ namespace } return "Unreachable!"; } +} // namespace internal + +namespace +{ + void parse_internal_mode( + nlohmann::json const &mode_j, + std::string const &configLocation, + JSONIOHandlerImpl::DatasetMode_s &res) + { + using DatasetMode = JSONIOHandlerImpl::DatasetMode; + using SpecificationVia = JSONIOHandlerImpl::SpecificationVia; + + DatasetMode &ioMode = res.m_mode; + SpecificationVia &specificationVia = res.m_specificationVia; + bool &skipWarnings = res.m_skipWarnings; + + auto modeOption = openPMD::json::asLowerCaseStringDynamic(mode_j); + if (!modeOption.has_value()) + { + throw error::BackendConfigSchema( + {configLocation, "mode"}, + "Invalid value of non-string type (accepted values are " + "'dataset' and 'template'."); + } + auto mode = modeOption.value(); + if (mode == "dataset") + { + ioMode = DatasetMode::Dataset; + specificationVia = SpecificationVia::Manually; + } + else if (mode == "template") + { + ioMode = DatasetMode::Template; + specificationVia = SpecificationVia::Manually; + } + else if (mode == "template_no_warn") + { + ioMode = DatasetMode::Template; + specificationVia = SpecificationVia::Manually; + skipWarnings = true; + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode"}, + "Invalid value: '" + mode + + "' (accepted values are 'dataset' and 'template'."); + } + } + + template + auto optionalOrElse(std::optional o, OrElse &&orElse) -> T + { + if (o.has_value()) + { + return *std::move(o); + } + else + { + return std::forward(orElse)(); + } + } + + void parse_external_mode( + json::TracingJSON mode, + // In read mode, the metadata section stored under 'external_storage' + // These are default values, overridable with the first argument + std::optional previousCfg, + std::string const &configLocation, + JSONIOHandlerImpl::DatasetMode_s &res) + { + using SpecificationVia = JSONIOHandlerImpl::SpecificationVia; + using ExternalBlockStorage = openPMD::ExternalBlockStorage; + + auto get_key = + [&](char const *key) -> std::optional { + if (mode.json().contains(key)) + { + return {&mode.json({key})}; + } + else if (previousCfg.has_value() && (*previousCfg)->contains(key)) + { + return {&(**previousCfg).at(key)}; + } + else + { + return std::nullopt; + } + }; + + auto get_mandatory = [&](char const *key, + bool lowercase) -> std::string { + auto const &val = + *optionalOrElse(get_key(key), [&]() -> nlohmann::json const * { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Mandatory key."); + }); + return optionalOrElse( + lowercase ? openPMD::json::asLowerCaseStringDynamic(val) + : openPMD::json::asStringDynamic(val), + [&]() -> std::string { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of string type."); + }); + }; + auto if_contains_optional = + [&](char const *key, bool lowercase, auto &&then) { + auto const maybeVal = get_key(key); + if (!maybeVal.has_value()) + { + return; + } + auto const &val = **maybeVal; + static_cast(then)(optionalOrElse( + lowercase ? openPMD::json::asLowerCaseStringDynamic(val) + : openPMD::json::asStringDynamic(val), + [&]() -> std::string { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of string type."); + })); + }; + auto if_contains_optional_bool = [&](char const *key, auto &&then) { + auto const maybeVal = get_key(key); + if (!maybeVal.has_value()) + { + return; + } + auto const &val = **maybeVal; + if (!val.is_boolean()) + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", key}, + "Must be of boolean type."); + } + static_cast(then)(val.get()); + }; + auto modeString = get_mandatory("provider", true); + + if (modeString == "stdio") + { + auto builder = ExternalBlockStorage::makeStdioSession( + get_mandatory("directory", false)); + + if_contains_optional("open_mode", false, [&](std::string openMode) { + builder.setOpenMode(std::move(openMode)); + }); + + res.m_mode = + std::make_shared(builder.build()); + } + else if (modeString == "aws") + { + openPMD::internal::AwsBuilder builder( + // TODO: bucket_name: introduce expansion pattern for openPMD + // file name + get_mandatory("bucket", false), + get_mandatory("access_key_id", false), + get_mandatory("secret_access_key", false)); + + if_contains_optional( + "session_token", false, [&](std::string sessionToken) { + builder.setSessionToken(std::move(sessionToken)); + }); + if_contains_optional( + "endpoint", false, [&](std::string endpointOverride) { + builder.setEndpointOverride(std::move(endpointOverride)); + }); + if_contains_optional("region", false, [&](std::string region) { + builder.setRegion(std::move(region)); + }); + if_contains_optional_bool("verify_ssl", [&](bool verifySSL) { + builder.setVerifySSL(verifySSL); + }); + if_contains_optional_bool("async_io", [&](bool useAsyncIO) { + builder.setAsyncIO(useAsyncIO); + }); + if_contains_optional( + "scheme", true, [&](std::string const &scheme) { + if (scheme == "http") + { + builder.setScheme( + openPMD::internal::AwsBuilder::Scheme::HTTP); + } + else if (scheme == "https") + { + builder.setScheme( + openPMD::internal::AwsBuilder::Scheme::HTTPS); + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", "scheme"}, + "Must be either 'http' or 'https'."); + } + }); + + res.m_mode = + std::make_shared(builder.build()); + } + else + { + throw error::BackendConfigSchema( + {configLocation, "dataset", "mode", "provider"}, + "Must be either 'stdio' or 'aws'."); + } + + res.m_specificationVia = SpecificationVia::Manually; + } } // namespace auto JSONIOHandlerImpl::retrieveDatasetMode( - openPMD::json::TracingJSON &config) const -> DatasetMode_s + openPMD::json::TracingJSON &config, bool do_init) -> DatasetMode_s { // start with / copy from current config auto res = m_datasetMode; - DatasetMode &ioMode = res.m_mode; - SpecificationVia &specificationVia = res.m_specificationVia; - bool &skipWarnings = res.m_skipWarnings; - if (auto [configLocation, maybeConfig] = getBackendConfig(config); + + if (auto [configLocation, maybeConfig] = + getBackendConfig(config, backendConfigKey()); maybeConfig.has_value()) { auto jsonConfig = maybeConfig.value(); @@ -270,38 +510,28 @@ auto JSONIOHandlerImpl::retrieveDatasetMode( auto datasetConfig = jsonConfig["dataset"]; if (datasetConfig.json().contains("mode")) { - auto modeOption = openPMD::json::asLowerCaseStringDynamic( - datasetConfig["mode"].json()); - if (!modeOption.has_value()) + auto mode = datasetConfig["mode"]; + if (mode.json().is_object()) { - throw error::BackendConfigSchema( - {configLocation, "mode"}, - "Invalid value of non-string type (accepted values are " - "'dataset' and 'template'."); - } - auto mode = modeOption.value(); - if (mode == "dataset") - { - ioMode = DatasetMode::Dataset; - specificationVia = SpecificationVia::Manually; - } - else if (mode == "template") - { - ioMode = DatasetMode::Template; - specificationVia = SpecificationVia::Manually; - } - else if (mode == "template_no_warn") - { - ioMode = DatasetMode::Template; - specificationVia = SpecificationVia::Manually; - skipWarnings = true; + if (!do_init || + access::writeOnly(m_handler->m_backendAccess)) + { + parse_external_mode( + std::move(mode), std::nullopt, configLocation, res); + } + else + { + // sic! initialize the deferred json config as a new + // tracing object + m_deferredExternalBlockstorageConfig = + std::make_optional( + mode.json(), mode.originallySpecifiedAs); + config.declareFullyRead(); + } } else { - throw error::BackendConfigSchema( - {configLocation, "dataset", "mode"}, - "Invalid value: '" + mode + - "' (accepted values are 'dataset' and 'template'."); + parse_internal_mode(mode.json(), configLocation, res); } } } @@ -373,7 +603,13 @@ std::string JSONIOHandlerImpl::backendConfigKey() const std::pair> JSONIOHandlerImpl::getBackendConfig(openPMD::json::TracingJSON &config) const { - std::string configLocation = backendConfigKey(); + return getBackendConfig(config, backendConfigKey()); +} + +std::pair> +JSONIOHandlerImpl::getBackendConfig( + openPMD::json::TracingJSON &config, std::string const &configLocation) +{ if (config.json().contains(configLocation)) { return std::make_pair( @@ -431,7 +667,7 @@ void JSONIOHandlerImpl::init(openPMD::json::TracingJSON config) } // now modify according to config - m_datasetMode = retrieveDatasetMode(config); + m_datasetMode = retrieveDatasetMode(config, /* do_init = */ true); m_attributeMode = retrieveAttributeMode(config); if (auto [_, backendConfig] = getBackendConfig(config); @@ -457,6 +693,9 @@ std::future JSONIOHandlerImpl::flush() putJsonContents(file, false); } m_dirty.clear(); + this->m_datasetMode.mapExternalStorage([](auto &externalStorage) { + externalStorage->syncMandatoryOperations(); + }); return std::future(); } @@ -467,6 +706,14 @@ void JSONIOHandlerImpl::createFile( access::write(m_handler->m_backendAccess), "[JSON] Creating a file in read-only mode is not possible."); + if (m_deferredExternalBlockstorageConfig.has_value()) + { + throw error::Internal( + "Creation of external block storage backend was deferred until " + "opening the first file, but a file is created before any was " + "opened."); + } + /* * Need to resolve this later than init() since the openPMD version might be * specified after the creation of the IOHandler. @@ -488,6 +735,9 @@ void JSONIOHandlerImpl::createFile( if (!writable->written) { + m_datasetMode.mapExternalStorage([](auto &externalStorage) { + externalStorage->syncAllOperations(); + }); std::string name = parameters.name + m_originalExtension; auto res_pair = getPossiblyExisting(name); @@ -531,6 +781,10 @@ void JSONIOHandlerImpl::createFile( writable->written = true; writable->abstractFilePosition = std::make_shared(); } + else + { + throw error::Internal("This should not happen."); + } } void JSONIOHandlerImpl::checkFile( @@ -603,7 +857,8 @@ void JSONIOHandlerImpl::createDataset( parameter.options, /* considerFiles = */ false); // Retrieves mode from dataset-specific configuration, falls back to global // value if not defined - auto [localMode, _, skipWarnings] = retrieveDatasetMode(config); + auto [localMode, _, skipWarnings] = + retrieveDatasetMode(config, /* do_init = */ false); (void)_; // No use in introducing logic to skip warnings only for one particular // dataset. If warnings are skipped, then they are skipped consistently. @@ -633,49 +888,53 @@ void JSONIOHandlerImpl::createDataset( } setAndGetFilePosition(writable, name); auto &dset = jsonVal[name]; - dset["datatype"] = jsonDatatypeToString(parameter.dtype); + dset["datatype"] = internal::jsonDatatypeToString(parameter.dtype); + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + auto extent = parameter.extent; + switch (parameter.dtype) + { + case Datatype::CFLOAT: + case Datatype::CDOUBLE: + case Datatype::CLONG_DOUBLE: { + extent.push_back(2); + break; + } + default: + break; + } + if (parameter.extent.size() != 1 || + parameter.extent[0] != Dataset::UNDEFINED_EXTENT) + { + // TOML does not support nulls, so initialize with zero + dset["data"] = initializeNDArray( + extent, + m_fileFormat == FileFormat::Json + ? std::optional{} + : parameter.dtype); + } + }, + [&](DatasetMode::Template_t const &) { + if (parameter.extent != Extent{0} && + parameter.extent[0] != Dataset::UNDEFINED_EXTENT) + { + dset["extent"] = parameter.extent; + } + else + { + // no-op + // If extent is empty or no datatype is defined, don't + // bother writing it. The datatype is written above + // anyway. + } + }, + [&](DatasetMode::External_t const &) { + dset["extent"] = parameter.extent; + }}, + localMode.as_base()); - switch (localMode) - { - case DatasetMode::Dataset: { - auto extent = parameter.extent; - switch (parameter.dtype) - { - case Datatype::CFLOAT: - case Datatype::CDOUBLE: - case Datatype::CLONG_DOUBLE: { - extent.push_back(2); - break; - } - default: - break; - } - if (parameter.extent.size() != 1 || - parameter.extent[0] != Dataset::UNDEFINED_EXTENT) - { - // TOML does not support nulls, so initialize with zero - dset["data"] = initializeNDArray( - extent, - m_fileFormat == FileFormat::Json ? std::optional{} - : parameter.dtype); - } - break; - } - case DatasetMode::Template: - if (parameter.extent != Extent{0} && - parameter.extent[0] != Dataset::UNDEFINED_EXTENT) - { - dset["extent"] = parameter.extent; - } - else - { - // no-op - // If extent is empty or no datatype is defined, don't bother - // writing it. - // The datatype is written above anyway. - } - break; - } writable->written = true; m_dirty.emplace(file); } @@ -725,7 +984,8 @@ void JSONIOHandlerImpl::extendDataset( try { Extent datasetExtent; - std::tie(datasetExtent, localIOMode) = getExtent(j); + std::tie(datasetExtent, localIOMode) = + getExtent(j, m_datasetMode.m_mode); VERIFY_ALWAYS( datasetExtent.size() == parameters.extent.size(), "[JSON] Cannot change dimensionality of a dataset") @@ -743,38 +1003,40 @@ void JSONIOHandlerImpl::extendDataset( "[JSON] The specified location contains no valid dataset"); } - switch (localIOMode) - { - case DatasetMode::Dataset: { - auto extent = parameters.extent; - auto datatype = stringToDatatype(j["datatype"].get()); - switch (datatype) - { - case Datatype::CFLOAT: - case Datatype::CDOUBLE: - case Datatype::CLONG_DOUBLE: { - extent.push_back(2); - break; - } - default: - // nothing to do - break; - } - // TOML does not support nulls, so initialize with zero - nlohmann::json newData = initializeNDArray( - extent, - m_fileFormat == FileFormat::Json ? std::optional{} - : datatype); - nlohmann::json &oldData = j["data"]; - mergeInto(newData, oldData); - j["data"] = newData; - } - break; - case DatasetMode::Template: { - j["extent"] = parameters.extent; - } - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + auto extent = parameters.extent; + auto datatype = + stringToDatatype(j["datatype"].get()); + switch (datatype) + { + case Datatype::CFLOAT: + case Datatype::CDOUBLE: + case Datatype::CLONG_DOUBLE: { + extent.push_back(2); + break; + } + default: + // nothing to do + break; + } + // TOML does not support nulls, so initialize with zero + nlohmann::json newData = initializeNDArray( + extent, + m_fileFormat == FileFormat::Json ? std::optional{} + : datatype); + nlohmann::json &oldData = j["data"]; + mergeInto(newData, oldData); + j["data"] = newData; + }, + [&](DatasetMode::Template_t const &) { + j["extent"] = parameters.extent; + }, + [&](DatasetMode::External_t const &) { + j["extent"] = parameters.extent; + }}, + localIOMode.as_base()); writable->written = true; } @@ -882,9 +1144,44 @@ void JSONIOHandlerImpl::availableChunks( { refreshFileFromParent(writable); auto filePosition = setAndGetFilePosition(writable); - auto &j = obtainJsonContents(writable)["data"]; - *parameters.chunks = chunksInJSON(j); - chunk_assignment::mergeChunks(*parameters.chunks); + auto &j = obtainJsonContents(writable); + + auto [extent, datasetmode] = getExtent(j, m_datasetMode.m_mode); + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + *parameters.chunks = chunksInJSON(j.at("data")); + chunk_assignment::mergeChunks(*parameters.chunks); + }, + [&](DatasetMode::Template_t const &) { + /* no-op, no chunks to be loaded */ + }, + [&](DatasetMode::External_t &) { + auto external_blocks = j.at("external_blocks"); + auto &res = *parameters.chunks; + res.reserve(external_blocks.size()); + for (auto it = external_blocks.begin(); + it != external_blocks.end(); + ++it) + { + auto const &block = it.value(); + try + { + auto const &o = block.at("offset").get(); + auto const &e = block.at("extent").get(); + res.emplace_back(o, e); + } + catch (nlohmann::json::exception const &e) + { + std::cerr << "[JSONIOHandlerImpl::availableChunks] " + "Could not parse block '" + << it.key() << "'. Original error was:\n" + << e.what(); + } + } + }}, + datasetmode.as_base()); } void JSONIOHandlerImpl::openFile( @@ -903,6 +1200,16 @@ void JSONIOHandlerImpl::openFile( auto file = std::get<0>(getPossiblyExisting(name)); + // Need to access data in order to resolve external block storage + // configuration. EBS for read modes is configured at two places: + // + // 1. In the JSON config (stored at m_deferredExternalBlockstorageConfig) + // 2. In the previous JSON file that we are now opening + // + // Since the configuration may exclusively take place in either of the two + // options, files need to be opened now in any case. + obtainJsonContents(file); + associateWithFile(writable, file); writable->written = true; @@ -970,7 +1277,7 @@ void JSONIOHandlerImpl::openDataset( *parameters.dtype = Datatype(stringToDatatype(datasetJson["datatype"].get())); - *parameters.extent = getExtent(datasetJson).first; + *parameters.extent = getExtent(datasetJson, m_datasetMode.m_mode).first; writable->written = true; } @@ -1139,6 +1446,45 @@ void JSONIOHandlerImpl::deleteAttribute( j.erase(parameters.name); } +namespace +{ + template + auto + write_rank_to_stream_with_sufficient_padding(Stream &s, int rank, int size) + -> Stream & + { + auto num_digits = [](unsigned n) -> unsigned { + constexpr auto max = std::numeric_limits::max(); + unsigned base_10 = 1; + unsigned res = 1; + while (base_10 < max) + { + base_10 *= 10; + if (n / base_10 == 0) + { + return res; + } + ++res; + } + return res; + }; + s << std::setw(num_digits(size - 1)) << std::setfill('0') << rank; + return s; + } + + struct StoreExternally + { + template + static void call(ExternalBlockStorage &blockStorage, Args &&...args) + { + blockStorage.store( + std::forward(args)...); + } + + static constexpr char const *errorMsg = "StoreExternally"; + }; +} // namespace + void JSONIOHandlerImpl::writeDataset( Writable *writable, Parameter ¶meters) { @@ -1148,25 +1494,53 @@ void JSONIOHandlerImpl::writeDataset( auto pos = setAndGetFilePosition(writable); auto file = refreshFileFromParent(writable); - auto &j = obtainJsonContents(writable); - - switch (verifyDataset(parameters, j)) - { - case DatasetMode::Dataset: - break; - case DatasetMode::Template: - if (!m_datasetMode.m_skipWarnings) - { - std::cerr - << "[JSON/TOML backend: Warning] Trying to write data to a " - "template dataset. Will skip." - << '\n'; - m_datasetMode.m_skipWarnings = true; - } - return; - } - - switchType(parameters.dtype, j, parameters); + auto filePosition = setAndGetFilePosition(writable, false); + auto &jsonRoot = *obtainJsonContents(file); + auto &j = jsonRoot[filePosition->id]; + + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + switchType(parameters.dtype, j, parameters); + }, + [&](DatasetMode::Template_t const &) { + if (!m_datasetMode.m_skipWarnings) + { + std::cerr << "[JSON/TOML backend: Warning] Trying to write " + "data to a " + "template dataset. Will skip." + << '\n'; + m_datasetMode.m_skipWarnings = true; + } + }, + [&](DatasetMode::External_t const &external) { + std::optional rankInfix; +#if openPMD_HAVE_MPI + if (m_communicator.has_value()) + { + auto &comm = *m_communicator; + // TODO maybe cache the result for this computation + int rank, size; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + std::stringstream s; + s << "r"; + write_rank_to_stream_with_sufficient_padding(s, rank, size); + rankInfix = s.str(); + } +#endif + switchDatasetType( + parameters.dtype, + *external, + j.at("extent").get(), + parameters.offset, + parameters.extent, + jsonRoot, + filePosition->id, + std::move(rankInfix), + std::move(parameters.data)); + }}, + verifyDataset(parameters, j).as_base()); writable->written = true; } @@ -1204,7 +1578,7 @@ void JSONIOHandlerImpl::writeAttribute( { case AttributeMode::Long: (*jsonVal)[filePosition->id]["attributes"][name] = { - {"datatype", jsonDatatypeToString(parameter.dtype)}, + {"datatype", internal::jsonDatatypeToString(parameter.dtype)}, {"value", value}}; break; case AttributeMode::Short: @@ -1235,40 +1609,65 @@ namespace static constexpr char const *errorMsg = "[JSON Backend] Fill with zeroes."; }; + + struct RetrieveExternally + { + template + static void call(ExternalBlockStorage &blockStorage, Args &&...args) + { + blockStorage.read( + std::forward(args)...); + } + + static constexpr char const *errorMsg = "RetrieveExternally"; + }; } // namespace void JSONIOHandlerImpl::readDataset( Writable *writable, Parameter ¶meters) { - refreshFileFromParent(writable); - setAndGetFilePosition(writable); - auto &j = obtainJsonContents(writable); + auto file = refreshFileFromParent(writable); + auto filePosition = setAndGetFilePosition(writable); + auto &jsonRoot = *obtainJsonContents(file); + auto &j = jsonRoot[filePosition->id]; DatasetMode localMode = verifyDataset(parameters, j); - switch (localMode) - { - case DatasetMode::Template: - std::cerr << "[Warning] Cannot read chunks in Template mode of JSON " - "backend. Will fill with zeroes instead." - << '\n'; - switchNonVectorType( - parameters.dtype, parameters.data.get(), parameters.extent); - return; - case DatasetMode::Dataset: - try - { - switchType(parameters.dtype, j["data"], parameters); - } - catch (json::basic_json::type_error &) - { - throw error::ReadError( - error::AffectedObject::Dataset, - error::Reason::UnexpectedContent, - "JSON", - "The given path does not contain a valid dataset."); - } - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + try + { + switchType( + parameters.dtype, j["data"], parameters); + } + catch (json::basic_json::type_error &) + { + throw error::ReadError( + error::AffectedObject::Dataset, + error::Reason::UnexpectedContent, + "JSON", + "The given path does not contain a valid dataset."); + } + }, + [&](DatasetMode::Template_t const &) { + std::cerr + << "[Warning] Cannot read chunks in Template mode of JSON " + "backend. Will fill with zeroes instead." + << '\n'; + switchNonVectorType( + parameters.dtype, parameters.data.get(), parameters.extent); + }, + [&](DatasetMode::External_t &external) { + switchDatasetType( + parameters.dtype, + *external, + parameters.offset, + parameters.extent, + jsonRoot, + filePosition->id, + parameters.data); + }}, + localMode.as_base()); } namespace @@ -1655,6 +2054,20 @@ void JSONIOHandlerImpl::touch( } } +void JSONIOHandlerImpl::advance( + Writable *w, Parameter ¶m) +{ + AbstractIOHandlerImpl::advance(w, param); + + if (access::linear(m_handler->m_backendAccess) && + access::writeOnly(m_handler->m_backendAccess)) + { + m_datasetMode.mapExternalStorage([](auto &externalStorage) { + externalStorage->syncAllOperations(); + }); + } +} + auto JSONIOHandlerImpl::getFilehandle(File const &fileName, Access access) -> std::tuple, std::istream *, std::ostream *> { @@ -1790,7 +2203,8 @@ Extent JSONIOHandlerImpl::getMultiplicators(Extent const &extent) return res; } -auto JSONIOHandlerImpl::getExtent(nlohmann::json &j) +auto JSONIOHandlerImpl::getExtent( + nlohmann::json &j, DatasetMode const &baseMode) -> std::pair { Extent res; @@ -1819,7 +2233,10 @@ auto JSONIOHandlerImpl::getExtent(nlohmann::json &j) } else if (j.contains("extent")) { - ioMode = DatasetMode::Template; + ioMode = + std::holds_alternative(baseMode.as_base()) + ? baseMode + : DatasetMode{DatasetMode::Template}; res = j["extent"].get(); } else @@ -1964,6 +2381,9 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) auto res = serialImplementation(); #endif + bool initialize_external_block_storage = + m_deferredExternalBlockstorageConfig.has_value(); + if (res->contains(JSONDefaults::openpmd_internal)) { auto const &openpmd_internal = res->at(JSONDefaults::openpmd_internal); @@ -1994,6 +2414,10 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) { m_datasetMode.m_mode = DatasetMode::Template; } + else if (modeOption.value() == "external") + { + initialize_external_block_storage = true; + } else { std::cerr << "[JSON/TOML backend] Warning: Invalid value '" @@ -2037,6 +2461,31 @@ JSONIOHandlerImpl::obtainJsonContents(File const &file) } } } + + if (initialize_external_block_storage) + { + auto previousConfig = [&]() -> std::optional { + if (res->contains("external_storage")) + { + return std::make_optional( + &res->at("external_storage")); + } + else + { + return std::nullopt; + } + }(); + auto manual_config = m_deferredExternalBlockstorageConfig.has_value() + ? std::move(*m_deferredExternalBlockstorageConfig) + : openPMD::json::TracingJSON(); + parse_external_mode( + manual_config, previousConfig, backendConfigKey(), m_datasetMode); + warnUnusedJson(manual_config, "dataset", "mode"); + m_attributeMode.m_specificationVia = SpecificationVia::Manually; + + m_deferredExternalBlockstorageConfig.reset(); + } + m_jsonVals.emplace(file, res); return res; } @@ -2062,18 +2511,25 @@ auto JSONIOHandlerImpl::putJsonContents( return it; } - switch (m_datasetMode.m_mode) - { - case DatasetMode::Dataset: - (*it->second)["platform_byte_widths"] = platformSpecifics(); - (*it->second)[JSONDefaults::openpmd_internal] - [JSONDefaults::DatasetMode] = "dataset"; - break; - case DatasetMode::Template: - (*it->second)[JSONDefaults::openpmd_internal] - [JSONDefaults::DatasetMode] = "template"; - break; - } + std::visit( + auxiliary::overloaded{ + [&](DatasetMode::Dataset_t const &) { + (*it->second)["platform_byte_widths"] = platformSpecifics(); + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "dataset"; + }, + [&](DatasetMode::Template_t const &) { + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "template"; + }, + [&](DatasetMode::External_t const &external) { + (*it->second)["platform_byte_widths"] = platformSpecifics(); + (*it->second)["external_storage"] = + external->externalStorageLocation(); + (*it->second)[JSONDefaults::openpmd_internal] + [JSONDefaults::DatasetMode] = "external"; + }}, + m_datasetMode.m_mode.as_base()); switch (m_attributeMode.m_mode) { @@ -2112,53 +2568,37 @@ auto JSONIOHandlerImpl::putJsonContents( }; #if openPMD_HAVE_MPI - auto num_digits = [](unsigned n) -> unsigned { - constexpr auto max = std::numeric_limits::max(); - unsigned base_10 = 1; - unsigned res = 1; - while (base_10 < max) - { - base_10 *= 10; - if (n / base_10 == 0) - { - return res; - } - ++res; + auto parallelImplementation = [this, &filename, &writeSingleFile]( + MPI_Comm comm) { + auto path = fullPath(*filename); + auto dirpath = path + ".parallel"; + if (!auxiliary::create_directories(dirpath)) + { + throw std::runtime_error( + "Failed creating directory '" + dirpath + + "' for parallel JSON output"); } - return res; - }; - - auto parallelImplementation = - [this, &filename, &writeSingleFile, &num_digits](MPI_Comm comm) { - auto path = fullPath(*filename); - auto dirpath = path + ".parallel"; - if (!auxiliary::create_directories(dirpath)) - { - throw std::runtime_error( - "Failed creating directory '" + dirpath + - "' for parallel JSON output"); - } - int rank = 0, size = 0; - MPI_Comm_rank(comm, &rank); - MPI_Comm_size(comm, &size); - std::stringstream subfilePath; - // writeSingleFile will prepend the base dir - subfilePath << *filename << ".parallel/mpi_rank_" - << std::setw(num_digits(size - 1)) << std::setfill('0') - << rank << [&]() { - switch (m_fileFormat) - { - case FileFormat::Json: - return ".json"; - case FileFormat::Toml: - return ".toml"; - } - throw std::runtime_error("Unreachable!"); - }(); - writeSingleFile(subfilePath.str()); - if (rank == 0) - { - constexpr char const *readme_msg = R"( + int rank = 0, size = 0; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + std::stringstream subfilePath; + // writeSingleFile will prepend the base dir + subfilePath << *filename << ".parallel/mpi_rank_"; + write_rank_to_stream_with_sufficient_padding(subfilePath, rank, size) + << [&]() { + switch (m_fileFormat) + { + case FileFormat::Json: + return ".json"; + case FileFormat::Toml: + return ".toml"; + } + throw std::runtime_error("Unreachable!"); + }(); + writeSingleFile(subfilePath.str()); + if (rank == 0) + { + constexpr char const *readme_msg = R"( This folder has been created by a parallel instance of the JSON backend in openPMD. There is one JSON file for each parallel writer MPI rank. The parallel JSON backend performs no metadata or data aggregation at all. @@ -2168,26 +2608,90 @@ There is no support in the openPMD-api for reading this folder as a single dataset. For reading purposes, either pick a single .json file and read that, or merge the .json files somehow (no tooling provided for this (yet)). )"; - std::fstream readme_file; - readme_file.open( - dirpath + "/README.txt", - std::ios_base::out | std::ios_base::trunc); - readme_file << readme_msg + 1; - readme_file.close(); - if (!readme_file.good() && - !filename.fileState->printedReadmeWarningAlready) - { - std::cerr - << "[Warning] Something went wrong in trying to create " - "README file at '" - << dirpath - << "/README.txt'. Will ignore and continue. The README " - "message would have been:\n----------\n" - << readme_msg + 1 << "----------" << std::endl; - filename.fileState->printedReadmeWarningAlready = true; - } + std::fstream readme_file; + readme_file.open( + dirpath + "/README.txt", + std::ios_base::out | std::ios_base::trunc); + readme_file << &readme_msg[1]; + readme_file.close(); + if (!readme_file.good() && + !filename.fileState->printedReadmeWarningAlready) + { + std::cerr + << "[Warning] Something went wrong in trying to create " + "README file at '" + << dirpath + << "/README.txt'. Will ignore and continue. The README " + "message would have been:\n----------\n" + << readme_msg + 1 << "----------" << std::endl; + filename.fileState->printedReadmeWarningAlready = true; } - }; + + constexpr char const *merge_script = R"END( +#!/usr/bin/env bash + +set -euo pipefail + +parallel_dir="$(dirname "$BASH_SOURCE")" +parallel_dir="$(cd "$parallel_dir" && pwd)" +serial_dir="${parallel_dir%.json.parallel}" +if [[ "$serial_dir" = "$parallel_dir" ]]; then + serial_dir="$parallel_dir/merged.json" +else + serial_dir="$serial_dir.json" +fi +echo "Will merge files to '$serial_dir'." >&2 +if [[ -e "$serial_dir" ]]; then + echo "Target file already exists, aborting." >&2 + exit 1 +fi +if ! which openpmd-merge-json >/dev/null 2>&1; then + echo "Did not find 'openpmd-merge-json' on PATH, aborting." >&2 + exit 1 +fi +for file in "$parallel_dir"/mpi_rank_*.json; do + echo "$file" +done | + openpmd-merge-json >"$serial_dir" +)END"; + std::string const merge_script_path = dirpath + "/merge.sh"; + std::fstream merge_file; + merge_file.open( + merge_script_path, std::ios_base::out | std::ios_base::trunc); + merge_file << &merge_script[1]; + merge_file.close(); + + if (!merge_file.good() && + !filename.fileState->printedReadmeWarningAlready) + { + std::cerr + << "[Warning] Something went wrong in trying to create " + "merge script at '" + << merge_script_path << "'. Will ignore and continue." + << std::endl; + filename.fileState->printedReadmeWarningAlready = true; + } + +#if openPMD_USE_FILESYSTEM_HEADER + try + { + std::filesystem::permissions( + merge_script_path, + std::filesystem::perms::owner_exec | + std::filesystem::perms::owner_exec | + std::filesystem::perms::owner_exec, + std::filesystem::perm_options::add); + } + catch (std::filesystem::filesystem_error const &e) + { + std::cerr << "Failed setting executable permissions on '" + << merge_script_path + << "', will ignore. Original error was:\n" + << e.what() << std::endl; + } +#endif + } + }; std::shared_ptr res; if (m_communicator.has_value()) @@ -2318,7 +2822,7 @@ auto JSONIOHandlerImpl::verifyDataset( try { Extent datasetExtent; - std::tie(datasetExtent, res) = getExtent(j); + std::tie(datasetExtent, res) = getExtent(j, m_datasetMode.m_mode); VERIFY_ALWAYS( datasetExtent.size() == parameters.extent.size(), "[JSON] Read/Write request does not fit the dataset's dimension"); @@ -2366,7 +2870,7 @@ nlohmann::json JSONIOHandlerImpl::platformSpecifics() Datatype::BOOL}; for (auto &datatype : datatypes) { - res[jsonDatatypeToString(datatype)] = toBytes(datatype); + res[internal::jsonDatatypeToString(datatype)] = toBytes(datatype); } return res; } diff --git a/src/Series.cpp b/src/Series.cpp index 6d13de73c3..2efcec63b5 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -42,12 +42,17 @@ #include "openPMD/backend/Attributable.hpp" #include "openPMD/backend/Attribute.hpp" #include "openPMD/backend/Variant_internal.hpp" +#include "openPMD/config.hpp" #include "openPMD/snapshots/ContainerImpls.hpp" #include "openPMD/snapshots/ContainerTraits.hpp" #include "openPMD/snapshots/Snapshots.hpp" #include "openPMD/snapshots/StatefulIterator.hpp" #include "openPMD/version.hpp" +#if openPMD_HAVE_AWS +#include +#endif + #include #include #include @@ -1067,38 +1072,25 @@ void Series::init( } } -template -auto Series::initIOHandler( - std::string const &filepath, - std::string const &options, - Access at, - bool resolve_generic_extension, - MPI_Communicator &&...comm) - -> std::tuple, TracingJSON> +namespace { - auto &series = get(); - - json::TracingJSON optionsJson = json::parseOptions( - options, - std::forward(comm)..., - /* considerFiles = */ true); - auto input = parseInput(filepath); - if (resolve_generic_extension && input->format == Format::GENERIC && - !access::create(at)) + template + void do_resolve_generic_extension_read( + ParsedInput_t &input, std::string const &filepath, Access at) { auto isPartOfSeries = - input->iterationEncoding == IterationEncoding::fileBased + input.iterationEncoding == IterationEncoding::fileBased ? matcher( - input->filenamePrefix, - input->filenamePadding, - input->filenamePostfix, + input.filenamePrefix, + input.filenamePadding, + input.filenamePostfix, std::nullopt) - : matcher(input->name, -1, "", std::nullopt); + : matcher(input.name, -1, "", std::nullopt); std::optional extension; std::set additional_extensions; autoDetectPadding( isPartOfSeries, - input->path, + input.path, [&extension, &additional_extensions](std::string const &, Match const &match) { auto const &ext = match.extension.value(); @@ -1131,8 +1123,8 @@ auto Series::initIOHandler( std::nullopt, error.str()); } - input->filenameExtension = *extension; - input->format = determineFormat(*extension); + input.filenameExtension = *extension; + input.format = determineFormat(*extension); } else if (access::read(at)) { @@ -1144,30 +1136,70 @@ auto Series::initIOHandler( } } + template + void do_resolve_generic_extension_write(ParsedInput_t &input) + { + { + if (input.format == /* still */ Format::GENERIC) + { + throw error::WrongAPIUsage( + "Unable to automatically determine filename extension. " + "Please " + "specify in some way."); + } + else if (input.format == Format::ADIOS2_BP) + { + // Since ADIOS2 has multiple extensions depending on the engine, + // we need to pass this job on to the backend + input.filenameExtension = ".%E"; + } + else + { + input.filenameExtension = suffix(input.format); + } + } + } +} // namespace + +template +auto Series::initIOHandler( + std::string const &filepath, + std::string const &options, + Access at, + bool resolve_generic_extension, + MPI_Communicator &&...comm) + -> std::tuple, TracingJSON> +{ + auto &series = get(); + + json::TracingJSON optionsJson = json::parseOptions( + options, + std::forward(comm)..., + /* considerFiles = */ true); + auto input = parseInput(filepath); + + if (resolve_generic_extension && input->format == Format::GENERIC && + !access::create(at)) + { + do_resolve_generic_extension_read(*input, filepath, at); + } + // default options series.m_parseLazily = at == Access::READ_LINEAR; // now check for user-specified options parseJsonOptions(optionsJson, *input); +#if openPMD_HAVE_AWS + if (series.m_manageAwsAPI.has_value()) + { + Aws::InitAPI(*series.m_manageAwsAPI); + } +#endif + if (resolve_generic_extension && !input->filenameExtension.has_value()) { - if (input->format == /* still */ Format::GENERIC) - { - throw error::WrongAPIUsage( - "Unable to automatically determine filename extension. Please " - "specify in some way."); - } - else if (input->format == Format::ADIOS2_BP) - { - // Since ADIOS2 has multiple extensions depending on the engine, - // we need to pass this job on to the backend - input->filenameExtension = ".%E"; - } - else - { - input->filenameExtension = suffix(input->format); - } + do_resolve_generic_extension_write(*input); } return std::make_tuple(std::move(input), std::move(optionsJson)); } @@ -3175,6 +3207,16 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input) { series.m_rankTable.m_rankTableSource = std::move(rankTableSource); } +#if openPMD_HAVE_AWS + { + bool doManageAwsAPI = false; + getJsonOption(options, "init_aws_api", doManageAwsAPI); + if (doManageAwsAPI) + { + series.m_manageAwsAPI = std::make_optional(); + } + } +#endif // backend key { std::map const backendDescriptors{ @@ -3261,7 +3303,16 @@ namespace internal // we must not throw in a destructor try { + // The order of operations is important: + // close() might need to wait for a number of remaining Aws + // operations to finish, so the AwsAPI needs to stay open for that. close(); +#if openPMD_HAVE_AWS + if (m_manageAwsAPI.has_value()) + { + Aws::ShutdownAPI(*m_manageAwsAPI); + } +#endif } catch (std::exception const &ex) { diff --git a/src/cli/convert-toml-json.cpp b/src/cli/convert-toml-json.cpp index d930fa156b..c4ad79db4c 100644 --- a/src/cli/convert-toml-json.cpp +++ b/src/cli/convert-toml-json.cpp @@ -18,58 +18,11 @@ * and the GNU Lesser General Public License along with openPMD-api. * If not, see . */ -#include -#include -#include +#include "openPMD/cli/convert-toml-json.hpp" -#include -#include -#include - -namespace json = openPMD::json; - -void parsed_main(std::string jsonOrToml) -{ - auto [config, originallySpecifiedAs] = json::parseOptions( - jsonOrToml, /* considerFiles = */ true, /* convertLowercase = */ false); - { - // NOLINTNEXTLINE(bugprone-unused-local-non-trivial-variable) - [[maybe_unused]] auto _ = std::move(jsonOrToml); - } - switch (originallySpecifiedAs) - { - using SL = json::SupportedLanguages; - case SL::JSON: { - auto asToml = json::jsonToToml(config); - std::cout << json::format_toml(asToml); - } - break; - case SL::TOML: - std::cout << config << '\n'; - break; - } -} - -int main(int argc, char const **argv) +void print_help_message(char const *program_name) { - std::string jsonOrToml; - switch (argc) - { - case 0: - case 1: - // Just read the whole stream into memory - // Not very elegant, but we'll hold the entire JSON/TOML dataset - // in memory at some point anyway, so it doesn't really matter - { - std::stringbuf readEverything; - std::cin >> &readEverything; - jsonOrToml = readEverything.str(); - } - break; - case 2: - if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) - { - std::cout << "Usage: " << std::string(argv[0]) << R"( [json_or_toml] + std::cout << "Usage: " << std::string(program_name) << R"( [json_or_toml] 'json_or_toml' can be a JSON or TOML dataset specified inline or a reference to a file prepended by an '@'. Inline datasets will be interpreted as JSON if they start with an '{', as TOML @@ -80,14 +33,11 @@ Inline dataset specifications can be replaced by input read from stdin. If the input is JSON, then it will be converted to TOML and written to stdout, equivalently from TOML to JSON. )"; - exit(0); - } - jsonOrToml = argv[1]; - break; - default: - throw std::runtime_error( - std::string("Usage: ") + argv[0] + - " [file location or inline JSON/TOML]"); - } - parsed_main(std::move(jsonOrToml)); +} + +int main(int argc, char const **argv) +{ + using convert = convert_json_toml; + convert::run_application( + argc, argv, convert::UseStdinAs::InlineJson, print_help_message); } diff --git a/src/cli/merge-json.cpp b/src/cli/merge-json.cpp new file mode 100644 index 0000000000..de762a0113 --- /dev/null +++ b/src/cli/merge-json.cpp @@ -0,0 +1,47 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/cli/convert-toml-json.hpp" + +void print_help_message(char const *program_name) +{ + std::cout << "Merge multiple JSON/TOML files into one.\nUsage: " + << std::string(program_name) << R"( [json_or_toml]+ +'json_or_toml' can be a JSON or TOML dataset specified inline or a reference +to a file prepended by an '@'. +Inline datasets will be interpreted as JSON if they start with an '{', as TOML +otherwise. Datasets from a file will be interpreted as JSON or TOML depending +on the file ending '.json' or '.toml' respectively. + +In order to support large numbers of files to be merged, the paths to those +files can also be specified line-by-line per stdin, replacing the limitations +of command line arguments. + +If the JSON/TOML files are mixed, then the output type (JSON or TOML) will be +determined by the type of the first file. +)"; +} + +int main(int argc, char const **argv) +{ + using convert = convert_json_toml; + convert::run_application( + argc, argv, convert::UseStdinAs::ListOfJson, print_help_message); +} diff --git a/src/toolkit/Aws.cpp b/src/toolkit/Aws.cpp new file mode 100644 index 0000000000..dc0afc0325 --- /dev/null +++ b/src/toolkit/Aws.cpp @@ -0,0 +1,317 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/config.hpp" + +#if openPMD_HAVE_AWS + +#include "openPMD/auxiliary/Memory.hpp" +#include "openPMD/auxiliary/Memory_internal.hpp" +#include "openPMD/auxiliary/Variant.hpp" +#include "openPMD/toolkit/Aws.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace +{ +struct membuf : std::streambuf +{ + membuf(char const *base, std::size_t size) + { + auto p = const_cast(base); + this->setg(p, p, p + size); + } +}; + +struct imemstream : std::iostream +{ + imemstream(char const *base, std::size_t size) + : std::iostream(&m_buf), m_buf(base, size) + {} + +private: + membuf m_buf; +}; +} // namespace + +namespace openPMD::internal +{ +void AwsAsyncCounter::wait() +{ + size_t target = this->request_counter; + std::unique_lock lk(this->mutex); + this->event.wait( + lk, [this, target]() { return this->completion_counter >= target; }); +} + +void AwsAsyncCounter::add_task() +{ + this->request_counter++; +} + +void AwsAsyncCounter::add_and_notify_result() +{ + std::unique_lock lk(this->mutex); + this->completion_counter++; + lk.unlock(); + this->event.notify_all(); +} + +AwsAsyncCounter::~AwsAsyncCounter() +{ + std::cerr << "Waiting for remaining tasks. Have " << completion_counter + << " of " << request_counter << std::endl; + this->wait(); + std::cerr << "Finished waiting for remaining tasks" << std::endl; +} + +ExternalBlockStorageAws::ExternalBlockStorageAws( + Aws::S3::S3Client client, + std::string bucketName, + std::optional endpoint, + bool async) + : m_client{std::move(client)} + , m_bucketName(std::move(bucketName)) + , m_endpoint(std::move(endpoint)) + , m_async(async ? std::make_optional() : std::nullopt) +{ + Aws::S3::Model::CreateBucketRequest create_request; + create_request.SetBucket(m_bucketName); + auto create_outcome = m_client.CreateBucket(create_request); + if (!create_outcome.IsSuccess()) + { + std::cerr << "[ExternalBlockStorageAws::ExternalBlockStorageAws] " + "Warning: Failed to create bucket (may already exist): " + << create_outcome.GetError().GetMessage() << std::endl; + } + else + { + std::cout << "Bucket created: " << m_bucketName << std::endl; + } +} +ExternalBlockStorageAws::~ExternalBlockStorageAws() +{ + // We need to wait for late operations before doing anything else. + m_async.reset(); +} + +auto ExternalBlockStorageAws::put( + std::string const &identifier, auxiliary::WriteBuffer data, size_t len) + -> std::string +{ + auto sanitized = !identifier.empty() && identifier.at(0) == '/' + ? identifier.substr(1) + : identifier; + + Aws::S3::Model::PutObjectRequest put_request; + put_request.SetBucket(m_bucketName); + put_request.SetKey(sanitized); + + auto input_data = Aws::MakeShared( + "PutObjectInputStream", + reinterpret_cast(data.get()), + len); + put_request.SetBody(input_data); + put_request.SetContentLength(static_cast(len)); + + if (!m_async.has_value()) + { + auto put_outcome = m_client.PutObject(put_request); + + if (put_outcome.IsSuccess()) + { + // std::cout << "File synchronously uploaded successfully to S3!" + // << std::endl; + } + else + { + std::cerr << "Synchronous upload failed: " + << put_outcome.GetError().GetMessage() << std::endl; + } + } + else + { + auto &async_counter = *std::visit( + auxiliary::overloaded{ + [this](auxiliary::WriteBuffer::CopyableUniquePtr const &) { + std::cout << "Using unique pointer" << std::endl; + return &this->m_async->unique_ptr_operations; + }, + [this](auxiliary::WriteBuffer::SharedPtr const &) { + std::cout << "Using shared pointer" << std::endl; + return &this->m_async->shared_ptr_operations; + }}, + data.as_variant()); + auto responseReceivedHandler = + [&async_counter, + sanitized, + /* + * Need to keep buffers alive until they have been asynchronously + * read. Use the closure captures for this. Wrap the WriteBuffer + * inside a shared_ptr to make the std::function copyable. + */ + keepalive = + std::make_shared(std::move(data))]( + const Aws::S3::S3Client *, + const Aws::S3::Model::PutObjectRequest &, + const Aws::S3::Model::PutObjectOutcome &put_outcome, + const std::shared_ptr + &) { + (void)keepalive; + if (put_outcome.IsSuccess()) + { + // std::cout + // << "File asynchronously uploaded successfully to S3!" + // << std::endl; + } + else + { + std::cerr << "Asynchronous upload failed for '" << sanitized + << "': " << put_outcome.GetError().GetMessage() + << std::endl; + } + async_counter.add_and_notify_result(); + }; + async_counter.add_task(); + m_client.PutObjectAsync( + put_request, std::move(responseReceivedHandler)); + } + return sanitized; +} + +void ExternalBlockStorageAws::get( + std::string const &external_ref, std::shared_ptr data, size_t len) +{ + if (len == 0) + { + return; + } + + Aws::S3::Model::GetObjectRequest get_request; + get_request.SetBucket(m_bucketName); + get_request.SetKey(external_ref); + + auto processGetOutcome = [len]( + Aws::S3::Model::GetObjectOutcome const + &get_outcome, + void *data_lambda) { + auto &body = get_outcome.GetResult().GetBody(); + body.read( + reinterpret_cast(data_lambda), + static_cast(len)); + std::streamsize read_bytes = body.gcount(); + if (read_bytes != static_cast(len)) + { + throw std::runtime_error( + "ExternalBlockStorageAws: failed to read expected number of " + "bytes " + "from S3 object"); + } + }; + + if (!m_async.has_value()) + { + auto get_outcome = m_client.GetObject(get_request); + if (!get_outcome.IsSuccess()) + { + throw std::runtime_error( + std::string("ExternalBlockStorageAws::get failed: ") + + get_outcome.GetError().GetMessage()); + } + + processGetOutcome(get_outcome, data.get()); + } + else + { + auto &async_counter = this->m_async->shared_ptr_operations; + auto responseReceivedHandler = + [&async_counter, + external_ref, + processGetOutcome_lambda = std::move(processGetOutcome), + data_lambda = std::move(data)]( + const Aws::S3::S3Client *, + const Aws::S3::Model::GetObjectRequest &, + const Aws::S3::Model::GetObjectOutcome &get_outcome, + const std::shared_ptr + &) { + if (get_outcome.IsSuccess()) + { + // std::cout << "File asynchronously downloaded successfully + // " + // "from S3!" + // << std::endl; + } + else + { + std::cerr << "Asynchronous download failed for '" + << external_ref + << "': " << get_outcome.GetError().GetMessage() + << std::endl; + } + processGetOutcome_lambda(get_outcome, data_lambda.get()); + async_counter.add_and_notify_result(); + }; + async_counter.add_task(); + m_client.GetObjectAsync( + get_request, std::move(responseReceivedHandler)); + } +} + +void ExternalBlockStorageAws::syncMandatoryOperations() +{ + if (!this->m_async.has_value()) + { + return; + } + this->m_async->shared_ptr_operations.wait(); +} + +void ExternalBlockStorageAws::syncAllOperations() +{ + if (!this->m_async.has_value()) + { + return; + } + this->m_async->shared_ptr_operations.wait(); + this->m_async->unique_ptr_operations.wait(); +} + +[[nodiscard]] auto ExternalBlockStorageAws::externalStorageLocation() const + -> nlohmann::json +{ + nlohmann::json j; + j["provider"] = "aws"; + if (m_endpoint.has_value()) + { + j["endpoint"] = *m_endpoint; + } + j["bucket"] = m_bucketName; + return j; +} + +} // namespace openPMD::internal +#endif diff --git a/src/toolkit/AwsBuilder.cpp b/src/toolkit/AwsBuilder.cpp new file mode 100644 index 0000000000..aeff7f57d9 --- /dev/null +++ b/src/toolkit/AwsBuilder.cpp @@ -0,0 +1,167 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/config.hpp" + +#include "openPMD/toolkit/AwsBuilder.hpp" + +#include "openPMD/toolkit/Aws.hpp" +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#if openPMD_HAVE_AWS +#include +#include +#include +#endif + +namespace openPMD::internal +{ +AwsBuilder::AwsBuilder( + std::string bucketName, std::string accessKeyId, std::string secretKey) + : m_bucketName(std::move(bucketName)) + , m_accessKeyId(std::move(accessKeyId)) + , m_secretKey(std::move(secretKey)) +{} + +auto AwsBuilder::setBucketName(std::string bucketName) -> AwsBuilder & +{ + m_bucketName = std::move(bucketName); + return *this; +} + +auto internal::AwsBuilder::setCredentials( + std::string accessKeyId, std::string secretKey) -> AwsBuilder & +{ + m_accessKeyId = std::move(accessKeyId); + m_secretKey = std::move(secretKey); + return *this; +} + +auto AwsBuilder::setEndpointOverride(std::string endpoint) -> AwsBuilder & +{ + m_endpointOverride = std::move(endpoint); + return *this; +} + +auto AwsBuilder::setRegion(std::string regionName) -> AwsBuilder & +{ + m_region = std::move(regionName); + return *this; +} + +auto AwsBuilder::setScheme(Scheme s) -> AwsBuilder & +{ + m_scheme = s; + return *this; +} + +auto AwsBuilder::setVerifySSL(bool verify) -> AwsBuilder & +{ + m_verifySSL = verify; + return *this; +} + +auto AwsBuilder::setAsyncIO(bool useAsyncIO) -> AwsBuilder & +{ + m_useAsyncIO = useAsyncIO; + return *this; +} + +auto internal::AwsBuilder::setSessionToken(std::string sessionToken) + -> AwsBuilder & +{ + m_sessionToken = std::move(sessionToken); + return *this; +} + +AwsBuilder::operator ExternalBlockStorage() +{ +#if openPMD_HAVE_AWS + Aws::Client::ClientConfiguration config; + + if (m_endpointOverride.has_value()) + { + config.endpointOverride = *m_endpointOverride; + } + if (m_region.has_value()) + { + config.region = *m_region; + } + else + { + config.region = "us-east-1"; + } + if (m_scheme.has_value()) + { + switch (*m_scheme) + { + case Scheme::HTTP: + config.scheme = Aws::Http::Scheme::HTTP; + break; + case Scheme::HTTPS: + config.scheme = Aws::Http::Scheme::HTTPS; + break; + break; + } + } + + config.connectTimeoutMs = 5000; + config.requestTimeoutMs = 15000; + + if (m_verifySSL.has_value()) + { + config.verifySSL = *m_verifySSL; + } + + auto aws_credentials = [&]() -> Aws::Auth::AWSCredentials { + if (m_sessionToken.has_value()) + { + return {m_accessKeyId, m_secretKey, *m_sessionToken}; + } + else + { + return {m_accessKeyId, m_secretKey}; + } + }(); + + Aws::S3::S3Client s3_client( + aws_credentials, + config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + false); + + return ExternalBlockStorage{std::make_unique( + std::move(s3_client), + std::move(m_bucketName), + std::move(m_endpointOverride), + m_useAsyncIO.value_or(true))}; +#else + throw std::runtime_error( + "Method not available: openPMD-api has been built without support for " + "AWS."); +#endif +} + +auto AwsBuilder::build() -> ExternalBlockStorage +{ + return *this; +} + +} // namespace openPMD::internal diff --git a/src/toolkit/ExternalBlockStorage.cpp b/src/toolkit/ExternalBlockStorage.cpp new file mode 100644 index 0000000000..c1fae7a35b --- /dev/null +++ b/src/toolkit/ExternalBlockStorage.cpp @@ -0,0 +1,305 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/toolkit/ExternalBlockStorage.hpp" + +#include "openPMD/DatatypeMacros.hpp" +#include "openPMD/IO/JSON/JSONIOHandlerImpl.hpp" +#include "openPMD/auxiliary/Memory.hpp" +#include "openPMD/auxiliary/StringManip.hpp" + +#include +#include + +#include +#include +#include + +namespace openPMD::internal +{ +void ExternalBlockStorageBackend::syncMandatoryOperations() +{ + // default for non-async backends: no-op +} + +void ExternalBlockStorageBackend::syncAllOperations() +{ + // default for non-async backends: no-op +} + +ExternalBlockStorageBackend::~ExternalBlockStorageBackend() = default; +} // namespace openPMD::internal + +namespace openPMD +{ + +namespace +{ + auto flat_extent(Extent const &e) -> size_t + { + return std::accumulate( + e.begin(), e.end(), 1, [](size_t left, size_t right) { + return left * right; + }); + } + + template + void read_impl( + internal::ExternalBlockStorageBackend *backend, + nlohmann::json const &external_block, + std::shared_ptr &data, + size_t len) + { + auto const &external_ref = + external_block.at("external_ref").get(); + backend->get( + external_ref, + std::static_pointer_cast(data), + sizeof(T) * len); + } +} // namespace + +ExternalBlockStorage::ExternalBlockStorage() = default; +ExternalBlockStorage::ExternalBlockStorage( + std::unique_ptr worker) + : m_worker(std::move(worker)) +{} + +auto ExternalBlockStorage::makeStdioSession(std::string directory) + -> internal::StdioBuilder +{ + return internal::StdioBuilder{std::move(directory)}; +} + +auto ExternalBlockStorage::makeAwsSession( + std::string bucketName, std::string accessKeyId, std::string secretKey) + -> internal::AwsBuilder +{ + return internal::AwsBuilder( + std::move(bucketName), std::move(accessKeyId), std::move(secretKey)); +} + +template +auto ExternalBlockStorage::store( + Extent const &globalExtent, + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::optional infix, + auxiliary::WriteBuffer data) -> std::string +{ + auto &dataset = fullJsonDataset[path]; + + using running_index_t = uint64_t; + running_index_t running_index = [&]() -> running_index_t { + if (auto it = dataset.find("_running_index"); it != dataset.end()) + { + auto res = it->get(); + ++res; + *it = res; + return res; + } + else + { + dataset["_running_index"] = 0; + return 0; + } + }(); + + constexpr size_t padding = 6; + std::string index_as_str = [running_index, &infix]() { + auto res = std::to_string(running_index); + auto size = res.size(); + if (size >= padding) + { + return res; + } + std::stringstream padded; + if (infix.has_value()) + { + padded << *infix << "--"; + } + for (size_t i = 0; i < padding - size; ++i) + { + padded << '0'; + } + padded << res; + return padded.str(); + }(); + + if (dataset.contains(index_as_str)) + { + throw std::runtime_error( + "Inconsistent state: Index " + index_as_str + " already in use."); + } + + auto check_metadata = [&dataset](char const *key, auto const &value) { + using value_t = + std::remove_reference_t>; + if (auto it = dataset.find(key); it != dataset.end()) + { + auto const &stored_value = it->get(); + if (stored_value != value) + { + throw std::runtime_error( + "Inconsistent chunk storage in key " + std::string(key) + + "."); + } + } + else + { + dataset[key] = value; + } + }; + if (!DatatypeHandling::template encodeDatatype(dataset)) + { + throw std::runtime_error("Inconsistent chunk storage in datatype."); + } + check_metadata("byte_width", sizeof(T)); + check_metadata("extent", globalExtent); + + auto &block = dataset["external_blocks"][index_as_str]; + block["offset"] = blockOffset; + block["extent"] = blockExtent; + std::stringstream filesystem_identifier; + filesystem_identifier << path.to_string(); + filesystem_identifier << "--" << index_as_str; + auto escaped_filesystem_identifier = m_worker->put( + filesystem_identifier.str(), + std::move(data), + sizeof(T) * flat_extent(blockExtent)); + block["external_ref"] = escaped_filesystem_identifier; + return index_as_str; +} + +template +void ExternalBlockStorage::read( + [[maybe_unused]] std::string const &identifier, + [[maybe_unused]] nlohmann::json const &fullJsonDataset, + [[maybe_unused]] nlohmann::json::json_pointer const &path, + [[maybe_unused]] std::shared_ptr &data) +{ + throw std::runtime_error("Unimplemented!"); +} + +template +void ExternalBlockStorage::read( + Offset const &blockOffset, + Extent const &blockExtent, + nlohmann::json const &fullJsonDataset, + nlohmann::json::json_pointer const &path, + std::shared_ptr &data) +{ + auto &dataset = fullJsonDataset[path]; + if (!DatatypeHandling::template checkDatatype(dataset)) + { + throw std::runtime_error("Inconsistent chunk storage in datatype."); + } + auto external_blocks = dataset.at("external_blocks"); + bool found_a_precise_match = false; + for (auto it = external_blocks.begin(); it != external_blocks.end(); ++it) + { + auto const &block = it.value(); + try + { + auto const &o = block.at("offset").get(); + auto const &e = block.at("extent").get(); + // Look only for exact matches for now + if (o != blockOffset || e != blockExtent) + { + continue; + } + found_a_precise_match = true; + read_impl(m_worker.get(), block, data, flat_extent(blockExtent)); + break; + } + catch (nlohmann::json::exception const &e) + { + std::cerr << "[ExternalBlockStorage::read] Could not parse block '" + << it.key() << "'. Original error was:\n" + << e.what(); + } + } + if (!found_a_precise_match) + { + throw std::runtime_error( + "[ExternalBlockStorage::read] Unable to find a precise match for " + "offset " + + auxiliary::vec_as_string(blockOffset) + " and extent " + + auxiliary::vec_as_string(blockExtent)); + } +} + +void ExternalBlockStorage::syncMandatoryOperations() +{ + this->m_worker->syncMandatoryOperations(); +} + +void ExternalBlockStorage::syncAllOperations() +{ + this->m_worker->syncAllOperations(); +} + +[[nodiscard]] auto ExternalBlockStorage::externalStorageLocation() const + -> nlohmann::json +{ + return m_worker->externalStorageLocation(); +} + +void ExternalBlockStorage::sanitizeString(std::string &s) +{ + for (char &c : s) + { + if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' || + c == '"' || c == '<' || c == '>' || c == '|' || c == '\n' || + c == '\r' || c == '\t' || c == '\0' || c == ' ') + { + c = '_'; + } + } +} + +#define OPENPMD_INSTANTIATE_DATATYPEHANDLING(datatypehandling, type) \ + template auto ExternalBlockStorage::store( \ + Extent const &globalExtent, \ + Offset const &blockOffset, \ + Extent const &blockExtent, \ + nlohmann::json &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + std::optional infix, \ + auxiliary::WriteBuffer) -> std::string; \ + template void ExternalBlockStorage::read( \ + std::string const &identifier, \ + nlohmann::json const &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + std::shared_ptr &data); \ + template void ExternalBlockStorage::read( \ + Offset const &blockOffset, \ + Extent const &blockExtent, \ + nlohmann::json const &fullJsonDataset, \ + nlohmann::json::json_pointer const &path, \ + std::shared_ptr &data); +#define OPENPMD_INSTANTIATE(type) \ + OPENPMD_INSTANTIATE_DATATYPEHANDLING(internal::JsonDatatypeHandling, type) +OPENPMD_FOREACH_DATASET_DATATYPE(OPENPMD_INSTANTIATE) +#undef OPENPMD_INSTANTIATE + +} // namespace openPMD diff --git a/src/toolkit/Stdio.cpp b/src/toolkit/Stdio.cpp new file mode 100644 index 0000000000..4acf2748be --- /dev/null +++ b/src/toolkit/Stdio.cpp @@ -0,0 +1,167 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/toolkit/Stdio.hpp" + +#include "openPMD/auxiliary/Filesystem.hpp" + +#include +#include + +namespace +{ +auto concat_filepath(std::string const &s1, std::string const &s2) + -> std::string +{ + if (s1.empty()) + { + return s2; + } + if (s2.empty()) + { + return s1; + } + bool ends_with_slash = + *s1.crbegin() == openPMD::auxiliary::directory_separator; + bool starts_with_slash = + *s2.cbegin() == openPMD::auxiliary::directory_separator; + + if (ends_with_slash ^ starts_with_slash) + { + return s1 + s2; + } + else if (ends_with_slash && starts_with_slash) + { + return s1 + (s2.c_str() + 1); + } + else + { + return s1 + openPMD::auxiliary::directory_separator + s2; + } +} +} // namespace + +namespace openPMD::internal +{ +ExternalBlockStorageStdio::ExternalBlockStorageStdio( + std::string directory, std::string openMode) + : m_directory(std::move(directory)), m_openMode(std::move(openMode)) +{ + if (m_directory.empty()) + { + throw std::invalid_argument( + "ExternalBlockStorageStdio: directory cannot be empty"); + } + + if (!auxiliary::create_directories(m_directory)) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to create or access " + "directory: " + + m_directory); + } +} + +ExternalBlockStorageStdio::~ExternalBlockStorageStdio() = default; + +auto ExternalBlockStorageStdio::put( + std::string const &identifier, auxiliary::WriteBuffer data, size_t len) + -> std::string +{ + auto sanitized = identifier + ".dat"; + ExternalBlockStorage::sanitizeString(sanitized); + std::string filepath = concat_filepath(m_directory, sanitized); + + if (len == 0) + { + return filepath; + } + + FILE *file = std::fopen(filepath.c_str(), m_openMode.c_str()); + if (!file) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to open file for writing: " + + filepath); + } + + size_t written = std::fwrite(data.get(), 1, len, file); + if (written != len) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to write full data to file: " + + filepath); + } + + if (std::fclose(file) != 0) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to close file after writing: " + + filepath); + } + + return sanitized; +} + +void ExternalBlockStorageStdio::get( + std::string const &external_ref, std::shared_ptr data, size_t len) +{ + if (len == 0) + { + return; + } + + std::string filepath = concat_filepath(m_directory, external_ref); + + FILE *file = std::fopen(filepath.c_str(), "rb"); + if (!file) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to open file for reading: " + + filepath); + } + + size_t read = std::fread(data.get(), 1, len, file); + if (read != len) + { + std::fclose(file); + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to read full data from file: " + + filepath); + } + + if (std::fclose(file) != 0) + { + throw std::runtime_error( + "ExternalBlockStorageStdio: failed to close file after reading: " + + filepath); + } +} + +[[nodiscard]] auto ExternalBlockStorageStdio::externalStorageLocation() const + -> nlohmann::json +{ + nlohmann::json j; + j["provider"] = "stdio"; + j["directory"] = m_directory; + j["open_mode"] = m_openMode; + return j; +} +} // namespace openPMD::internal diff --git a/src/toolkit/StdioBuilder.cpp b/src/toolkit/StdioBuilder.cpp new file mode 100644 index 0000000000..eca2e239ad --- /dev/null +++ b/src/toolkit/StdioBuilder.cpp @@ -0,0 +1,51 @@ +/* Copyright 2026 Franz Poeschel + * + * This file is part of openPMD-api. + * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#include "openPMD/toolkit/StdioBuilder.hpp" + +#include "openPMD/toolkit/ExternalBlockStorage.hpp" +#include "openPMD/toolkit/Stdio.hpp" + +#include + +namespace openPMD::internal +{ +auto StdioBuilder::setDirectory(std::string directory) -> StdioBuilder & +{ + m_directory = std::move(directory); + return *this; +} +auto StdioBuilder::setOpenMode(std::string openMode) -> StdioBuilder & +{ + m_openMode = std::move(openMode); + return *this; +} + +StdioBuilder::operator ExternalBlockStorage() +{ + return ExternalBlockStorage{std::make_unique( + std::move(m_directory), std::move(m_openMode).value_or("wb"))}; +} + +auto StdioBuilder::build() -> ExternalBlockStorage +{ + return *this; +} +} // namespace openPMD::internal diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 7fd13822f3..24fd529060 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -5272,6 +5272,27 @@ TEST_CASE("bp4_steps", "[serial][adios2]") void serial_iterator(std::string const &file) { + auto const write_config = R"( +init_aws_api = true +rank_table = "posix_hostname" + +[json.attribute] +mode = "short" + +[json.dataset.mode] +provider = "aws" +access_key_id = "test" +secret_access_key = "test" +endpoint = "http://localhost:4566" +bucket = "simdata" + )"; + auto const read_config = R"( +init_aws_api = true + +[json.dataset.mode] +access_key_id = "test" +secret_access_key = "test" + )"; constexpr Extent::value_type extent = 1000; { Series writeSeries( @@ -5279,7 +5300,7 @@ void serial_iterator(std::string const &file) Access::CREATE_LINEAR #ifndef _WIN32 , - R"({"rank_table": "posix_hostname"})" + write_config #endif ); auto iterations = writeSeries.snapshots(); @@ -5294,7 +5315,7 @@ void serial_iterator(std::string const &file) } } - Series readSeries(file, Access::READ_ONLY); + Series readSeries(file, Access::READ_ONLY, read_config); size_t last_iteration_index = 0; size_t numberOfIterations = 0; @@ -5330,19 +5351,23 @@ void serial_iterator(std::string const &file) TEST_CASE("serial_iterator", "[serial][adios2]") { - for (auto const &t : testedFileExtensions()) - { -#ifdef _WIN32 - serial_iterator("../samples/serial_iterator_filebased_%T." + t); - serial_iterator("../samples/serial_iterator_groupbased." + t); -#else - // Add some regex characters into the file names to see that we can deal - // with that. Don't do that on Windows because Windows does not like - // those characters within file paths. - serial_iterator("../samples/serial_iterator_filebased_+?_%T." + t); - serial_iterator("../samples/serial_iterator_groupbased_+?." + t); -#endif - } + serial_iterator("../samples/serial_iterator.json"); + // for (auto const &t : testedFileExtensions()) + // { + // #ifdef _WIN32 + // serial_iterator("../samples/serial_iterator_filebased_%T." + t); + // serial_iterator("../samples/serial_iterator_groupbased." + t); + // #else + // // Add some regex characters into the file names to see that we + // can deal + // // with that. Don't do that on Windows because Windows does not + // like + // // those characters within file paths. + // serial_iterator("../samples/serial_iterator_filebased_+?_%T." + + // t); serial_iterator("../samples/serial_iterator_groupbased_+?." + + // t); + // #endif + // } } void variableBasedSingleIteration(std::string const &file)