9 #include <boost/python.hpp>
11 #include <daq/storage/modules/StorageRootOutputModule.h>
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
25 #include <TClonesArray.h>
27 #include <nlohmann/json.hpp>
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
43 using namespace RootIOUtilities;
// NOTE(review): this listing omits many original source lines (the
// constructor signature, its opening brace, and several default-value
// arguments of the addParam() calls below); comments describe only what is
// visible here.
// Tail of the constructor's member-initializer list: the "high" end of the
// exp/run/event range starts at 0.
55 m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
// Module description and property flags.
// NOTE(review): the user-visible description contains the typo "peristent"
// (should be "persistent").
58 setDescription(
"Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59 setPropertyFlags(c_Output);
// Output file name and the option to ignore the -o command-line override.
62 addParam(
"outputFileName", m_outputFileName,
"Name of the output file. Can be overridden using the -o argument to basf2.",
63 string(
"RootOutput.root"));
64 addParam(
"ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65 "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.",
false);
// Compression settings and the default branch split level.
66 addParam(
"compressionLevel", m_compressionLevel,
67 "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
69 addParam(
"compressionAlgorithm", m_compressionAlgorithm,
70 "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71 ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72 addParam(
"splitLevel", m_splitLevel,
73 "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
// File-catalog registration switch (help text is a raw string literal).
75 addParam(
"updateFileCatalog", m_updateFileCatalog, R
"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
// Branch selection, one parameter per durability: index 0 is the event
// durability and index 1 the persistent durability (per the help texts).
83 vector<string> emptyvector;
84 addParam(c_SteerBranchNames[0], m_branchNames[0],
85 "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
87 addParam(c_SteerBranchNames[1], m_branchNames[1],
88 "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
90 addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91 "Add additional event branch names without the need to specify all branchnames.",
93 addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94 "Add additional persistent branch names without the need to specify all branchnames.",
96 addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97 "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98 addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99 "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
// ROOT TTree I/O tuning: autoflush, autosave and basket size.
100 addParam(
"autoFlushSize", m_autoflush,
101 "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
103 addParam(
"autoSaveSize", m_autosave,
104 "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
106 addParam(
"basketSize", m_basketsize,
"Basketsize for Branches in the Tree in bytes", 32000);
// File metadata extras, event index building and parent-file handling.
107 addParam(
"additionalDataDescription", m_additionalDataDescription,
"Additional dictionary of "
108 "name->value pairs to be added to the file metadata to describe the data",
109 m_additionalDataDescription);
110 addParam(
"buildIndex", m_buildIndex,
"Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111 addParam(
"keepParents", m_keepParents,
"Keep parents files of input files, input files will not be added as output file's parents",
// Optional size-based splitting. The value is given in MB per the help
// text; initialize() converts it to bytes.
// NOTE(review): the help text says "can subsequently processed" (missing
// "be").
113 addParam(
"outputSplitSize", m_outputSplitSize, R
"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
124 The output files will be approximately of the size given by
125 ``outputSplitSize`` but they will be slightly larger since
126 additional information has to be written at the end of the file. If necessary
127 please account for this. Also, using ``buildIndex=False`` might be beneficial
128 to reduce the overshoot.
131 This will set the amount of generated events stored in the file metadata to
132 zero as it is not possible to determine which fraction ends up in which
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
// Storage-specific parameters. runType and HLTName become part of the
// output path built in event(); nDisk is used there to pick "disk%02d" from
// the process number.
// NOTE(review): the nDisk help text misspells "partitions" as "paratitions".
139 addParam(
"runType", m_runType,
"Run type",
string(
"null"));
140 addParam(
"HLTName", m_HLTName,
"HLT name",
string(
"HLT00"));
141 addParam(
"nDisk", m_nDisk,
"The number of paratitions", 3);
142 addParam(
"skipFirstEvent", m_firstEvent,
"Boolean to skip the first event or not. "
143 "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
144 addParam(
"ramdiskBuffer", m_ramdiskBuffer,
"Boolean to make small ramdisk buffer setup. "
145 "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
// Defaulted destructor. Files still open are closed by terminate(), which
// calls closeFile() while m_file is non-null.
149 StorageRootOutputModule::~StorageRootOutputModule() =
default;
/**
 * Prepare the module before event processing.
 *
 * Visible steps: raise ROOT's maximum tree size, register FileMetaData in
 * the DataStore, require EventMetaData, validate the optional
 * outputSplitSize and convert it from MB to bytes, strip a "<scheme>://"
 * prefix from the output file name, and read the "database.*" entries of
 * the configuration. Those entries presumably set up the PostgreSQL
 * connection used as m_db in openFile()/closeFile(); the receiving call is
 * on lines not shown in this listing.
 */
151 void StorageRootOutputModule::initialize()
// Set the TTree maximum size to 1e17 bytes. ROOT switches a tree to a new
// file once it exceeds this limit, so a huge value effectively disables that
// automatic switching. Size-based splitting is handled by this module
// (outputSplitSize, checked in event()).
155 TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
158 m_fileMetaData.registerInDataStore();
// EventMetaData must be present; event() reads exp/run/event from it.
160 m_eventMetaData.isRequired();
// outputSplitSize is configured in MB. Zero is reported as an error; 1024*1024
// MB or more only triggers a unit sanity warning.
// NOTE(review): B2ERROR does not return, so execution continues to the
// conversion below. Confirm the framework aborts after errors in initialize().
163 if (m_outputSplitSize) {
164 if (*m_outputSplitSize == 0) B2ERROR(
"outputSplitSize must be set to a positive value");
166 if (*m_outputSplitSize >= 1024*1024) B2WARNING(
"outputSplitSize set to " << *m_outputSplitSize <<
" MB, please make sure the units are correct");
// Convert MB to bytes; event() compares this against TFile::GetEND().
168 *m_outputSplitSize *= 1024 * 1024;
// Strip a leading "<letters>://" prefix from the output file name.
// m_regularFile is set to false below; the surrounding lines are not shown,
// so it cannot be confirmed here whether that happens only on a match.
// Note that "[A-Za-z]*" also matches an empty scheme ("://...").
174 std::regex protocol(
"^([A-Za-z]*)://");
175 if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
178 m_outputFileName = std::regex_replace(m_outputFileName, protocol,
"");
181 m_regularFile =
false;
// Database connection parameters (dbname, user, password, port) read from
// the configuration object `config` (presumably a ConfigFile; its
// declaration is not shown).
190 config.get(
"database.dbname"),
191 config.get(
"database.user"),
192 config.get(
"database.password"),
193 config.getInt(
"database.port"));
/**
 * Open a new output ROOT file and set up its trees and branches.
 *
 * Visible steps:
 *  - when outputSplitSize is set, replace the extension with
 *    ".f<m_fileIndex:05d>.root";
 *  - write under a "/buffer" prefix, keeping the unprefixed path
 *    (out_notmp) for the database record;
 *  - open the file with TFile::Open("RECREATE"), creating missing parent
 *    directories for local files and retrying on failure;
 *  - apply compression settings and create one branch per selected
 *    DataStore entry for each durability;
 *  - insert a row describing the new file into the datafiles_root table.
 */
197 void StorageRootOutputModule::openFile()
// Current ROOT directory, presumably saved so it can be restored afterwards
// (the restore is on lines not shown).
202 TDirectory* dir = gDirectory;
203 std::filesystem::path out{m_outputFileName};
// When splitting by size, each file gets an index-based extension, e.g.
// "X.root" -> "X.f00000.root" (replace_extension adds the leading dot).
204 if (m_outputSplitSize) {
210 TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
211 std::filesystem::path file{fileUrl.GetFile()};
212 file.replace_extension((boost::format(
"f%05d.root") % m_fileIndex).str());
213 fileUrl.SetFile(file.c_str());
215 out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
// Remember the final destination path, then write the file under "/buffer".
// closeFile() strips these 7 characters again.
// NOTE(review): as far as visible the prefix is prepended unconditionally,
// which would not give a usable name for non-regular (URL) outputs.
219 std::filesystem::path out_notmp = out;
220 out = std::filesystem::path{std::string(
"/buffer") + out.generic_string()};
222 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
// For local files a failed open may be caused by a missing directory:
// create it and try again.
223 if ((!m_file || m_file->IsZombie()) && m_regularFile) {
225 auto dirpath = out.parent_path();
227 if (std::filesystem::create_directories(dirpath)) {
228 B2INFO(
"Created missing directory " << dirpath <<
".");
230 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
// Keep retrying until the file opens, logging the attempt counter.
// NOTE(review): no retry limit or delay is visible. If this loop is
// unconditional (the lines just before it are not shown), the B2FATAL check
// below can never fire and a persistent failure spins forever.
237 while (!m_file || m_file->IsZombie()) {
238 m_file = TFile::Open(out.c_str(),
"RECREATE",
"basf2 Event File");
239 B2INFO(
"process(" << m_processNumber <<
"), Try open: " << ++openCounter);
242 if (!m_file || m_file->IsZombie()) {
243 B2FATAL(
"Couldn't open file " << out <<
" for writing!");
// Compression settings from the module parameters.
245 m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
246 m_file->SetCompressionLevel(m_compressionLevel);
// Create trees and branches for each durability. `map` is declared on lines
// not shown; presumably it is the DataStore entry map for this durability.
248 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
// Collect all entry names, then apply the include/exclude lists.
250 set<string> branchList;
251 for (
const auto& pair : map)
252 branchList.insert(pair.first);
254 branchList =
filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
// Apply the autoflush/autosave settings (tree creation is on lines not shown).
258 m_tree[durability]->SetAutoFlush(m_autoflush);
259 m_tree[durability]->SetAutoSave(m_autosave);
260 for (
auto & iter : map) {
261 const std::string& branchName = iter.first;
// Entries flagged dontWriteOut are presumably skipped unless listed
// explicitly in branchNames or additionalBranchNames (the skip statement is
// not shown).
263 if (iter.second.dontWriteOut
264 && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
265 && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
266 branchName) == m_additionalBranchNames[durability].end())
// Entries not selected by filterBranches are excluded, except FileMetaData
// in the persistent tree and EventMetaData in the event tree. For those two
// the inner condition is false (the skip statement itself is not shown).
269 if (branchList.count(branchName) == 0) {
271 if (((branchName !=
"FileMetaData") || (durability == DataStore::c_Event)) &&
272 ((branchName !=
"EventMetaData") || (durability == DataStore::c_Persistent))) {
// Warn (first file only) that persistent branches other than FileMetaData
// and ProcessStatistics may not be stored as expected when splitting.
278 if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
279 (branchName !=
"FileMetaData" and branchName !=
"ProcessStatistics")) {
280 B2WARNING(
"Persistent branches might not be stored as expected when splitting the output by size" <<
LogVar(
"branch", branchName));
283 TClass* entryClass = iter.second.objClass;
// Classes without a dictionary are not saved; the warning is only emitted
// for the first file.
292 if (!entryClass->HasDictionary()) {
293 if (m_fileIndex == 0) {
294 B2WARNING(
"No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
295 <<
LogVar(
"class", entryClass->GetName()) <<
LogVar(
"branch", branchName));
// A ClassDef version below 1 disables I/O (the guarding check is on lines
// not shown).
301 B2ERROR(
"The version number in the ClassDef() macro must be at least 1 to enable I/O!" <<
LogVar(
"class", entryClass->GetName()));
// Start from the configured split level; classes with a custom streamer get
// -1 (the condition for this is on lines not shown).
304 int splitLevel = m_splitLevel;
306 B2DEBUG(38,
"Class has custom streamer, setting split level -1 for this branch." <<
LogVar(
"class", entryClass->GetName()));
// Array entries are TClonesArrays; turn off their streamer bypass. Part of
// the surrounding condition is on lines not shown.
309 if (iter.second.isArray) {
311 static_cast<TClonesArray*
>(iter.second.object)->BypassStreamer(kFALSE);
// Create the branch bound to the entry's object pointer and remember the
// entry so branch addresses can be refreshed on each fill.
314 m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
315 m_entries[durability].push_back(&iter.second);
316 B2DEBUG(39,
"The branch " << branchName <<
" was created.");
// Extra bookkeeping for the first file only (details on lines not shown).
319 if (m_fileIndex == 0) {
321 iter.second.isArray));
327 if (m_outputSplitSize) {
328 B2INFO(getName() <<
": Opened " << (m_fileIndex > 0 ?
"new " :
"") <<
"file for writing" <<
LogVar(
"filename", out));
// Register the new file in datafiles_root with zero size/event counts and
// all status flags FALSE. time_open is local time with a hard-coded "+09"
// UTC offset (assumes the host clock runs in JST; confirm).
// NOTE(review): values are spliced into the SQL text with printf-style
// formatting and are not escaped. The file name, runType and HLTName come
// from module parameters; prefer a parameterized statement if the database
// interface supports one.
334 m_db->execute(
"INSERT INTO datafiles_root "
335 "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
336 "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
337 "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
338 out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
339 m_runType.c_str(), m_expno, m_runno,
340 m_compressionLevel, m_compressionAlgorithm,
341 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str());
/**
 * Write the current event.
 *
 * Visible behavior:
 *  - on a change of experiment/run, close the current file and derive a new
 *    output path
 *    "/rawdata/<disk>/belle/Raw/<exp:4>/<run:5>/<runType>.<exp:4>.<run:5>.<HLTName>.p<proc:02>.root"
 *    (this replaces the outputFileName parameter);
 *  - open a file if none is open and fill the event tree;
 *  - track parent LFNs and the lowest/highest exp/run/event;
 *  - close the file once its size exceeds outputSplitSize.
 */
348 void StorageRootOutputModule::event()
// Clear the skip-first-event flag (the surrounding condition and the skip
// itself are on lines not shown).
353 m_firstEvent =
false;
// The event belongs to an earlier exp/run than the current one (its
// handling is on lines not shown).
358 if ((m_expno > m_eventMetaData->getExperiment()) ||
359 (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
// New exp/run: close every open file and compute the new output path.
365 if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
367 while (m_file) closeFile();
374 m_expno = m_eventMetaData->getExperiment();
375 m_runno = m_eventMetaData->getRun();
// NOTE(review): m_disk is derived from m_processNumber *before*
// m_processNumber is recomputed from the module name in the next statement.
// Unless m_processNumber is set elsewhere beforehand (not visible here), the
// first run handled by this process picks its disk from a stale value.
// Computing m_processNumber first would avoid this.
376 m_disk = StringUtil::form(
"disk%02d", (m_processNumber%m_nDisk)+1);
// Process number = leading part of the module name up to the first '_'
// (whole name if there is none), parsed with atoi (0 if not numeric).
377 m_processNumber = atoi(getName().substr(0, getName().find(std::string(
"_"))).c_str());
378 m_outputFileName = StringUtil::form(
"/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
379 m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
380 m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
// Open a file if none is open: first event, new run, or after a size split.
384 if (!m_file) openFile();
// Unless keepParents is set, record the input file's LFN as this event's
// parent.
386 if (!m_keepParents) {
387 if (m_fileMetaData) {
388 m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
393 fillTree(DataStore::c_Event);
// Collect parent LFNs for the output metadata, skipping empty names and
// consecutive duplicates. The input file's parents or its own LFN are added;
// the condition choosing between them is on lines not shown.
395 if (m_fileMetaData) {
397 for (
int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
398 string lfn = m_fileMetaData->getParent(iparent);
399 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
400 m_parentLfns.push_back(lfn);
404 string lfn = m_fileMetaData->getLfn();
405 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
406 m_parentLfns.push_back(lfn);
// Track the lowest and highest (exp, run, event) written to this file.
// m_experimentLow > m_experimentHigh serves as "nothing recorded yet".
412 unsigned long experiment = m_eventMetaData->getExperiment();
413 unsigned long run = m_eventMetaData->getRun();
414 unsigned long event = m_eventMetaData->getEvent();
415 if (m_experimentLow > m_experimentHigh) {
416 m_experimentLow = m_experimentHigh = experiment;
417 m_runLow = m_runHigh = run;
418 m_eventLow = m_eventHigh = event;
420 if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
421 && (event < m_eventLow))))) {
422 m_experimentLow = experiment;
426 if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
427 && (event > m_eventHigh))))) {
428 m_experimentHigh = experiment;
// Events without an error flag are counted. The counted statement is on a
// line not shown; presumably it increments m_nFullEvents, which
// fillFileMetaData() stores.
435 if (m_eventMetaData->getErrorFlag() == 0)
// Size-based split: once the file's end offset (bytes) exceeds
// outputSplitSize, close it; the next event opens a new file.
439 if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
441 B2INFO(getName() <<
": Output size limit reached, closing file ...");
444 while (m_file) closeFile();
/**
 * Fill the FileMetaData for the file that is about to be closed.
 *
 * Visible behavior:
 *  - keep the MC/real-data flag of an existing FileMetaData and recreate the
 *    object (create(true));
 *  - record event counts, optionally build the TTree index, and set the
 *    exp/run/event range;
 *  - set parents, creation data, random seed, steering, MC event count,
 *    global tags and additional data descriptions;
 *  - compute the LFN, optionally via the Python helper
 *    B2Tools.format.format_filename;
 *  - optionally register the file in the file catalog, and keep a copy in
 *    m_outputFileMetaData.
 */
448 void StorageRootOutputModule::fillFileMetaData()
// Without an existing FileMetaData the output is treated as MC.
450 bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() :
true;
456 m_fileMetaData.create(
true);
457 if (!isMC) m_fileMetaData->declareRealData();
// Event-tree statistics, only when an event tree exists.
459 if (m_tree[DataStore::c_Event]) {
461 TTree* tree = m_tree[DataStore::c_Event];
462 unsigned long numEntries = tree->GetEntries();
463 m_fileMetaData->setNFullEvents(m_nFullEvents);
// Build the exp/run/event index, except for trees with more than 10,000,000
// entries (ROOT object-size limits) or without an EventMetaData branch.
464 if (m_buildIndex && numEntries > 0) {
465 if (numEntries > 10000000) {
467 B2WARNING(
"Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
468 }
else if (tree->GetBranch(
"EventMetaData")) {
469 tree->SetBranchAddress(
"EventMetaData",
nullptr);
470 RootIOUtilities::buildIndex(tree);
474 m_fileMetaData->setNEvents(numEntries);
// No event recorded (low > high sentinel): store -1/-1/0 as the range;
// otherwise store the tracked low/high values.
475 if (m_experimentLow > m_experimentHigh) {
477 m_fileMetaData->setLow(-1, -1, 0);
478 m_fileMetaData->setHigh(-1, -1, 0);
480 m_fileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
481 m_fileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
486 m_fileMetaData->setParents(m_parentLfns);
487 RootIOUtilities::setCreationData(*m_fileMetaData);
488 m_fileMetaData->setRandomSeed(RandomNumbers::getSeed());
489 m_fileMetaData->setSteering(Environment::Instance().getSteering());
// When splitting by size, the generated-MC-event count cannot be split
// across files. Warn once; the reset to 0 is on lines not shown.
490 auto mcEvents = Environment::Instance().getNumberOfMCEvents();
491 if(m_outputSplitSize and mcEvents > 0) {
492 if(m_fileIndex == 0) B2WARNING(
"Number of MC Events cannot be saved when splitting output files by size, setting to 0");
495 m_fileMetaData->setMcEvents(mcEvents);
496 m_fileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
497 for (
const auto& item : m_additionalDataDescription) {
498 m_fileMetaData->setDataDescription(item.first, item.second);
// The LFN defaults to the ROOT file name, which is then made absolute. The
// condition guarding that is on a line not shown.
501 std::string lfn = m_file->GetName();
503 lfn = std::filesystem::absolute(lfn).string();
// If BELLE2_LFN_FORMATSTRING is set, the LFN is computed by
// B2Tools.format.format_filename(format, outputFileName, metadata JSON).
// NOTE(review): Python exceptions raised here presumably surface as
// boost::python::error_already_set, which is not handled in the visible code.
506 std::string format = EnvironmentVariables::get(
"BELLE2_LFN_FORMATSTRING",
"");
507 if (!format.empty()) {
508 auto format_filename = boost::python::import(
"B2Tools.format").attr(
"format_filename");
509 lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_fileMetaData->getJsonStr()));
511 m_fileMetaData->setLfn(lfn);
// Optional registration in the file catalog.
513 if (m_updateFileCatalog) {
514 FileCatalog::Instance().registerFile(m_file->GetName(), *m_fileMetaData);
// Keep a copy of the filled metadata. closeFile() reports from
// m_outputFileMetaData and restores m_fileMetaData to its earlier contents.
516 m_outputFileMetaData = *m_fileMetaData;
/**
 * End of processing: close any output file still open by calling
 * closeFile() while m_file is non-null.
 */
520 void StorageRootOutputModule::terminate()
524 while (m_file) closeFile();
/**
 * Finish and close the current output file.
 *
 * Visible steps:
 *  - fill the persistent tree while preserving the DataStore FileMetaData;
 *  - write and delete both trees;
 *  - update the file's row in datafiles_root and report the file to the
 *    MetadataService;
 *  - reset the per-file bookkeeping;
 *  - in ramdisk-buffer mode, move the file out of the buffer with a
 *    background rsync.
 */
527 void StorageRootOutputModule::closeFile()
// Back up the current FileMetaData so it can be restored after the
// persistent tree is filled. The metadata filling between these lines is not
// shown; presumably it is fillFileMetaData().
531 std::unique_ptr<FileMetaData> old;
532 if (m_fileMetaData) old = std::make_unique<FileMetaData>(*m_fileMetaData);
537 fillTree(DataStore::c_Persistent);
540 if (old) *m_fileMetaData = *old;
544 TDirectory* dir = gDirectory;
// Write each tree, replacing older key cycles (kWriteDelete), then delete it.
546 for (
int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
547 if (m_tree[durability]) {
548 B2DEBUG(30,
"Write TTree " <<
c_treeNames[durability]);
549 m_tree[durability]->Write(
c_treeNames[durability].c_str(), TObject::kWriteDelete);
550 delete m_tree[durability];
552 m_tree[durability] =
nullptr;
556 const std::string filename = m_file->GetName();
557 if (m_outputSplitSize) {
558 B2INFO(getName() <<
": Finished writing file." <<
LogVar(
"filename", filename));
// The file was written under a "/buffer" prefix; dropping the first 7
// characters gives the final destination path.
562 const std::filesystem::path filename_path{filename};
563 std::string filename_notmp = m_file->GetName();
564 filename_notmp.erase(0, 7);
565 const std::filesystem::path filename_notmp_path{filename_notmp};
// Mark the file as correctly closed and record counts, size and close time.
// The first formatted value is "true"/"false". It is true if twice the file
// size is still below outputSplitSize, or in ramdisk-buffer mode. With no
// split size configured the boost::optional comparison is false (an engaged
// optional never compares less than an empty one), so only m_ramdiskBuffer
// decides. The column receiving this value is on lines not shown.
// NOTE(review): the C-style cast could be a static_cast. Values are spliced
// into SQL with printf-style formatting and not escaped. The timestamp uses a
// hard-coded "+09" offset.
568 m_db->execute(
"UPDATE datafiles_root SET "
569 "correct_close = TRUE, "
572 "nfullevents = %lu, "
575 "WHERE name = '%s' AND host = '%s';",
576 ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ?
"true" :
"false",
577 m_outputFileMetaData.getNEvents(), m_outputFileMetaData.getNFullEvents(), m_file->GetSize(),
578 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string(
"+09")).c_str(),
579 filename_path.filename().c_str(), m_HLTName.c_str());
589 MetadataService::Instance().addRootOutputFile(filename, &m_outputFileMetaData);
// Reset per-file state. The loop body and some resets are on lines not
// shown.
592 for (
auto & entry : m_entries) {
595 m_parentLfns.clear();
597 m_experimentHigh = 0;
// Ramdisk mode: move the finished file from the buffer to its destination
// directory with rsync --remove-source-files, run in the background ("&").
// NOTE(review): the command is built by string concatenation and run via
// system() with no quoting. Path components come from module parameters
// (runType, HLTName). The return value of system() is ignored.
606 if (m_ramdiskBuffer) {
607 const std::string rsync_cmd = std::string(
"/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(
" ") + filename_notmp_path.parent_path().generic_string() + std::string(
"/ &");
608 B2INFO(getName() <<
": system(" << rsync_cmd <<
")");
609 system(rsync_cmd.c_str());
// NOTE(review): this is the body of a member function whose signature is on
// lines not shown. From the calls fillTree(DataStore::c_Event) and
// fillTree(DataStore::c_Persistent) it is presumably fillTree(durability).
// Nothing to do if no tree exists for this durability.
616 if (!m_tree[durability])
return;
618 TTree& tree = *m_tree[durability];
// Mark objects as invalid (the condition selecting them is on lines not
// shown) and point each branch at the entry's current object pointer.
619 for(
auto* entry: m_entries[durability]) {
623 entry->object->SetBit(kInvalidObject);
626 tree.SetBranchAddress(entry->name.c_str(), &entry->object);
// After filling (not shown), clear the invalid-object marker again.
629 for (
auto* entry: m_entries[durability]) {
630 entry->object->ResetBit(kInvalidObject);
// Check whether ROOT flagged a write error on the file. The guarding
// condition for the fatal message is not shown; presumably it is writeError.
// NOTE(review): the message misspells "occurred" as "occured".
633 const bool writeError = m_file->TestBit(TFile::kWriteError);
636 const std::string filename = m_file->GetName();
638 B2FATAL(
"A write error occured while saving '" << filename <<
"', please check if enough disk space is available.");
EDurability
Durability types.
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.