9 #include <boost/python.hpp>
11 #include <daq/storage/modules/StorageRootOutputModule.h>
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
25 #include <TClonesArray.h>
27 #include <nlohmann/json.hpp>
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
43 using namespace RootIOUtilities;
55 m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
58 setDescription(
"Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59 setPropertyFlags(c_Output);
62 addParam(
"outputFileName", m_outputFileName,
"Name of the output file. Can be overridden using the -o argument to basf2.",
63 string(
"RootOutput.root"));
64 addParam(
"ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65 "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.",
false);
66 addParam(
"compressionLevel", m_compressionLevel,
67 "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
69 addParam(
"compressionAlgorithm", m_compressionAlgorithm,
70 "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71 ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72 addParam(
"splitLevel", m_splitLevel,
73 "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
75 addParam(
"updateFileCatalog", m_updateFileCatalog, R
"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
83 vector<string> emptyvector;
84 addParam(c_SteerBranchNames[0], m_branchNames[0],
85 "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
87 addParam(c_SteerBranchNames[1], m_branchNames[1],
88 "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
90 addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91 "Add additional event branch names without the need to specify all branchnames.",
93 addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94 "Add additional persistent branch names without the need to specify all branchnames.",
96 addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97 "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98 addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99 "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100 addParam(
"autoFlushSize", m_autoflush,
101 "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
103 addParam(
"autoSaveSize", m_autosave,
104 "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
106 addParam(
"basketSize", m_basketsize,
"Basketsize for Branches in the Tree in bytes", 32000);
107 addParam(
"additionalDataDescription", m_additionalDataDescription,
"Additional dictionary of "
108 "name->value pairs to be added to the file metadata to describe the data",
109 m_additionalDataDescription);
110 addParam(
"buildIndex", m_buildIndex,
"Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111 addParam(
"keepParents", m_keepParents,
"Keep parents files of input files, input files will not be added as output file's parents",
113 addParam(
"outputSplitSize", m_outputSplitSize, R
"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
124 The output files will be approximately of the size given by
125 ``outputSplitSize`` but they will be slightly larger since
126 additional information has to be written at the end of the file. If necessary
127 please account for this. Also, using ``buildIndex=False`` might be beneficial
128 to reduce the overshoot.
131 This will set the amount of generated events stored in the file metadata to
132 zero as it is not possible to determine which fraction ends up in which
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
141 addParam(
"runType", m_runType,
"Run type",
string(
"null"));
142 addParam(
"HLTName", m_HLTName,
"HLT name",
string(
"HLT00"));
143 addParam(
"nDisk", m_nDisk,
"The number of paratitions", 3);
144 addParam(
"skipFirstEvent", m_firstEvent,
"Boolean to skip the first event or not. "
145 "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
146 addParam(
"ramdiskBuffer", m_ramdiskBuffer,
"Boolean to make small ramdisk buffer setup. "
147 "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
151 StorageRootOutputModule::~StorageRootOutputModule()
153 delete m_outputFileMetaData;
156 void StorageRootOutputModule::initialize()
160 TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
163 m_eventMetaData.isRequired();
166 if (m_outputSplitSize) {
167 if (*m_outputSplitSize == 0) B2ERROR(
"outputSplitSize must be set to a positive value");
169 if (*m_outputSplitSize >= 1024*1024) B2WARNING(
"outputSplitSize set to " << *m_outputSplitSize <<
" MB, please make sure the units are correct");
171 *m_outputSplitSize *= 1024 * 1024;
177 std::regex protocol(
"^([A-Za-z]*)://");
178 if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
181 m_outputFileName = std::regex_replace(m_outputFileName, protocol,
"");
184 m_regularFile =
false;
193 config.get(
"database.dbname"),
194 config.get(
"database.user"),
195 config.get(
"database.password"),
196 config.getInt(
"database.port"));
201 void StorageRootOutputModule::endRun() {
204 while (m_file) closeFile();
210 void StorageRootOutputModule::openFile()
215 TDirectory* dir = gDirectory;
216 std::filesystem::path out{m_outputFileName};
217 if (m_outputSplitSize) {
223 TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
224 std::filesystem::path file{fileUrl.GetFile()};
225 file.replace_extension((boost::format(
"f%05d.root") % m_fileIndex).str());
226 fileUrl.SetFile(file.c_str());
228 out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
232 std::filesystem::path out_notmp = out;
233 out = std::filesystem::path{std::string(
"/buffer") + out.generic_string()};
235 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
236 if ((!m_file || m_file->IsZombie()) && m_regularFile) {
238 auto dirpath = out.parent_path();
240 if (std::filesystem::create_directories(dirpath)) {
241 B2INFO(
"Created missing directory " << dirpath <<
".");
243 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
250 while (!m_file || m_file->IsZombie()) {
251 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
252 B2INFO(
"process(" << m_processNumber <<
"), Try open: " << ++openCounter);
255 if (!m_file || m_file->IsZombie()) {
256 B2FATAL(
"Couldn't open file " << out <<
" for writing!");
258 m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
259 m_file->SetCompressionLevel(m_compressionLevel);
261 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
263 set<string> branchList;
264 for (
const auto& pair : map)
265 branchList.insert(pair.first);
267 branchList =
filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
271 m_tree[durability]->SetAutoFlush(m_autoflush);
272 m_tree[durability]->SetAutoSave(m_autosave);
273 for (
auto & iter : map) {
274 const std::string& branchName = iter.first;
276 if (iter.second.dontWriteOut
277 && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
278 && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
279 branchName) == m_additionalBranchNames[durability].end())
282 if (branchList.count(branchName) == 0) {
284 if (((branchName !=
"FileMetaData") || (durability == DataStore::c_Event)) &&
285 ((branchName !=
"EventMetaData") || (durability == DataStore::c_Persistent))) {
291 if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
292 (branchName !=
"FileMetaData" and branchName !=
"ProcessStatistics")) {
293 B2WARNING(
"Persistent branches might not be stored as expected when splitting the output by size" <<
LogVar(
"branch", branchName));
296 TClass* entryClass = iter.second.objClass;
305 if (!entryClass->HasDictionary()) {
306 if (m_fileIndex == 0) {
307 B2WARNING(
"No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
308 <<
LogVar(
"class", entryClass->GetName()) <<
LogVar(
"branch", branchName));
314 B2ERROR(
"The version number in the ClassDef() macro must be at least 1 to enable I/O!" <<
LogVar(
"class", entryClass->GetName()));
317 int splitLevel = m_splitLevel;
319 B2DEBUG(38,
"Class has custom streamer, setting split level -1 for this branch." <<
LogVar(
"class", entryClass->GetName()));
322 if (iter.second.isArray) {
324 static_cast<TClonesArray*
>(iter.second.object)->BypassStreamer(kFALSE);
327 m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
328 m_entries[durability].push_back(&iter.second);
329 B2DEBUG(39,
"The branch " << branchName <<
" was created.");
332 if (m_fileIndex == 0) {
334 iter.second.isArray));
340 TBranch* fileMetaDataBranch = m_tree[DataStore::c_Persistent]->GetBranch(
"FileMetaData");
341 if (fileMetaDataBranch) {
342 fileMetaDataBranch->SetAddress(&m_outputFileMetaData);
344 m_tree[DataStore::c_Persistent]->Branch(
"FileMetaData", &m_outputFileMetaData, m_basketsize, m_splitLevel);
348 if (m_outputSplitSize) {
349 B2INFO(getName() <<
": Opened " << (m_fileIndex > 0 ?
"new " :
"") <<
"file for writing" <<
LogVar(
"filename", out));
355 m_db->execute(
"INSERT INTO datafiles_root "
356 "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
357 "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
358 "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
359 out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
360 m_runType.c_str(), m_expno, m_runno,
361 m_compressionLevel, m_compressionAlgorithm,
362 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str());
369 void StorageRootOutputModule::event()
374 m_firstEvent =
false;
379 if ((m_expno > m_eventMetaData->getExperiment()) ||
380 (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
386 if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
388 while (m_file) closeFile();
395 m_expno = m_eventMetaData->getExperiment();
396 m_runno = m_eventMetaData->getRun();
397 m_processNumber = atoi(getName().substr(0, getName().find(std::string(
"_"))).c_str());
398 m_disk = StringUtil::form(
"disk%02d", (m_processNumber%m_nDisk)+1);
399 m_outputFileName = StringUtil::form(
"/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
400 m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
401 m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
405 if (!m_file) openFile();
407 if (!m_keepParents) {
408 if (m_fileMetaData) {
409 m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
414 fillTree(DataStore::c_Event);
416 if (m_fileMetaData) {
418 for (
int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
419 string lfn = m_fileMetaData->getParent(iparent);
420 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
421 m_parentLfns.push_back(lfn);
425 string lfn = m_fileMetaData->getLfn();
426 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
427 m_parentLfns.push_back(lfn);
433 unsigned long experiment = m_eventMetaData->getExperiment();
434 unsigned long run = m_eventMetaData->getRun();
435 unsigned long event = m_eventMetaData->getEvent();
436 if (m_experimentLow > m_experimentHigh) {
437 m_experimentLow = m_experimentHigh = experiment;
438 m_runLow = m_runHigh = run;
439 m_eventLow = m_eventHigh = event;
441 if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
442 && (event < m_eventLow))))) {
443 m_experimentLow = experiment;
447 if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
448 && (event > m_eventHigh))))) {
449 m_experimentHigh = experiment;
456 if (m_eventMetaData->getErrorFlag() == 0)
460 if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
462 B2INFO(getName() <<
": Output size limit reached, closing file ...");
465 while (m_file) closeFile();
469 void StorageRootOutputModule::fillFileMetaData()
471 bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() :
true;
480 if (m_tree[DataStore::c_Event]) {
482 TTree* tree = m_tree[DataStore::c_Event];
483 unsigned long numEntries = tree->GetEntries();
484 m_outputFileMetaData->setNFullEvents(m_nFullEvents);
485 if (m_buildIndex && numEntries > 0) {
486 if (numEntries > 10000000) {
488 B2WARNING(
"Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
489 }
else if (tree->GetBranch(
"EventMetaData")) {
490 tree->SetBranchAddress(
"EventMetaData",
nullptr);
491 RootIOUtilities::buildIndex(tree);
495 m_outputFileMetaData->setNEvents(numEntries);
496 if (m_experimentLow > m_experimentHigh) {
498 m_outputFileMetaData->setLow(-1, -1, 0);
499 m_outputFileMetaData->setHigh(-1, -1, 0);
501 m_outputFileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
502 m_outputFileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
507 m_outputFileMetaData->setParents(m_parentLfns);
508 RootIOUtilities::setCreationData(*m_outputFileMetaData);
509 m_outputFileMetaData->setRandomSeed(RandomNumbers::getSeed());
510 m_outputFileMetaData->setSteering(Environment::Instance().getSteering());
511 auto mcEvents = Environment::Instance().getNumberOfMCEvents();
512 if(m_outputSplitSize and mcEvents > 0) {
513 if(m_fileIndex == 0) B2WARNING(
"Number of MC Events cannot be saved when splitting output files by size, setting to 0");
516 m_outputFileMetaData->setMcEvents(mcEvents);
517 m_outputFileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
518 for (
const auto& item : m_additionalDataDescription) {
519 m_outputFileMetaData->setDataDescription(item.first, item.second);
522 std::string lfn = m_file->GetName();
524 lfn = std::filesystem::absolute(lfn).string();
527 std::string format = EnvironmentVariables::get(
"BELLE2_LFN_FORMATSTRING",
"");
528 if (!format.empty()) {
529 auto format_filename = boost::python::import(
"B2Tools.format").attr(
"format_filename");
530 lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_outputFileMetaData->getJsonStr()));
532 m_outputFileMetaData->setLfn(lfn);
534 if (m_updateFileCatalog) {
535 FileCatalog::Instance().registerFile(m_file->GetName(), *m_outputFileMetaData);
540 void StorageRootOutputModule::terminate()
544 while (m_file) closeFile();
547 void StorageRootOutputModule::closeFile()
554 fillTree(DataStore::c_Persistent);
557 TDirectory* dir = gDirectory;
559 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
560 if (m_tree[durability]) {
561 B2DEBUG(30,
"Write TTree " <<
c_treeNames[durability]);
562 m_tree[durability]->Write(
c_treeNames[durability].c_str(), TObject::kWriteDelete);
563 delete m_tree[durability];
565 m_tree[durability] =
nullptr;
569 const std::string filename = m_file->GetName();
570 if (m_outputSplitSize) {
571 B2INFO(getName() <<
": Finished writing file." <<
LogVar(
"filename", filename));
575 const std::filesystem::path filename_path{filename};
576 std::string filename_notmp = m_file->GetName();
577 filename_notmp.erase(0, 7);
578 const std::filesystem::path filename_notmp_path{filename_notmp};
581 m_db->execute(
"UPDATE datafiles_root SET "
582 "correct_close = TRUE, "
585 "nfullevents = %lu, "
588 "WHERE name = '%s' AND host = '%s';",
589 ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ?
"true" :
"false",
590 m_outputFileMetaData->getNEvents(), m_outputFileMetaData->getNFullEvents(), m_file->GetSize(),
591 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str(),
592 filename_path.filename().c_str(), m_HLTName.c_str());
602 MetadataService::Instance().addRootOutputFile(filename, m_outputFileMetaData);
605 for (
auto & entry : m_entries) {
608 m_parentLfns.clear();
610 m_experimentHigh = 0;
619 if (m_ramdiskBuffer) {
620 const std::string rsync_cmd = std::string(
"/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(
" ") + filename_notmp_path.parent_path().generic_string() + std::string(
"/ &");
621 B2INFO(getName() <<
": system(" << rsync_cmd <<
")");
622 system(rsync_cmd.c_str());
629 if (!m_tree[durability])
return;
631 TTree& tree = *m_tree[durability];
632 for(
auto* entry: m_entries[durability]) {
636 entry->object->SetBit(kInvalidObject);
639 if (entry->name ==
"FileMetaData") {
640 tree.SetBranchAddress(entry->name.c_str(), &m_outputFileMetaData);
642 tree.SetBranchAddress(entry->name.c_str(), &entry->object);
646 for (
auto* entry: m_entries[durability]) {
647 entry->object->ResetBit(kInvalidObject);
650 const bool writeError = m_file->TestBit(TFile::kWriteError);
653 const std::string filename = m_file->GetName();
655 B2FATAL(
"A write error occured while saving '" << filename <<
"', please check if enough disk space is available.");
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.