From d0834aea01d93d2f62e1fbec60ffacb65708cf50 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:06:19 +0100 Subject: [PATCH 1/3] Signpost for TableTreeHelpers --- Framework/Core/src/TableTreeHelpers.cxx | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx index d0fdd0ced5779..2f23c07aea451 100644 --- a/Framework/Core/src/TableTreeHelpers.cxx +++ b/Framework/Core/src/TableTreeHelpers.cxx @@ -11,6 +11,7 @@ #include "Framework/TableTreeHelpers.h" #include "Framework/Logger.h" #include "Framework/Endian.h" +#include "Framework/Signpost.h" #include "arrow/type_traits.h" #include @@ -21,6 +22,9 @@ #include #include + +O2_DECLARE_DYNAMIC_LOG(tabletree_helpers); + namespace TableTreeHelpers { static constexpr char const* sizeBranchSuffix = "_size"; @@ -134,6 +138,7 @@ BranchToColumn::BranchToColumn(TBranch* branch, bool VLA, std::string name, EDat std::pair, std::shared_ptr> BranchToColumn::read(TBuffer* buffer) { + O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, buffer); auto totalEntries = mBranch->GetEntries(); arrow::Status status; int readEntries = 0; @@ -170,7 +175,9 @@ std::pair, std::shared_ptr> B } } else { // other types: use serialized read to build arrays directly + size_t branchSize = mBranch->GetTotBytes(); auto&& result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool); + O2_SIGNPOST_EVENT_EMIT(tabletree_helpers, sid, "BranchToColumn", "Allocating %ld bytes for %{public}s", branchSize, mBranch->GetName()); if (!result.ok()) { throw runtime_error("Cannot allocate values buffer"); } @@ -526,17 +533,20 @@ void TreeToTable::setLabel(const char* label) mTableLabel = label; } -void TreeToTable::fill(TTree*) +void TreeToTable::fill(TTree*tree) { std::vector> columns; std::vector> fields; static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024}; + O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, &buffer); + O2_SIGNPOST_START(tabletree_helpers, sid, "TreeToTable", "Filling %{public}s", tree->GetName()); for (auto& reader : mBranchReaders) { buffer.Reset(); auto arrayAndField = reader->read(&buffer); columns.push_back(arrayAndField.first); fields.push_back(arrayAndField.second); } + O2_SIGNPOST_END(tabletree_helpers, sid, "TreeToTable", "Done filling."); auto schema = std::make_shared(fields, std::make_shared(std::vector{std::string{"label"}}, std::vector{mTableLabel})); mTable = arrow::Table::Make(schema, columns); From 70b4151a69c152dba780f3a0ed805f29d50c84b9 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:06:19 +0100 Subject: [PATCH 2/3] DPL: Move DataInputDirector to arrow::Dataset API --- .../src/AODJAlienReaderHelpers.cxx | 16 +- .../AnalysisSupport/src/DataInputDirector.cxx | 184 ++++++++++++------ .../AnalysisSupport/src/DataInputDirector.h | 20 +- Framework/AnalysisSupport/src/Plugin.cxx | 40 +++- Framework/AnalysisSupport/src/TTreePlugin.cxx | 28 ++- .../include/Framework/RootArrowFilesystem.h | 2 + Framework/Core/src/RootArrowFilesystem.cxx | 13 +- 7 files changed, 211 insertions(+), 92 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 9c19de85739ce..f8a9705e4eb62 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -10,10 +10,12 @@ // or submit itself to any jurisdiction. #include "AODJAlienReaderHelpers.h" +#include #include "Framework/TableTreeHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/DataProcessingStats.h" #include "Framework/RootTableBuilderHelpers.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/ControlService.h" @@ -41,6 +43,8 @@ #include #include #include +#include +#include using namespace o2; using namespace o2::aod; @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // Origin file name for derived output map auto o2 = Output(TFFileNameHeader); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - std::string currentFilename(fileAndFolder.file->GetName()); - if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') { + auto rootFS = std::dynamic_pointer_cast(fileAndFolder.filesystem()); + auto* f = dynamic_cast(rootFS->GetFile()); + std::string currentFilename(f->GetFile()->GetName()); + if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') { // This is not an absolute local path. Make it absolute. static std::string pwd = gSystem->pwd() + std::string("/"); - currentFilename = pwd + std::string(fileAndFolder.file->GetName()); + currentFilename = pwd + std::string(f->GetName()); } outputs.make(o2) = currentFilename; } @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - if (!fileAndFolder.file) { + + // In case the filesource is empty, move to the next one. + if (fileAndFolder.path().empty()) { fcnt += 1; ntf = 0; if (didir->atEnd(fcnt)) { diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 172ecd66c0e64..a07c3f76ad736 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -11,6 +11,8 @@ #include "DataInputDirector.h" #include "Framework/DataDescriptorQueryBuilder.h" #include "Framework/Logger.h" +#include "Framework/PluginManager.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/Output.h" #include "Headers/DataHeader.h" @@ -26,8 +28,12 @@ #include "TGrid.h" #include "TObjString.h" #include "TMap.h" +#include "TFile.h" +#include +#include #include +#include #if __has_include() #include @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName) return fileNameHolder; } -DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport), - mMonitoring(monitoring), - mAllowedParentLevel(allowedParentLevel), - mParentFileReplacement(std::move(parentFileReplacement)), - mLevel(level) +DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) + : mAlienSupport(alienSupport), + mMonitoring(monitoring), + mAllowedParentLevel(allowedParentLevel), + mParentFileReplacement(std::move(parentFileReplacement)), + mLevel(level) { + std::vector capabilitiesSpecs = { + "O2Framework:RNTupleObjectReadingCapability", + "O2Framework:TTreeObjectReadingCapability", + }; + + std::vector plugins; + for (auto spec : capabilitiesSpecs) { + auto morePlugins = PluginManager::parsePluginSpecString(spec); + for (auto& extra : morePlugins) { + plugins.push_back(extra); + } + } + + PluginManager::loadFromPlugin(plugins, mFactory.capabilities); } void DataInputDescriptor::printOut() @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter) // open file auto filename = mfilenames[counter]->fileName; - if (mcurrentFile) { - if (mcurrentFile->GetName() == filename) { + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + if (rootFS.get()) { + if (rootFS->GetFile()->GetName() == filename) { return true; } closeInputFile(); } - mcurrentFile = TFile::Open(filename.c_str()); - if (!mcurrentFile) { + + mCurrentFilesystem = std::make_shared(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory); + if (!mCurrentFilesystem.get()) { throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename)); } - mcurrentFile->SetReadaheadSize(50 * 1024 * 1024); + rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); // get the parent file map if exists - mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) + mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) if (mParentFileMap && !mParentFileReplacement.empty()) { auto pos = mParentFileReplacement.find(';'); if (pos == std::string::npos) { @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter) // get the directory names if (mfilenames[counter]->numberOfTimeFrames <= 0) { - std::regex TFRegex = std::regex("DF_[0-9]+"); - TList* keyList = mcurrentFile->GetListOfKeys(); + const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$"); + TList* keyList = rootFS->GetFile()->GetListOfKeys(); + std::vector finalList; // extract TF numbers and sort accordingly + // We use an extra seen set to make sure we preserve the order in which + // we instert things in the final list and to make sure we do not have duplicates. + // Multiple folder numbers can happen if we use a flat structure /DF_- + std::unordered_set seen; for (auto key : *keyList) { - if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) { - auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3)); - mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + std::smatch matchResult; + std::string keyName = ((TObjString*)key)->GetString().Data(); + bool match = std::regex_match(keyName, matchResult, TFRegex); + if (match) { + auto folderNumber = std::stoul(matchResult[1].str()); + if (seen.find(folderNumber) == seen.end()) { + seen.insert(folderNumber); + mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber); + } } } + if (mParentFileMap != nullptr) { // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(), @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter) std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end()); } - for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) { - auto folderName = "DF_" + std::to_string(folderNumber); - mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName); - mfilenames[counter]->alreadyRead.emplace_back(false); - } - mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size(); + mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false); + mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size(); } mCurrentFileID = counter; @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF) return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF]; } -FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) +arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF) { - FileAndFolder fileAndFolder; - // open file if (!setFile(counter)) { - return fileAndFolder; + return {}; } // no TF left if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) { - return fileAndFolder; + return {}; } - fileAndFolder.file = mcurrentFile; - fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; - mfilenames[counter]->alreadyRead[numTF] = true; - return fileAndFolder; + return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem}; } DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename) @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, // This file has no parent map return nullptr; } - auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; + auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]); auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str()); + // The current DF is not found in the parent map (this should not happen and is a fatal error) + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); if (!parentFileName) { - // The current DF is not found in the parent map (this should not happen and is a fatal error) - throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName())); return nullptr; } if (mParentFile) { // Is this still the corresponding to the correct file? - if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) { + auto parentRootFS = std::dynamic_pointer_cast(mParentFile->mCurrentFilesystem); + if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) { return mParentFile; } else { mParentFile->closeInputFile(); @@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, } if (mLevel == mAllowedParentLevel) { - throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), + rootFS->GetFile()->GetName())); } LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str()); @@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics() if (wait_time < 0) { wait_time = 0; } - std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(), - mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(), + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + auto f = dynamic_cast(rootFS->GetFile()); + std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(), + f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(), ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel)); #if __has_include() - auto alienFile = dynamic_cast(mcurrentFile); + auto alienFile = dynamic_cast(f); if (alienFile) { monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed()); } @@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics() void DataInputDescriptor::closeInputFile() { - if (mcurrentFile) { + if (mCurrentFilesystem.get()) { if (mParentFile) { mParentFile->closeInputFile(); delete mParentFile; @@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile() mParentFileMap = nullptr; printFileStatistics(); - mcurrentFile->Close(); - delete mcurrentFile; - mcurrentFile = nullptr; + mCurrentFilesystem.reset(); } } @@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles() int DataInputDescriptor::findDFNumber(int file, std::string dfName) { - auto dfList = mfilenames[file]->listOfTimeFrameKeys; - auto it = std::find(dfList.begin(), dfList.end(), dfName); + auto dfList = mfilenames[file]->listOfTimeFrameNumbers; + auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; }); if (it == dfList.end()) { return -1; } @@ -358,40 +387,67 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh { auto ioStart = uv_hrtime(); - auto fileAndFolder = getFileFolder(counter, numTF); - if (!fileAndFolder.file) { + auto folder = getFileFolder(counter, numTF); + if (!folder.filesystem()) { return false; } - auto fullpath = fileAndFolder.folderName + "/" + treename; - auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str()); + auto rootFS = std::dynamic_pointer_cast(folder.filesystem()); + + if (!rootFS) { + throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)")); + } + // FIXME: Ugly. We should detect the format from the treename, good enough for now. + std::shared_ptr format; + + auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; + + for (auto& capability : mFactory.capabilities) { + auto objectPath = capability.lfn2objectPath(fullpath.path()); + void* handle = capability.getHandle(rootFS->GetFile(), objectPath); + if (handle) { + format = capability.factory().format(); + break; + } + } - if (!tree) { - LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str()); + if (!format) { + throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path())); + } + + auto schemaOpt = format->Inspect(fullpath); + auto schema = *schemaOpt; + + auto fragment = format->MakeFragment(fullpath, {}, schema); + + if (!fragment.ok()) { + LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path()); auto parentFile = getParentFile(counter, numTF, treename); if (parentFile != nullptr) { - int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName); + int parentNumTF = parentFile->findDFNumber(0, folder.path()); if (parentNumTF == -1) { - throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName())); + auto parentRootFS = std::dynamic_pointer_cast(parentFile->mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName())); } // first argument is 0 as the parent file object contains only 1 file return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed); } - throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName())); + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName())); } // create table output auto o = Output(dh); - auto t2t = outputs.make(o); - // add branches to read - // fill the table - t2t->setLabel(tree->GetName()); - totalSizeCompressed += tree->GetZipBytes(); - totalSizeUncompressed += tree->GetTotBytes(); - t2t->addAllColumns(tree); - t2t->fill(tree); - delete tree; + // FIXME: This should allow me to create a memory pool + // which I can then use to scan the dataset. + // + auto f2b = outputs.make(o); + + //// add branches to read + //// fill the table + f2b->setLabel(treename.c_str()); + f2b->fill(*fragment, schema, format); mIOTime += (uv_hrtime() - ioStart); @@ -693,7 +749,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade return result; } -FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) +arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); // if NOT match then use defaultDataInputDescriptor diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index eca0ef195d111..9bab29db3ff24 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -15,6 +15,10 @@ #include "Framework/DataDescriptorMatcher.h" #include "Framework/DataAllocator.h" +#include "Framework/RootArrowFilesystem.h" + +#include +#include #include #include "rapidjson/fwd.h" @@ -31,16 +35,10 @@ struct FileNameHolder { std::string fileName; int numberOfTimeFrames = 0; std::vector listOfTimeFrameNumbers; - std::vector listOfTimeFrameKeys; std::vector alreadyRead; }; FileNameHolder* makeFileNameHolder(std::string fileName); -struct FileAndFolder { - TFile* file = nullptr; - std::string folderName = ""; -}; - class DataInputDescriptor { /// Holds information concerning the reading of an aod table. @@ -52,7 +50,6 @@ class DataInputDescriptor std::string treename = ""; std::unique_ptr matcher; - DataInputDescriptor() = default; DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = ""); void printOut(); @@ -78,7 +75,7 @@ class DataInputDescriptor int findDFNumber(int file, std::string dfName); uint64_t getTimeFrameNumber(int counter, int numTF); - FileAndFolder getFileFolder(int counter, int numTF); + arrow::dataset::FileSource getFileFolder(int counter, int numTF); DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename); int getTimeFramesInFile(int counter); int getReadTimeFramesInFile(int counter); @@ -90,6 +87,7 @@ class DataInputDescriptor bool isAlienSupportOn() { return mAlienSupport; } private: + o2::framework::RootObjectReadingFactory mFactory; std::string minputfilesFile = ""; std::string* minputfilesFilePtr = nullptr; std::string mFilenameRegex = ""; @@ -98,7 +96,7 @@ class DataInputDescriptor std::string mParentFileReplacement; std::vector mfilenames; std::vector* mdefaultFilenamesPtr = nullptr; - TFile* mcurrentFile = nullptr; + std::shared_ptr mCurrentFilesystem; int mCurrentFileID = -1; bool mAlienSupport = false; @@ -127,7 +125,6 @@ class DataInputDirector ~DataInputDirector(); void reset(); - void createDefaultDataInputDescriptor(); void printOut(); bool atEnd(int counter); @@ -140,10 +137,11 @@ class DataInputDirector // getters DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh); int getNumberInputDescriptors() { return mdataInputDescriptors.size(); } + void createDefaultDataInputDescriptor(); bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); - FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); uint64_t getTotalSizeCompressed(); diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index e3a39761e8049..033adc461c600 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -85,20 +85,48 @@ std::vector getListOfTables(std::unique_ptr& f) { std::vector r; TList* keyList = f->GetListOfKeys(); + // We should handle two cases, one where the list of tables in a TDirectory, + // the other one where the dataframe number is just a prefix + std::string first = ""; for (auto key : *keyList) { - if (!std::string_view(key->GetName()).starts_with("DF_")) { + if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) { continue; } - auto* d = (TDirectory*)f->Get(key->GetName()); - TList* branchList = d->GetListOfKeys(); - for (auto b : *branchList) { - r.emplace_back(b->GetName()); + auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory")); + // Objects are in a folder, list it. + if (d) { + TList* branchList = d->GetListOfKeys(); + for (auto b : *branchList) { + r.emplace_back(b->GetName()); + } + break; + } + + void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple")); + if (v) { + std::string s = key->GetName(); + size_t pos = s.find('-'); + // Check if '-' is found + // Skip metaData and parentFiles + if (pos == std::string::npos) { + continue; + } + std::string t = s.substr(pos + 1); + // If we find a duplicate table name, it means we are in the next DF and we can stop. + if (t == first) { + break; + } + if (first.empty()) { + first = t; + } + // Create a new string starting after the '-' + r.emplace_back(t); } - break; } return r; } + auto readMetadata(std::unique_ptr& currentFile) -> std::vector { // Get the metadata, if any diff --git a/Framework/AnalysisSupport/src/TTreePlugin.cxx b/Framework/AnalysisSupport/src/TTreePlugin.cxx index e84a053d58d60..94491e565b9d2 100644 --- a/Framework/AnalysisSupport/src/TTreePlugin.cxx +++ b/Framework/AnalysisSupport/src/TTreePlugin.cxx @@ -13,6 +13,7 @@ #include "Framework/Plugins.h" #include "Framework/Signpost.h" #include "Framework/Endian.h" +#include #include #include #include @@ -245,6 +246,8 @@ struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin { } }; +O2_DECLARE_DYNAMIC_LOG(ttree_format); + arrow::Result TTreeFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const @@ -268,13 +271,17 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( int64_t rows = -1; auto& tree = fs->GetTree(treeFragment->source()); + O2_SIGNPOST_ID_FROM_POINTER(sid, ttree_format, tree.get()); + O2_SIGNPOST_START(ttree_format, sid, "TTreeFormat", "Scanning Batches for %{public}s (cache %zu)", tree->GetName(), (size_t)tree->GetCacheSize()); for (auto& field : fields) { // The field actually on disk auto physicalField = physical_schema->GetFieldByName(field->name()); TBranch* branch = tree->GetBranch(physicalField->name().c_str()); assert(branch); buffer.Reset(); - auto totalEntries = branch->GetEntries(); + size_t totalEntries = branch->GetEntries(); + O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFormat", "Reading %zu entries from branch %{public}s to field %{public}s", + totalEntries, branch->GetName(), field->name().c_str()); if (rows == -1) { rows = totalEntries; } @@ -344,12 +351,13 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( auto typeSize = physicalField->type()->byte_width(); // This is needed for branches which have not been persisted. auto bytes = branch->GetTotBytes(); - auto branchSize = bytes ? bytes : 1000000; - auto&& result = arrow::AllocateResizableBuffer(branchSize, pool); + size_t branchSize = bytes ? bytes : 1000000; + O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFormat", "Allocating buffer for branch %{public}s %zu.", physicalField->name().c_str(), branchSize); + arrow::Result>&& result = arrow::AllocateResizableBuffer(branchSize, pool); if (!result.ok()) { throw runtime_error("Cannot allocate values buffer"); } - std::shared_ptr arrowValuesBuffer = std::move(result).ValueUnsafe(); + std::shared_ptr arrowValuesBuffer = result.MoveValueUnsafe(); auto ptr = arrowValuesBuffer->mutable_data(); if (ptr == nullptr) { throw runtime_error("Invalid buffer"); @@ -379,7 +387,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( if (!result.ok()) { throw runtime_error("Cannot allocate offset buffer"); } - arrowOffsetBuffer = std::move(result).ValueUnsafe(); + arrowOffsetBuffer = result.MoveValueUnsafe(); unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); auto* tPtrOffset = reinterpret_cast(ptrOffset); offsets = std::span{tPtrOffset, tPtrOffset + totalEntries + 1}; @@ -434,6 +442,7 @@ arrow::Result TTreeFileFormat::ScanBatchesAsync( columns.push_back(array); } + O2_SIGNPOST_END(ttree_format, sid, "TTreeFormat", "Done creating batches %{public}s.", tree->GetName()); auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns); totalCompressedSize += tree->GetZipBytes(); totalUncompressedSize += tree->GetTotBytes(); @@ -545,6 +554,9 @@ arrow::Result> TTreeFileFormat::Inspect(const arr } auto& tree = treeFs->GetTree(source); + O2_SIGNPOST_ID_FROM_POINTER(sid, ttree_format, tree.get()); + O2_SIGNPOST_START(ttree_format, sid, "TTreeFileFormat::Inspect", "Starting inspection of source %{public}s", source.path().c_str()); + auto branches = tree->GetListOfBranches(); auto n = branches->GetEntries(); @@ -581,18 +593,24 @@ arrow::Result> TTreeFileFormat::Inspect(const arr auto field = std::make_shared(bi.ptr->GetName(), arrowTypeFromROOT(type, listSize)); fields.push_back(field); + O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Adding branch %{public}s to cache", bi.ptr->GetName()); tree->AddBranchToCache(bi.ptr); if (strncmp(bi.ptr->GetName(), "fIndexArray", strlen("fIndexArray")) == 0) { std::string sizeBranchName = bi.ptr->GetName(); sizeBranchName += "_size"; auto* sizeBranch = (TBranch*)tree->GetBranch(sizeBranchName.c_str()); if (sizeBranch) { + O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Adding branch %{public}s to cache", sizeBranch->GetName()); tree->AddBranchToCache(sizeBranch); + } else { + O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Branch %{public}s not added to cache", sizeBranchName.c_str()); } } } tree->StopCacheLearningPhase(); + O2_SIGNPOST_END(ttree_format, sid, "TTreeFileFormat::Inspect", "Done inspection %zu fields found. Using %zu bytes for cache", fields.size(), + (size_t)tree->GetCacheSize()); return std::make_shared(fields); } diff --git a/Framework/Core/include/Framework/RootArrowFilesystem.h b/Framework/Core/include/Framework/RootArrowFilesystem.h index feab713b445fe..415296a1f9850 100644 --- a/Framework/Core/include/Framework/RootArrowFilesystem.h +++ b/Framework/Core/include/Framework/RootArrowFilesystem.h @@ -140,6 +140,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase private: TDirectoryFile* mFile; RootObjectReadingFactory& mObjectFactory; + arrow::dataset::FileSource mCachedSource; + std::shared_ptr mCachedFS; }; class TBufferFileFS : public VirtualRootFileSystemBase diff --git a/Framework/Core/src/RootArrowFilesystem.cxx b/Framework/Core/src/RootArrowFilesystem.cxx index 4a1286515508c..740ad073d3f8d 100644 --- a/Framework/Core/src/RootArrowFilesystem.cxx +++ b/Framework/Core/src/RootArrowFilesystem.cxx @@ -43,6 +43,11 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject std::shared_ptr TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source) { + // If the filesystem was already found last time we got called, lets return a copy + // cached version of it. + if (mCachedSource.path() == source.path()) { + return mCachedFS; + } // We use a plugin to create the actual objects inside the // file, so that we can support TTree and RNTuple at the same time // without having to depend on both. @@ -53,13 +58,17 @@ std::shared_ptr TFileFileSystem::GetSubFilesystem(arr continue; } if (handle) { - return capability.factory().getSubFilesystem(handle); + mCachedFS = capability.factory().getSubFilesystem(handle); + mCachedSource = source; + return mCachedFS; } } auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass()); if (directory) { - return std::shared_ptr(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory)); + mCachedFS = std::shared_ptr(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory)); + mCachedSource = source; + return mCachedFS; } throw runtime_error_f("Unsupported file layout"); } From b2b9148295fdfa1f6d65b42fefefbe33bbd33039 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Mon, 27 Jan 2025 15:07:05 +0000 Subject: [PATCH 3/3] Please consider the following formatting changes --- Framework/Core/src/TableTreeHelpers.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx index 2f23c07aea451..19075e84f43ba 100644 --- a/Framework/Core/src/TableTreeHelpers.cxx +++ b/Framework/Core/src/TableTreeHelpers.cxx @@ -533,7 +533,7 @@ void TreeToTable::setLabel(const char* label) mTableLabel = label; } -void TreeToTable::fill(TTree*tree) +void TreeToTable::fill(TTree* tree) { std::vector> columns; std::vector> fields;