diff --git a/Framework/Core/include/Framework/DataOutputDirector.h b/Framework/Core/include/Framework/DataOutputDirector.h index bdcb8faf976c0..ee8166eb7f9c5 100644 --- a/Framework/Core/include/Framework/DataOutputDirector.h +++ b/Framework/Core/include/Framework/DataOutputDirector.h @@ -70,14 +70,16 @@ struct DataOutputDirector { void readSpecs(std::vector inputs); // fill the DataOutputDirector with information from a json file - std::tuple readJson(std::string const& fnjson); - std::tuple readJsonString(std::string const& stjson); + std::tuple readJson(std::string const& fnjson); + std::tuple readJsonString(std::string const& stjson); // read/write private members int getNumberTimeFramesToMerge() { return mnumberTimeFramesToMerge; } void setNumberTimeFramesToMerge(int ntfmerge) { mnumberTimeFramesToMerge = ntfmerge > 0 ? ntfmerge : 1; } std::string getFileMode() { return mfileMode; } void setFileMode(std::string filemode) { mfileMode = filemode; } + uint64_t getDFOffset() { return dataFrameOffset; } + void setDFOffset(uint64_t offset) { offset > 0 ? dataFrameOffset = offset : 0; } // get matching DataOutputDescriptors std::vector getDataOutputDescriptors(header::DataHeader dh); @@ -111,10 +113,11 @@ struct DataOutputDirector { int mfileCounter = 1; float mmaxfilesize = -1.; int mnumberTimeFramesToMerge = 1; + uint64_t dataFrameOffset = 0; std::string mfileMode = "RECREATE"; - std::tuple readJsonDocument(Document* doc); - const std::tuple memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1); + std::tuple readJsonDocument(Document* doc); + const std::tuple memptyanswer = std::make_tuple(std::string(""), std::string(""), std::string(""), -1., -1, 0); }; } // namespace o2::framework diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 3613bfedb887a..5a101ceef2575 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -426,7 +426,7 @@ DataProcessorSpec // get TF number from startTime auto it = tfNumbers.find(startTime); if (it != tfNumbers.end()) { - tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); + tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge() + dod->getDFOffset(); } else { LOGP(fatal, "No time frame number found for output with start time {}", startTime); throw std::runtime_error("Processing is stopped!"); diff --git a/Framework/Core/src/DataOutputDirector.cxx b/Framework/Core/src/DataOutputDirector.cxx index 4b803e1050817..4d90dc7da8930 100644 --- a/Framework/Core/src/DataOutputDirector.cxx +++ b/Framework/Core/src/DataOutputDirector.cxx @@ -215,7 +215,7 @@ void DataOutputDirector::readSpecs(std::vector inputs) } } -std::tuple DataOutputDirector::readJson(std::string const& fnjson) +std::tuple DataOutputDirector::readJson(std::string const& fnjson) { // open the file FILE* fjson = fopen(fnjson.c_str(), "r"); @@ -231,28 +231,26 @@ std::tuple DataOutputDirector // parse the json file Document jsonDocument; jsonDocument.ParseStream(jsonStream); - auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument); + auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument); // clean up fclose(fjson); - return std::make_tuple(rdn, dfn, fmode, mfs, ntfm); + return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset); } -std::tuple DataOutputDirector::readJsonString(std::string const& jsonString) +std::tuple DataOutputDirector::readJsonString(std::string const& jsonString) { // parse the json string Document jsonDocument; jsonDocument.Parse(jsonString.c_str()); - auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument); + auto [rdn, dfn, fmode, mfs, ntfm, offset] = readJsonDocument(&jsonDocument); - return std::make_tuple(rdn, dfn, fmode, mfs, ntfm); + return std::make_tuple(rdn, dfn, fmode, mfs, ntfm, offset); } -std::tuple DataOutputDirector::readJsonDocument(Document* jsonDocument) +std::tuple DataOutputDirector::readJsonDocument(Document* jsonDocument) { - std::string smc(":"); - std::string slh("/"); const char* itemName; // initialisations @@ -261,6 +259,7 @@ std::tuple DataOutputDirector std::string fmode(""); float maxfs = -1.; int ntfm = -1; + uint64_t offset = 0; // is it a proper json document? if (jsonDocument->HasParseError()) { @@ -351,8 +350,21 @@ std::tuple DataOutputDirector } } + itemName = "offset"; + if (dodirItem.HasMember(itemName)) { + if (dodirItem[itemName].IsNumber()) { + offset = dodirItem[itemName].GetUint64(); + setDFOffset(offset); + } else { + LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName); + return memptyanswer; + } + } + itemName = "OutputDescriptors"; if (dodirItem.HasMember(itemName)) { + std::string slh("/"); + std::string smc(":"); if (!dodirItem[itemName].IsArray()) { LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName); return memptyanswer; @@ -419,7 +431,7 @@ std::tuple DataOutputDirector printOut(); } - return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm); + return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm, offset); } std::vector DataOutputDirector::getDataOutputDescriptors(header::DataHeader dh) diff --git a/Framework/Core/src/WorkflowCustomizationHelpers.cxx b/Framework/Core/src/WorkflowCustomizationHelpers.cxx index 05abb5dab98cd..7440510cd38c7 100644 --- a/Framework/Core/src/WorkflowCustomizationHelpers.cxx +++ b/Framework/Core/src/WorkflowCustomizationHelpers.cxx @@ -66,6 +66,7 @@ std::vector WorkflowCustomizationHelpers::requiredWorkflowOptio {"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}}, {"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}}, {"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}}, + {"aod-writer-df-offset", VariantType::UInt64, 0UL, {"Offset for dataframe numbering"}}, {"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}}, {"fairmq-recv-buffer-size", VariantType::Int, 4, {"recvBufferSize option for FairMQ channels"}}, diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 6349bd5889eba..11abe2a1c774d 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -994,12 +994,13 @@ std::shared_ptr WorkflowHelpers::getDataOutputDirector(Confi float mfs, maxfilesize(-1.); std::string fmo, filemode("RECREATE"); int ntfm, ntfmerge = 1; + uint64_t offset = 0; // values from json if (options.isSet("aod-writer-json")) { auto fnjson = options.get("aod-writer-json"); if (!fnjson.empty()) { - std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); + std::tie(rdn, fnb, fmo, mfs, ntfm, offset) = dod->readJson(fnjson); if (!rdn.empty()) { resdir = rdn; } @@ -1070,11 +1071,18 @@ std::shared_ptr WorkflowHelpers::getDataOutputDirector(Confi } } } + if (options.isSet("aod-writer-df-offset")) { + auto off = options.get("aod-writer-df-offset"); + if (off > 0) { + offset = off; + } + } dod->setResultDir(resdir); dod->setFilenameBase(fnbase); dod->setFileMode(filemode); dod->setMaximumFileSize(maxfilesize); dod->setNumberTimeFramesToMerge(ntfmerge); + dod->setDFOffset(offset); return dod; } diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 58e669e127f03..0faa535f0b671 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -2027,6 +2027,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, "--aod-writer-resmode", "--aod-writer-maxfilesize", "--aod-writer-keep", + "--aod-writer-df-offset", "--aod-max-io-rate", "--aod-parent-access-level", "--aod-parent-base-path-replacement", diff --git a/Framework/Core/test/test_DataOutputDirector.cxx b/Framework/Core/test/test_DataOutputDirector.cxx index 1834dd0781897..40461edb453f9 100644 --- a/Framework/Core/test/test_DataOutputDirector.cxx +++ b/Framework/Core/test/test_DataOutputDirector.cxx @@ -50,6 +50,7 @@ TEST_CASE("TestDataOutputDirector") std::string fmode(""); float mfs = -1.; int ntf = -1; + uint64_t offset = 0; dh = DataHeader(DataDescription{"DUE"}, DataOrigin{"AOD"}, @@ -57,7 +58,7 @@ TEST_CASE("TestDataOutputDirector") std::string jsonString(R"({"OutputDirector": {"resfile": "defresults", "resfilemode": "RECREATE", "ntfmerge": 10, "OutputDescriptors": [{"table": "AOD/UNO/0", "columns": ["fEta1","fMom1"], "treename": "uno", "filename": "unoresults"}, {"table": "AOD/DUE/0", "columns": ["fPhi2"], "treename": "due"}]}})"); dod.reset(); - std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJsonString(jsonString); + std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJsonString(jsonString); ds = dod.getDataOutputDescriptors(dh); REQUIRE(ds.size() == 1); @@ -78,6 +79,7 @@ TEST_CASE("TestDataOutputDirector") jf << R"( "resfile": "defresults",)" << std::endl; jf << R"( "resfilemode": "NEW",)" << std::endl; jf << R"( "ntfmerge": 10,)" << std::endl; + jf << R"( "offset": 10,)" << std::endl; jf << R"( "OutputDescriptors": [)" << std::endl; jf << R"( {)" << std::endl; jf << R"( "table": "AOD/DUE/0",)" << std::endl; @@ -102,7 +104,7 @@ TEST_CASE("TestDataOutputDirector") jf.close(); dod.reset(); - std::tie(rdn, dfn, fmode, mfs, ntf) = dod.readJson(jsonFile); + std::tie(rdn, dfn, fmode, mfs, ntf, offset) = dod.readJson(jsonFile); dod.setFilenameBase("AnalysisResults"); ds = dod.getDataOutputDescriptors(dh); @@ -110,6 +112,7 @@ TEST_CASE("TestDataOutputDirector") REQUIRE(dfn == std::string("defresults")); REQUIRE(fmode == std::string("NEW")); REQUIRE(ntf == 10); + REQUIRE(offset == 10); REQUIRE(ds[0]->getFilenameBase() == std::string("unoresults")); REQUIRE(ds[0]->tablename == std::string("DUE"));