Belle II Software  release-08-01-08
StorageRootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <boost/python.hpp>
10 
11 #include <daq/storage/modules/StorageRootOutputModule.h>
12 
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
18 // needed for complex module parameter
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
21 
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
24 
25 #include <TClonesArray.h>
26 
27 #include <nlohmann/json.hpp>
28 
#include <chrono>
#include <filesystem>
#include <memory>
#include <regex>
#include <thread>
32 
33 // For online storage
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
39 
40 
41 using namespace std;
42 using namespace Belle2;
43 using namespace RootIOUtilities;
44 
//-----------------------------------------------------------------
// Register the Module
//-----------------------------------------------------------------
// Makes this module known to the framework under the name "StorageRootOutput"
// (REG_MODULE takes the module name without the "Module" suffix).
REG_MODULE(StorageRootOutput)
49 
50 //-----------------------------------------------------------------
51 // Implementation
52 //-----------------------------------------------------------------
53 
54 StorageRootOutputModule::StorageRootOutputModule() : Module(), m_file(nullptr), m_tree{0}, m_experimentLow(1), m_runLow(0),
55  m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
56 {
57  //Set module properties
58  setDescription("Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59  setPropertyFlags(c_Output);
60 
61  //Parameter definition
62  addParam("outputFileName", m_outputFileName, "Name of the output file. Can be overridden using the -o argument to basf2.",
63  string("RootOutput.root"));
64  addParam("ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65  "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.", false);
66  addParam("compressionLevel", m_compressionLevel,
67  "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
68  m_compressionLevel);
69  addParam("compressionAlgorithm", m_compressionAlgorithm,
70  "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71  ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72  addParam("splitLevel", m_splitLevel,
73  "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
74  99);
75  addParam("updateFileCatalog", m_updateFileCatalog, R"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
79 
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
82 
83  vector<string> emptyvector;
84  addParam(c_SteerBranchNames[0], m_branchNames[0],
85  "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
86  emptyvector);
87  addParam(c_SteerBranchNames[1], m_branchNames[1],
88  "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
89  emptyvector);
90  addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91  "Add additional event branch names without the need to specify all branchnames.",
92  emptyvector);
93  addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94  "Add additional persistent branch names without the need to specify all branchnames.",
95  emptyvector);
96  addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97  "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98  addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99  "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100  addParam("autoFlushSize", m_autoflush,
101  "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
102  -10000000);
103  addParam("autoSaveSize", m_autosave,
104  "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
105  -10000000);
106  addParam("basketSize", m_basketsize, "Basketsize for Branches in the Tree in bytes", 32000);
107  addParam("additionalDataDescription", m_additionalDataDescription, "Additional dictionary of "
108  "name->value pairs to be added to the file metadata to describe the data",
109  m_additionalDataDescription);
110  addParam("buildIndex", m_buildIndex, "Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111  addParam("keepParents", m_keepParents, "Keep parents files of input files, input files will not be added as output file's parents",
112  m_keepParents);
113  addParam("outputSplitSize", m_outputSplitSize, R"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
119 
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
122 
123 Note:
124  The output files will be approximately of the size given by
125  ``outputSplitSize`` but they will be slightly larger since
126  additional information has to be written at the end of the file. If necessary
127  please account for this. Also, using ``buildIndex=False`` might be beneficial
128  to reduce the overshoot.
129 
130 Warning:
131  This will set the amount of generated events stored in the file metadata to
132  zero as it is not possible to determine which fraction ends up in which
133  output file.
134 
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
137 
138  m_outputFileMetaData = new FileMetaData;
139 
140  // Parameters for online storage
141  addParam("runType", m_runType, "Run type", string("null"));
142  addParam("HLTName", m_HLTName, "HLT name", string("HLT00"));
143  addParam("nDisk", m_nDisk, "The number of paratitions", 3);
144  addParam("skipFirstEvent", m_firstEvent, "Boolean to skip the first event or not. "
145  "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
146  addParam("ramdiskBuffer", m_ramdiskBuffer, "Boolean to make small ramdisk buffer setup. "
147  "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
148 }
149 
150 
151 StorageRootOutputModule::~StorageRootOutputModule() = default;
152 
153 void StorageRootOutputModule::initialize()
154 {
155  //ROOT has a default maximum size of 100GB for trees??? For larger trees it creates a new file and does other things that finally produce crashes.
156  //Let's set this to 100PB, that should last a bit longer.
157  TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
158 
159  //make sure we have event meta data
160  m_eventMetaData.isRequired();
161 
162  //check outputSplitSize
163  if (m_outputSplitSize) {
164  if (*m_outputSplitSize == 0) B2ERROR("outputSplitSize must be set to a positive value");
165  // Warn is splitsize is >= 1TB ... because this seems weirdly like size was given in bytes
166  if (*m_outputSplitSize >= 1024*1024) B2WARNING("outputSplitSize set to " << *m_outputSplitSize << " MB, please make sure the units are correct");
167  // convert to bytes
168  *m_outputSplitSize *= 1024 * 1024;
169  }
170 
171  getFileNames();
172 
173  // Now check if the file has a protocol like file:// or http:// in front
174  std::regex protocol("^([A-Za-z]*)://");
175  if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
176  if(m[1] == "file") {
177  // file protocol: treat as local and just remove it from the filename
178  m_outputFileName = std::regex_replace(m_outputFileName, protocol, "");
179  } else {
180  // any other protocol: not local, don't create directories
181  m_regularFile = false;
182  }
183  }
184  // For online storage
185  // Do not open file in basf2 initialize
186  // openFile();
187 
188  ConfigFile config("slowcontrol");
189  PostgreSQLInterface *db = new PostgreSQLInterface(config.get("database.host"),
190  config.get("database.dbname"),
191  config.get("database.user"),
192  config.get("database.password"),
193  config.getInt("database.port"));
194  m_db = db;
195 }
196 
197 // For online storage
198 void StorageRootOutputModule::endRun() {
199  if (m_file) {
200  closeFile();
201  while (m_file) closeFile(); // I hope that this will not be failed
202  m_fileIndex = 0;
203  }
204 }
205 
206 void StorageRootOutputModule::openFile()
207 {
208  // Since we open a new file, we also have to reset the number of full events
209  m_nFullEvents = 0;
210  // Continue with opening the file
211  TDirectory* dir = gDirectory;
212  std::filesystem::path out{m_outputFileName};
213  if (m_outputSplitSize) {
214  // Mangle the filename to add the fNNNNN part. However we need to be
215  // careful since the file name could be non-local and have some options or
216  // anchor information attached (like
217  // http://mydomain.org/filename.root?foo=bar#baz). So use "TUrl" *sigh* to
218  // do the parsing and only replace the extension of the file part.
219  TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
220  std::filesystem::path file{fileUrl.GetFile()};
221  file.replace_extension((boost::format("f%05d.root") % m_fileIndex).str());
222  fileUrl.SetFile(file.c_str());
223  // In case of regular files we don't want the protocol or anything, just the file
224  out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
225  }
226 
227  // For online storage buffer disks
228  std::filesystem::path out_notmp = out;
229  out = std::filesystem::path{std::string("/buffer") + out.generic_string()};
230 
231  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
232  if ((!m_file || m_file->IsZombie()) && m_regularFile) {
233  //try creating necessary directories since this is a local file
234  auto dirpath = out.parent_path();
235 
236  if (std::filesystem::create_directories(dirpath)) {
237  B2INFO("Created missing directory " << dirpath << ".");
238  //try again
239  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
240  }
241 
242  }
243 
244  // Retry file open for online storage, with ramdisk, may not be needed
245  int openCounter = 0;
246  while (!m_file || m_file->IsZombie()) {
247  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
248  B2INFO("process(" << m_processNumber << "), Try open: " << ++openCounter);
249  }
250 
251  if (!m_file || m_file->IsZombie()) {
252  B2FATAL("Couldn't open file " << out << " for writing!");
253  }
254  m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
255  m_file->SetCompressionLevel(m_compressionLevel);
256 
257  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
258  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
259  set<string> branchList;
260  for (const auto& pair : map)
261  branchList.insert(pair.first);
262  //skip branches the user doesn't want
263  branchList = filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
264 
265  //create the tree and branches
266  m_tree[durability] = new TTree(c_treeNames[durability].c_str(), c_treeNames[durability].c_str());
267  m_tree[durability]->SetAutoFlush(m_autoflush);
268  m_tree[durability]->SetAutoSave(m_autosave);
269  for (auto & iter : map) {
270  const std::string& branchName = iter.first;
271  //skip transient entries (allow overriding via branchNames)
272  if (iter.second.dontWriteOut
273  && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
274  && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
275  branchName) == m_additionalBranchNames[durability].end())
276  continue;
277  //skip branches the user doesn't want
278  if (branchList.count(branchName) == 0) {
279  //make sure FileMetaData and EventMetaData are always included in the output
280  if (((branchName != "FileMetaData") || (durability == DataStore::c_Event)) &&
281  ((branchName != "EventMetaData") || (durability == DataStore::c_Persistent))) {
282  continue;
283  }
284  }
285 
286  // Warn for anything other than FileMetaData and ProcessStatistics ...
287  if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
288  (branchName != "FileMetaData" and branchName != "ProcessStatistics")) {
289  B2WARNING("Persistent branches might not be stored as expected when splitting the output by size" << LogVar("branch", branchName));
290  }
291 
292  TClass* entryClass = iter.second.objClass;
293 
294  //I want to do this in the input module, but I apparently I cannot disable reading those branches.
295  //isabling reading the branch by not calling SetBranchAddress() for it results in the following crashes. Calling SetBranchStatus(..., 0) doesn't help, either.
296  //reported to ROOT devs, let's see if it gets fixed.
297  //
298  //HasDictionary() is a new function in root 6
299  //using it instead of GetClassInfo() avoids having to parse header files (and
300  //the associated memory cost)
301  if (!entryClass->HasDictionary()) {
302  if (m_fileIndex == 0) {
303  B2WARNING("No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
304  << LogVar("class", entryClass->GetName()) << LogVar("branch", branchName));
305  }
306  continue;
307  }
308 
309  if (!hasStreamer(entryClass)) {
310  B2ERROR("The version number in the ClassDef() macro must be at least 1 to enable I/O!" << LogVar("class", entryClass->GetName()));
311  }
312 
313  int splitLevel = m_splitLevel;
314  if (hasCustomStreamer(entryClass)) {
315  B2DEBUG(38, "Class has custom streamer, setting split level -1 for this branch." << LogVar("class", entryClass->GetName()));
316 
317  splitLevel = -1;
318  if (iter.second.isArray) {
319  //for arrays, we also don't want TClonesArray to go around our streamer
320  static_cast<TClonesArray*>(iter.second.object)->BypassStreamer(kFALSE);
321  }
322  }
323  m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
324  m_entries[durability].push_back(&iter.second);
325  B2DEBUG(39, "The branch " << branchName << " was created.");
326 
327  //Tell DataStore that we are using this entry
328  if (m_fileIndex == 0) {
329  DataStore::Instance().optionalInput(StoreAccessorBase(branchName, (DataStore::EDurability)durability, entryClass,
330  iter.second.isArray));
331  }
332  }
333  }
334 
335  // set the address of the FileMetaData branch for the output to a separate one from the input
336  TBranch* fileMetaDataBranch = m_tree[DataStore::c_Persistent]->GetBranch("FileMetaData");
337  if (fileMetaDataBranch) {
338  fileMetaDataBranch->SetAddress(&m_outputFileMetaData);
339  } else {
340  m_tree[DataStore::c_Persistent]->Branch("FileMetaData", &m_outputFileMetaData, m_basketsize, m_splitLevel);
341  }
342 
343  dir->cd();
344  if (m_outputSplitSize) {
345  B2INFO(getName() << ": Opened " << (m_fileIndex > 0 ? "new " : "") << "file for writing" << LogVar("filename", out));
346  }
347 
348  // Insert file entry into the DAQ DB while open new file, online storage feature
349  try {
350  m_db->connect();
351  m_db->execute("INSERT INTO datafiles_root "
352  "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
353  "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
354  "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
355  out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
356  m_runType.c_str(), m_expno, m_runno,
357  m_compressionLevel, m_compressionAlgorithm,
358  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str());
359  } catch (const DBHandlerException &e) {
360  B2WARNING(e.what());
361  }
362 }
363 
364 
365 void StorageRootOutputModule::event()
366 {
367  // Many safety guards and features for online storage
368  // Skip the first event which is the ZMQ init message
369  if (m_firstEvent) {
370  m_firstEvent = false;
371  return;
372  }
373 
374  // Exp/run number should be increased
375  if ((m_expno > m_eventMetaData->getExperiment()) ||
376  (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
377  return;
378  }
379 
380  // Close file and set file index to 0 if exp/run number is changed
381  if (m_file) {
382  if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
383  closeFile();
384  while (m_file) closeFile(); // I hope that this will not be failed
385  m_fileIndex = 0;
386  }
387  }
388 
389  // Open file with automatic naming
390  if (!m_file) {
391  m_expno = m_eventMetaData->getExperiment();
392  m_runno = m_eventMetaData->getRun();
393  m_processNumber = atoi(getName().substr(0, getName().find(std::string("_"))).c_str());
394  m_disk = StringUtil::form("disk%02d", (m_processNumber%m_nDisk)+1);
395  m_outputFileName = StringUtil::form("/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
396  m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
397  m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
398  }
399 
400  // if we closed after last event ... make a new one
401  if (!m_file) openFile();
402 
403  if (!m_keepParents) {
404  if (m_fileMetaData) {
405  m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
406  }
407  }
408 
409  //fill Event data
410  fillTree(DataStore::c_Event);
411 
412  if (m_fileMetaData) {
413  if (m_keepParents) {
414  for (int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
415  string lfn = m_fileMetaData->getParent(iparent);
416  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
417  m_parentLfns.push_back(lfn);
418  }
419  }
420  } else {
421  string lfn = m_fileMetaData->getLfn();
422  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
423  m_parentLfns.push_back(lfn);
424  }
425  }
426  }
427 
428  // keep track of file level metadata
429  unsigned long experiment = m_eventMetaData->getExperiment();
430  unsigned long run = m_eventMetaData->getRun();
431  unsigned long event = m_eventMetaData->getEvent();
432  if (m_experimentLow > m_experimentHigh) { //starting condition
433  m_experimentLow = m_experimentHigh = experiment;
434  m_runLow = m_runHigh = run;
435  m_eventLow = m_eventHigh = event;
436  } else {
437  if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
438  && (event < m_eventLow))))) {
439  m_experimentLow = experiment;
440  m_runLow = run;
441  m_eventLow = event;
442  }
443  if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
444  && (event > m_eventHigh))))) {
445  m_experimentHigh = experiment;
446  m_runHigh = run;
447  m_eventHigh = event;
448  }
449  }
450 
451  // check if the event is a full event or not: if yes, increase the counter
452  if (m_eventMetaData->getErrorFlag() == 0) // no error flag -> this is a full event
453  m_nFullEvents++;
454 
455  // check if we need to split the file
456  if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
457  // close file and open new one
458  B2INFO(getName() << ": Output size limit reached, closing file ...");
459  closeFile();
460  // Introduce while for online storage (can be removed?)
461  while (m_file) closeFile();
462  }
463 }
464 
465 void StorageRootOutputModule::fillFileMetaData()
466 {
467  bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() : true;
468 
469  // For online storage, force to declareRealData()
470  // I wonder why the no file meta data is associated with isMC == true
471  isMC = false;
472 
473  new(m_outputFileMetaData) FileMetaData;
474  if (!isMC) m_outputFileMetaData->declareRealData();
475 
476  if (m_tree[DataStore::c_Event]) {
477  //create an index for the event tree
478  TTree* tree = m_tree[DataStore::c_Event];
479  unsigned long numEntries = tree->GetEntries();
480  m_outputFileMetaData->setNFullEvents(m_nFullEvents);
481  if (m_buildIndex && numEntries > 0) {
482  if (numEntries > 10000000) {
483  //10M events correspond to about 240MB for the TTreeIndex object. for more than ~45M entries this causes crashes, broken files :(
484  B2WARNING("Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
485  } else if (tree->GetBranch("EventMetaData")) {
486  tree->SetBranchAddress("EventMetaData", nullptr);
487  RootIOUtilities::buildIndex(tree);
488  }
489  }
490 
491  m_outputFileMetaData->setNEvents(numEntries);
492  if (m_experimentLow > m_experimentHigh) {
493  //starting condition so apparently no events at all
494  m_outputFileMetaData->setLow(-1, -1, 0);
495  m_outputFileMetaData->setHigh(-1, -1, 0);
496  } else {
497  m_outputFileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
498  m_outputFileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
499  }
500  }
501 
502  //fill more file level metadata
503  m_outputFileMetaData->setParents(m_parentLfns);
504  RootIOUtilities::setCreationData(*m_outputFileMetaData);
505  m_outputFileMetaData->setRandomSeed(RandomNumbers::getSeed());
506  m_outputFileMetaData->setSteering(Environment::Instance().getSteering());
507  auto mcEvents = Environment::Instance().getNumberOfMCEvents();
508  if(m_outputSplitSize and mcEvents > 0) {
509  if(m_fileIndex == 0) B2WARNING("Number of MC Events cannot be saved when splitting output files by size, setting to 0");
510  mcEvents = 0;
511  }
512  m_outputFileMetaData->setMcEvents(mcEvents);
513  m_outputFileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
514  for (const auto& item : m_additionalDataDescription) {
515  m_outputFileMetaData->setDataDescription(item.first, item.second);
516  }
517  // Set the LFN to the filename: if it's a URL to directly, otherwise make sure it's absolute
518  std::string lfn = m_file->GetName();
519  if(m_regularFile) {
520  lfn = std::filesystem::absolute(lfn).string();
521  }
522  // Format LFN if BELLE2_LFN_FORMATSTRING is set
523  std::string format = EnvironmentVariables::get("BELLE2_LFN_FORMATSTRING", "");
524  if (!format.empty()) {
525  auto format_filename = boost::python::import("B2Tools.format").attr("format_filename");
526  lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_outputFileMetaData->getJsonStr()));
527  }
528  m_outputFileMetaData->setLfn(lfn);
529  //register the file in the catalog
530  if (m_updateFileCatalog) {
531  FileCatalog::Instance().registerFile(m_file->GetName(), *m_outputFileMetaData);
532  }
533 }
534 
535 
536 void StorageRootOutputModule::terminate()
537 {
538  closeFile();
539  // Introduce while for online storage (can be removed?)
540  while (m_file) closeFile();
541 }
542 
543 void StorageRootOutputModule::closeFile()
544 {
545  if(!m_file) return;
546 
547  fillFileMetaData();
548 
549  //fill Persistent data
550  fillTree(DataStore::c_Persistent);
551 
552  //write the trees
553  TDirectory* dir = gDirectory;
554  m_file->cd();
555  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
556  if (m_tree[durability]) {
557  B2DEBUG(30, "Write TTree " << c_treeNames[durability]);
558  m_tree[durability]->Write(c_treeNames[durability].c_str(), TObject::kWriteDelete);
559  delete m_tree[durability];
560  }
561  m_tree[durability] = nullptr;
562  }
563  dir->cd();
564 
565  const std::string filename = m_file->GetName();
566  if (m_outputSplitSize) {
567  B2INFO(getName() << ": Finished writing file." << LogVar("filename", filename));
568  }
569 
570  // Before deleting m_file, store file list in online storage file list DB table
571  const std::filesystem::path filename_path{filename};
572  std::string filename_notmp = m_file->GetName();
573  filename_notmp.erase(0, 7); // remove "/buffer" in front of full path
574  const std::filesystem::path filename_notmp_path{filename_notmp};
575  try {
576  m_db->connect();
577  m_db->execute("UPDATE datafiles_root SET "
578  "correct_close = TRUE, "
579  "will_merge = %s ,"
580  "nevents = %lu, "
581  "nfullevents = %lu, "
582  "size = %lu, "
583  "time_close = '%s' "
584  "WHERE name = '%s' AND host = '%s';",
585  ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ? "true" : "false",
586  m_outputFileMetaData->getNEvents(), m_outputFileMetaData->getNFullEvents(), m_file->GetSize(),
587  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str(),
588  filename_path.filename().c_str(), m_HLTName.c_str());
589  } catch (const DBHandlerException &e) {
590  B2WARNING(e.what());
591  }
592  m_db->close();
593 
594  delete m_file;
595  m_file = nullptr;
596 
597  // and now add it to the metadata service as it's fully written
598  MetadataService::Instance().addRootOutputFile(filename, m_outputFileMetaData);
599 
600  // reset some variables
601  for (auto & entry : m_entries) {
602  entry.clear();
603  }
604  m_parentLfns.clear();
605  m_experimentLow = 1;
606  m_experimentHigh = 0;
607  m_runLow = 0;
608  m_runHigh = 0;
609  m_eventLow = 0;
610  m_eventHigh = 0;
611  // and increase index of next file
612  ++m_fileIndex;
613 
614  // Call system(/usr/bin/rsync) for online storage ramdisk buffer cleanup
615  if (m_ramdiskBuffer) {
616  const std::string rsync_cmd = std::string("/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(" ") + filename_notmp_path.parent_path().generic_string() + std::string("/ &");
617  B2INFO(getName() << ": system(" << rsync_cmd << ")");
618  system(rsync_cmd.c_str());
619  }
620 }
621 
622 
623 void StorageRootOutputModule::fillTree(DataStore::EDurability durability)
624 {
625  if (!m_tree[durability]) return;
626 
627  TTree& tree = *m_tree[durability];
628  for(auto* entry: m_entries[durability]) {
629  // Check for entries whose object was not created and mark them as invalid.
630  // We still have to write them in the file due to the structure we have. This could be done better
631  if (!entry->ptr) {
632  entry->object->SetBit(kInvalidObject);
633  }
634  //FIXME: Do we need this? in theory no but it crashes in parallel processing otherwise ¯\_(ツ)_/¯
635  if (entry->name == "FileMetaData") {
636  tree.SetBranchAddress(entry->name.c_str(), &m_outputFileMetaData);
637  } else {
638  tree.SetBranchAddress(entry->name.c_str(), &entry->object);
639  }
640  }
641  tree.Fill();
642  for (auto* entry: m_entries[durability]) {
643  entry->object->ResetBit(kInvalidObject);
644  }
645 
646  const bool writeError = m_file->TestBit(TFile::kWriteError);
647  if (writeError) {
648  //m_file deleted first so we have a chance of closing it (though that will probably fail)
649  const std::string filename = m_file->GetName();
650  delete m_file;
651  B2FATAL("A write error occured while saving '" << filename << "', please check if enough disk space is available.");
652  }
653 }
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Metadata information about a file.
Definition: FileMetaData.h:29
void declareRealData()
Declare that this is not generated, but real data.
Definition: FileMetaData.h:294
Base class for Modules.
Definition: Module.h:72
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.