9 #include <boost/python.hpp>
11 #include <daq/storage/modules/StorageRootOutputModule.h>
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
25 #include <TClonesArray.h>
27 #include <nlohmann/json.hpp>
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
43 using namespace RootIOUtilities;
55 m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
58 setDescription(
"Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59 setPropertyFlags(c_Output);
62 addParam(
"outputFileName", m_outputFileName,
"Name of the output file. Can be overridden using the -o argument to basf2.",
63 string(
"RootOutput.root"));
64 addParam(
"ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65 "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.",
false);
66 addParam(
"compressionLevel", m_compressionLevel,
67 "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
69 addParam(
"compressionAlgorithm", m_compressionAlgorithm,
70 "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71 ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72 addParam(
"splitLevel", m_splitLevel,
73 "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
75 addParam(
"updateFileCatalog", m_updateFileCatalog, R
"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
83 vector<string> emptyvector;
84 addParam(c_SteerBranchNames[0], m_branchNames[0],
85 "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
87 addParam(c_SteerBranchNames[1], m_branchNames[1],
88 "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
90 addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91 "Add additional event branch names without the need to specify all branchnames.",
93 addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94 "Add additional persistent branch names without the need to specify all branchnames.",
96 addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97 "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98 addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99 "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100 addParam(
"autoFlushSize", m_autoflush,
101 "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
103 addParam(
"autoSaveSize", m_autosave,
104 "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
106 addParam(
"basketSize", m_basketsize,
"Basketsize for Branches in the Tree in bytes", 32000);
107 addParam(
"additionalDataDescription", m_additionalDataDescription,
"Additional dictionary of "
108 "name->value pairs to be added to the file metadata to describe the data",
109 m_additionalDataDescription);
110 addParam(
"buildIndex", m_buildIndex,
"Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111 addParam(
"keepParents", m_keepParents,
"Keep parents files of input files, input files will not be added as output file's parents",
113 addParam(
"outputSplitSize", m_outputSplitSize, R
"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
124 The output files will be approximately of the size given by
125 ``outputSplitSize`` but they will be slightly larger since
126 additional information has to be written at the end of the file. If necessary
127 please account for this. Also, using ``buildIndex=False`` might be beneficial
128 to reduce the overshoot.
131 This will set the amount of generated events stored in the file metadata to
132 zero as it is not possible to determine which fraction ends up in which
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
141 addParam(
"runType", m_runType,
"Run type",
string(
"null"));
142 addParam(
"HLTName", m_HLTName,
"HLT name",
string(
"HLT00"));
143 addParam(
"nDisk", m_nDisk,
"The number of paratitions", 3);
144 addParam(
"skipFirstEvent", m_firstEvent,
"Boolean to skip the first event or not. "
145 "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
146 addParam(
"ramdiskBuffer", m_ramdiskBuffer,
"Boolean to make small ramdisk buffer setup. "
147 "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
151 StorageRootOutputModule::~StorageRootOutputModule() =
default;
153 void StorageRootOutputModule::initialize()
157 TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
160 m_eventMetaData.isRequired();
163 if (m_outputSplitSize) {
164 if (*m_outputSplitSize == 0) B2ERROR(
"outputSplitSize must be set to a positive value");
166 if (*m_outputSplitSize >= 1024*1024) B2WARNING(
"outputSplitSize set to " << *m_outputSplitSize <<
" MB, please make sure the units are correct");
168 *m_outputSplitSize *= 1024 * 1024;
174 std::regex protocol(
"^([A-Za-z]*)://");
175 if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
178 m_outputFileName = std::regex_replace(m_outputFileName, protocol,
"");
181 m_regularFile =
false;
190 config.get(
"database.dbname"),
191 config.get(
"database.user"),
192 config.get(
"database.password"),
193 config.getInt(
"database.port"));
198 void StorageRootOutputModule::endRun() {
201 while (m_file) closeFile();
206 void StorageRootOutputModule::openFile()
211 TDirectory* dir = gDirectory;
212 std::filesystem::path out{m_outputFileName};
213 if (m_outputSplitSize) {
219 TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
220 std::filesystem::path file{fileUrl.GetFile()};
221 file.replace_extension((boost::format(
"f%05d.root") % m_fileIndex).str());
222 fileUrl.SetFile(file.c_str());
224 out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
228 std::filesystem::path out_notmp = out;
229 out = std::filesystem::path{std::string(
"/buffer") + out.generic_string()};
231 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
232 if ((!m_file || m_file->IsZombie()) && m_regularFile) {
234 auto dirpath = out.parent_path();
236 if (std::filesystem::create_directories(dirpath)) {
237 B2INFO(
"Created missing directory " << dirpath <<
".");
239 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
246 while (!m_file || m_file->IsZombie()) {
247 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
248 B2INFO(
"process(" << m_processNumber <<
"), Try open: " << ++openCounter);
251 if (!m_file || m_file->IsZombie()) {
252 B2FATAL(
"Couldn't open file " << out <<
" for writing!");
254 m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
255 m_file->SetCompressionLevel(m_compressionLevel);
257 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
259 set<string> branchList;
260 for (
const auto& pair : map)
261 branchList.insert(pair.first);
263 branchList =
filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
267 m_tree[durability]->SetAutoFlush(m_autoflush);
268 m_tree[durability]->SetAutoSave(m_autosave);
269 for (
auto & iter : map) {
270 const std::string& branchName = iter.first;
272 if (iter.second.dontWriteOut
273 && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
274 && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
275 branchName) == m_additionalBranchNames[durability].end())
278 if (branchList.count(branchName) == 0) {
280 if (((branchName !=
"FileMetaData") || (durability == DataStore::c_Event)) &&
281 ((branchName !=
"EventMetaData") || (durability == DataStore::c_Persistent))) {
287 if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
288 (branchName !=
"FileMetaData" and branchName !=
"ProcessStatistics")) {
289 B2WARNING(
"Persistent branches might not be stored as expected when splitting the output by size" <<
LogVar(
"branch", branchName));
292 TClass* entryClass = iter.second.objClass;
301 if (!entryClass->HasDictionary()) {
302 if (m_fileIndex == 0) {
303 B2WARNING(
"No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
304 <<
LogVar(
"class", entryClass->GetName()) <<
LogVar(
"branch", branchName));
310 B2ERROR(
"The version number in the ClassDef() macro must be at least 1 to enable I/O!" <<
LogVar(
"class", entryClass->GetName()));
313 int splitLevel = m_splitLevel;
315 B2DEBUG(38,
"Class has custom streamer, setting split level -1 for this branch." <<
LogVar(
"class", entryClass->GetName()));
318 if (iter.second.isArray) {
320 static_cast<TClonesArray*
>(iter.second.object)->BypassStreamer(kFALSE);
323 m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
324 m_entries[durability].push_back(&iter.second);
325 B2DEBUG(39,
"The branch " << branchName <<
" was created.");
328 if (m_fileIndex == 0) {
330 iter.second.isArray));
336 TBranch* fileMetaDataBranch = m_tree[DataStore::c_Persistent]->GetBranch(
"FileMetaData");
337 if (fileMetaDataBranch) {
338 fileMetaDataBranch->SetAddress(&m_outputFileMetaData);
340 m_tree[DataStore::c_Persistent]->Branch(
"FileMetaData", &m_outputFileMetaData, m_basketsize, m_splitLevel);
344 if (m_outputSplitSize) {
345 B2INFO(getName() <<
": Opened " << (m_fileIndex > 0 ?
"new " :
"") <<
"file for writing" <<
LogVar(
"filename", out));
351 m_db->execute(
"INSERT INTO datafiles_root "
352 "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
353 "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
354 "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
355 out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
356 m_runType.c_str(), m_expno, m_runno,
357 m_compressionLevel, m_compressionAlgorithm,
358 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str());
365 void StorageRootOutputModule::event()
370 m_firstEvent =
false;
375 if ((m_expno > m_eventMetaData->getExperiment()) ||
376 (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
382 if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
384 while (m_file) closeFile();
391 m_expno = m_eventMetaData->getExperiment();
392 m_runno = m_eventMetaData->getRun();
393 m_processNumber = atoi(getName().substr(0, getName().find(std::string(
"_"))).c_str());
394 m_disk = StringUtil::form(
"disk%02d", (m_processNumber%m_nDisk)+1);
395 m_outputFileName = StringUtil::form(
"/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
396 m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
397 m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
401 if (!m_file) openFile();
403 if (!m_keepParents) {
404 if (m_fileMetaData) {
405 m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
410 fillTree(DataStore::c_Event);
412 if (m_fileMetaData) {
414 for (
int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
415 string lfn = m_fileMetaData->getParent(iparent);
416 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
417 m_parentLfns.push_back(lfn);
421 string lfn = m_fileMetaData->getLfn();
422 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
423 m_parentLfns.push_back(lfn);
429 unsigned long experiment = m_eventMetaData->getExperiment();
430 unsigned long run = m_eventMetaData->getRun();
431 unsigned long event = m_eventMetaData->getEvent();
432 if (m_experimentLow > m_experimentHigh) {
433 m_experimentLow = m_experimentHigh = experiment;
434 m_runLow = m_runHigh = run;
435 m_eventLow = m_eventHigh = event;
437 if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
438 && (event < m_eventLow))))) {
439 m_experimentLow = experiment;
443 if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
444 && (event > m_eventHigh))))) {
445 m_experimentHigh = experiment;
452 if (m_eventMetaData->getErrorFlag() == 0)
456 if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
458 B2INFO(getName() <<
": Output size limit reached, closing file ...");
461 while (m_file) closeFile();
465 void StorageRootOutputModule::fillFileMetaData()
467 bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() :
true;
476 if (m_tree[DataStore::c_Event]) {
478 TTree* tree = m_tree[DataStore::c_Event];
479 unsigned long numEntries = tree->GetEntries();
480 m_outputFileMetaData->setNFullEvents(m_nFullEvents);
481 if (m_buildIndex && numEntries > 0) {
482 if (numEntries > 10000000) {
484 B2WARNING(
"Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
485 }
else if (tree->GetBranch(
"EventMetaData")) {
486 tree->SetBranchAddress(
"EventMetaData",
nullptr);
487 RootIOUtilities::buildIndex(tree);
491 m_outputFileMetaData->setNEvents(numEntries);
492 if (m_experimentLow > m_experimentHigh) {
494 m_outputFileMetaData->setLow(-1, -1, 0);
495 m_outputFileMetaData->setHigh(-1, -1, 0);
497 m_outputFileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
498 m_outputFileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
503 m_outputFileMetaData->setParents(m_parentLfns);
504 RootIOUtilities::setCreationData(*m_outputFileMetaData);
505 m_outputFileMetaData->setRandomSeed(RandomNumbers::getSeed());
506 m_outputFileMetaData->setSteering(Environment::Instance().getSteering());
507 auto mcEvents = Environment::Instance().getNumberOfMCEvents();
508 if(m_outputSplitSize and mcEvents > 0) {
509 if(m_fileIndex == 0) B2WARNING(
"Number of MC Events cannot be saved when splitting output files by size, setting to 0");
512 m_outputFileMetaData->setMcEvents(mcEvents);
513 m_outputFileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
514 for (
const auto& item : m_additionalDataDescription) {
515 m_outputFileMetaData->setDataDescription(item.first, item.second);
518 std::string lfn = m_file->GetName();
520 lfn = std::filesystem::absolute(lfn).string();
523 std::string format = EnvironmentVariables::get(
"BELLE2_LFN_FORMATSTRING",
"");
524 if (!format.empty()) {
525 auto format_filename = boost::python::import(
"B2Tools.format").attr(
"format_filename");
526 lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_outputFileMetaData->getJsonStr()));
528 m_outputFileMetaData->setLfn(lfn);
530 if (m_updateFileCatalog) {
531 FileCatalog::Instance().registerFile(m_file->GetName(), *m_outputFileMetaData);
536 void StorageRootOutputModule::terminate()
540 while (m_file) closeFile();
543 void StorageRootOutputModule::closeFile()
550 fillTree(DataStore::c_Persistent);
553 TDirectory* dir = gDirectory;
555 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
556 if (m_tree[durability]) {
557 B2DEBUG(30,
"Write TTree " <<
c_treeNames[durability]);
558 m_tree[durability]->Write(
c_treeNames[durability].c_str(), TObject::kWriteDelete);
559 delete m_tree[durability];
561 m_tree[durability] =
nullptr;
565 const std::string filename = m_file->GetName();
566 if (m_outputSplitSize) {
567 B2INFO(getName() <<
": Finished writing file." <<
LogVar(
"filename", filename));
571 const std::filesystem::path filename_path{filename};
572 std::string filename_notmp = m_file->GetName();
573 filename_notmp.erase(0, 7);
574 const std::filesystem::path filename_notmp_path{filename_notmp};
577 m_db->execute(
"UPDATE datafiles_root SET "
578 "correct_close = TRUE, "
581 "nfullevents = %lu, "
584 "WHERE name = '%s' AND host = '%s';",
585 ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ?
"true" :
"false",
586 m_outputFileMetaData->getNEvents(), m_outputFileMetaData->getNFullEvents(), m_file->GetSize(),
587 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str(),
588 filename_path.filename().c_str(), m_HLTName.c_str());
598 MetadataService::Instance().addRootOutputFile(filename, m_outputFileMetaData);
601 for (
auto & entry : m_entries) {
604 m_parentLfns.clear();
606 m_experimentHigh = 0;
615 if (m_ramdiskBuffer) {
616 const std::string rsync_cmd = std::string(
"/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(
" ") + filename_notmp_path.parent_path().generic_string() + std::string(
"/ &");
617 B2INFO(getName() <<
": system(" << rsync_cmd <<
")");
618 system(rsync_cmd.c_str());
625 if (!m_tree[durability])
return;
627 TTree& tree = *m_tree[durability];
628 for(
auto* entry: m_entries[durability]) {
632 entry->object->SetBit(kInvalidObject);
635 if (entry->name ==
"FileMetaData") {
636 tree.SetBranchAddress(entry->name.c_str(), &m_outputFileMetaData);
638 tree.SetBranchAddress(entry->name.c_str(), &entry->object);
642 for (
auto* entry: m_entries[durability]) {
643 entry->object->ResetBit(kInvalidObject);
646 const bool writeError = m_file->TestBit(TFile::kWriteError);
649 const std::string filename = m_file->GetName();
651 B2FATAL(
"A write error occured while saving '" << filename <<
"', please check if enough disk space is available.");
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches that are accepted.
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.