Belle II Software  release-08-00-10
StorageRootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <boost/python.hpp>
10 
11 #include <daq/storage/modules/StorageRootOutputModule.h>
12 
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
18 // needed for complex module parameter
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
21 
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
24 
25 #include <TClonesArray.h>
26 
27 #include <nlohmann/json.hpp>
28 
29 #include <memory>
30 #include <regex>
31 #include <filesystem>
32 
33 // For online storage
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
39 
40 
41 using namespace std;
42 using namespace Belle2;
43 using namespace RootIOUtilities;
44 
45 //-----------------------------------------------------------------
46 // Register the Module
47 //-----------------------------------------------------------------
48 REG_MODULE(StorageRootOutput)
49 
50 //-----------------------------------------------------------------
51 // Implementation
52 //-----------------------------------------------------------------
53 
54 StorageRootOutputModule::StorageRootOutputModule() : Module(), m_file(nullptr), m_tree{0}, m_experimentLow(1), m_runLow(0),
55  m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
56 {
57  //Set module properties
58  setDescription("Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59  setPropertyFlags(c_Output);
60 
61  //Parameter definition
62  addParam("outputFileName", m_outputFileName, "Name of the output file. Can be overridden using the -o argument to basf2.",
63  string("RootOutput.root"));
64  addParam("ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65  "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.", false);
66  addParam("compressionLevel", m_compressionLevel,
67  "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
68  m_compressionLevel);
69  addParam("compressionAlgorithm", m_compressionAlgorithm,
70  "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71  ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72  addParam("splitLevel", m_splitLevel,
73  "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
74  99);
75  addParam("updateFileCatalog", m_updateFileCatalog, R"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
79 
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
82 
83  vector<string> emptyvector;
84  addParam(c_SteerBranchNames[0], m_branchNames[0],
85  "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
86  emptyvector);
87  addParam(c_SteerBranchNames[1], m_branchNames[1],
88  "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
89  emptyvector);
90  addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91  "Add additional event branch names without the need to specify all branchnames.",
92  emptyvector);
93  addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94  "Add additional persistent branch names without the need to specify all branchnames.",
95  emptyvector);
96  addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97  "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98  addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99  "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100  addParam("autoFlushSize", m_autoflush,
101  "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
102  -10000000);
103  addParam("autoSaveSize", m_autosave,
104  "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
105  -10000000);
106  addParam("basketSize", m_basketsize, "Basketsize for Branches in the Tree in bytes", 32000);
107  addParam("additionalDataDescription", m_additionalDataDescription, "Additional dictionary of "
108  "name->value pairs to be added to the file metadata to describe the data",
109  m_additionalDataDescription);
110  addParam("buildIndex", m_buildIndex, "Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111  addParam("keepParents", m_keepParents, "Keep parents files of input files, input files will not be added as output file's parents",
112  m_keepParents);
113  addParam("outputSplitSize", m_outputSplitSize, R"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
119 
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
122 
123 Note:
124  The output files will be approximately of the size given by
125  ``outputSplitSize`` but they will be slightly larger since
126  additional information has to be written at the end of the file. If necessary
127  please account for this. Also, using ``buildIndex=False`` might be beneficial
128  to reduce the overshoot.
129 
130 Warning:
131  This will set the amount of generated events stored in the file metadata to
132  zero as it is not possible to determine which fraction ends up in which
133  output file.
134 
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
137 
138  // Parameters for online storage
139  addParam("runType", m_runType, "Run type", string("null"));
140  addParam("HLTName", m_HLTName, "HLT name", string("HLT00"));
141  addParam("nDisk", m_nDisk, "The number of paratitions", 3);
142  addParam("skipFirstEvent", m_firstEvent, "Boolean to skip the first event or not. "
143  "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
144  addParam("ramdiskBuffer", m_ramdiskBuffer, "Boolean to make small ramdisk buffer setup. "
145  "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
146 }
147 
148 
// Destructor: compiler-generated. It performs no explicit cleanup here; the
// output file is closed in terminate()/closeFile().
// NOTE(review): m_db is allocated with `new` in initialize() and is not
// released in this file -- confirm whether ownership is handled elsewhere.
StorageRootOutputModule::~StorageRootOutputModule() = default;
150 
151 void StorageRootOutputModule::initialize()
152 {
153  //ROOT has a default maximum size of 100GB for trees??? For larger trees it creates a new file and does other things that finally produce crashes.
154  //Let's set this to 100PB, that should last a bit longer.
155  TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
156 
157  //create a file level metadata object in the data store
158  m_fileMetaData.registerInDataStore();
159  //and make sure we have event meta data
160  m_eventMetaData.isRequired();
161 
162  //check outputSplitSize
163  if (m_outputSplitSize) {
164  if (*m_outputSplitSize == 0) B2ERROR("outputSplitSize must be set to a positive value");
165  // Warn is splitsize is >= 1TB ... because this seems weirdly like size was given in bytes
166  if (*m_outputSplitSize >= 1024*1024) B2WARNING("outputSplitSize set to " << *m_outputSplitSize << " MB, please make sure the units are correct");
167  // convert to bytes
168  *m_outputSplitSize *= 1024 * 1024;
169  }
170 
171  getFileNames();
172 
173  // Now check if the file has a protocol like file:// or http:// in front
174  std::regex protocol("^([A-Za-z]*)://");
175  if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
176  if(m[1] == "file") {
177  // file protocol: treat as local and just remove it from the filename
178  m_outputFileName = std::regex_replace(m_outputFileName, protocol, "");
179  } else {
180  // any other protocol: not local, don't create directories
181  m_regularFile = false;
182  }
183  }
184  // For online storage
185  // Do not open file in basf2 initialize
186  // openFile();
187 
188  ConfigFile config("slowcontrol");
189  PostgreSQLInterface *db = new PostgreSQLInterface(config.get("database.host"),
190  config.get("database.dbname"),
191  config.get("database.user"),
192  config.get("database.password"),
193  config.getInt("database.port"));
194  m_db = db;
195 }
196 
197 void StorageRootOutputModule::openFile()
198 {
199  // Since we open a new file, we also have to reset the number of full events
200  m_nFullEvents = 0;
201  // Continue with opening the file
202  TDirectory* dir = gDirectory;
203  std::filesystem::path out{m_outputFileName};
204  if (m_outputSplitSize) {
205  // Mangle the filename to add the fNNNNN part. However we need to be
206  // careful since the file name could be non-local and have some options or
207  // anchor information attached (like
208  // http://mydomain.org/filename.root?foo=bar#baz). So use "TUrl" *sigh* to
209  // do the parsing and only replace the extension of the file part.
210  TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
211  std::filesystem::path file{fileUrl.GetFile()};
212  file.replace_extension((boost::format("f%05d.root") % m_fileIndex).str());
213  fileUrl.SetFile(file.c_str());
214  // In case of regular files we don't want the protocol or anything, just the file
215  out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
216  }
217 
218  // For online storage buffer disks
219  std::filesystem::path out_notmp = out;
220  out = std::filesystem::path{std::string("/buffer") + out.generic_string()};
221 
222  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
223  if ((!m_file || m_file->IsZombie()) && m_regularFile) {
224  //try creating necessary directories since this is a local file
225  auto dirpath = out.parent_path();
226 
227  if (std::filesystem::create_directories(dirpath)) {
228  B2INFO("Created missing directory " << dirpath << ".");
229  //try again
230  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
231  }
232 
233  }
234 
235  // Retry file open for online storage, with ramdisk, may not be needed
236  int openCounter = 0;
237  while (!m_file || m_file->IsZombie()) {
238  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
239  B2INFO("process(" << m_processNumber << "), Try open: " << ++openCounter);
240  }
241 
242  if (!m_file || m_file->IsZombie()) {
243  B2FATAL("Couldn't open file " << out << " for writing!");
244  }
245  m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
246  m_file->SetCompressionLevel(m_compressionLevel);
247 
248  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
249  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
250  set<string> branchList;
251  for (const auto& pair : map)
252  branchList.insert(pair.first);
253  //skip branches the user doesn't want
254  branchList = filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
255 
256  //create the tree and branches
257  m_tree[durability] = new TTree(c_treeNames[durability].c_str(), c_treeNames[durability].c_str());
258  m_tree[durability]->SetAutoFlush(m_autoflush);
259  m_tree[durability]->SetAutoSave(m_autosave);
260  for (auto & iter : map) {
261  const std::string& branchName = iter.first;
262  //skip transient entries (allow overriding via branchNames)
263  if (iter.second.dontWriteOut
264  && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
265  && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
266  branchName) == m_additionalBranchNames[durability].end())
267  continue;
268  //skip branches the user doesn't want
269  if (branchList.count(branchName) == 0) {
270  //make sure FileMetaData and EventMetaData are always included in the output
271  if (((branchName != "FileMetaData") || (durability == DataStore::c_Event)) &&
272  ((branchName != "EventMetaData") || (durability == DataStore::c_Persistent))) {
273  continue;
274  }
275  }
276 
277  // Warn for anything other than FileMetaData and ProcessStatistics ...
278  if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
279  (branchName != "FileMetaData" and branchName != "ProcessStatistics")) {
280  B2WARNING("Persistent branches might not be stored as expected when splitting the output by size" << LogVar("branch", branchName));
281  }
282 
283  TClass* entryClass = iter.second.objClass;
284 
285  //I want to do this in the input module, but I apparently I cannot disable reading those branches.
286  //isabling reading the branch by not calling SetBranchAddress() for it results in the following crashes. Calling SetBranchStatus(..., 0) doesn't help, either.
287  //reported to ROOT devs, let's see if it gets fixed.
288  //
289  //HasDictionary() is a new function in root 6
290  //using it instead of GetClassInfo() avoids having to parse header files (and
291  //the associated memory cost)
292  if (!entryClass->HasDictionary()) {
293  if (m_fileIndex == 0) {
294  B2WARNING("No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
295  << LogVar("class", entryClass->GetName()) << LogVar("branch", branchName));
296  }
297  continue;
298  }
299 
300  if (!hasStreamer(entryClass)) {
301  B2ERROR("The version number in the ClassDef() macro must be at least 1 to enable I/O!" << LogVar("class", entryClass->GetName()));
302  }
303 
304  int splitLevel = m_splitLevel;
305  if (hasCustomStreamer(entryClass)) {
306  B2DEBUG(38, "Class has custom streamer, setting split level -1 for this branch." << LogVar("class", entryClass->GetName()));
307 
308  splitLevel = -1;
309  if (iter.second.isArray) {
310  //for arrays, we also don't want TClonesArray to go around our streamer
311  static_cast<TClonesArray*>(iter.second.object)->BypassStreamer(kFALSE);
312  }
313  }
314  m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
315  m_entries[durability].push_back(&iter.second);
316  B2DEBUG(39, "The branch " << branchName << " was created.");
317 
318  //Tell DataStore that we are using this entry
319  if (m_fileIndex == 0) {
320  DataStore::Instance().optionalInput(StoreAccessorBase(branchName, (DataStore::EDurability)durability, entryClass,
321  iter.second.isArray));
322  }
323  }
324  }
325 
326  dir->cd();
327  if (m_outputSplitSize) {
328  B2INFO(getName() << ": Opened " << (m_fileIndex > 0 ? "new " : "") << "file for writing" << LogVar("filename", out));
329  }
330 
331  // Insert file entry into the DAQ DB while open new file, online storage feature
332  try {
333  m_db->connect();
334  m_db->execute("INSERT INTO datafiles_root "
335  "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
336  "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
337  "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
338  out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
339  m_runType.c_str(), m_expno, m_runno,
340  m_compressionLevel, m_compressionAlgorithm,
341  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str());
342  } catch (const DBHandlerException &e) {
343  B2WARNING(e.what());
344  }
345 }
346 
347 
348 void StorageRootOutputModule::event()
349 {
350  // Many safety guards and features for online storage
351  // Skip the first event which is the ZMQ init message
352  if (m_firstEvent) {
353  m_firstEvent = false;
354  return;
355  }
356 
357  // Exp/run number should be increased
358  if ((m_expno > m_eventMetaData->getExperiment()) ||
359  (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
360  return;
361  }
362 
363  // Close file and set file index to 0 if exp/run number is changed
364  if (m_file) {
365  if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
366  closeFile();
367  while (m_file) closeFile(); // I hope that this will not be failed
368  m_fileIndex = 0;
369  }
370  }
371 
372  // Open file with automatic naming
373  if (!m_file) {
374  m_expno = m_eventMetaData->getExperiment();
375  m_runno = m_eventMetaData->getRun();
376  m_disk = StringUtil::form("disk%02d", (m_processNumber%m_nDisk)+1);
377  m_processNumber = atoi(getName().substr(0, getName().find(std::string("_"))).c_str());
378  m_outputFileName = StringUtil::form("/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
379  m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
380  m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
381  }
382 
383  // if we closed after last event ... make a new one
384  if (!m_file) openFile();
385 
386  if (!m_keepParents) {
387  if (m_fileMetaData) {
388  m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
389  }
390  }
391 
392  //fill Event data
393  fillTree(DataStore::c_Event);
394 
395  if (m_fileMetaData) {
396  if (m_keepParents) {
397  for (int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
398  string lfn = m_fileMetaData->getParent(iparent);
399  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
400  m_parentLfns.push_back(lfn);
401  }
402  }
403  } else {
404  string lfn = m_fileMetaData->getLfn();
405  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
406  m_parentLfns.push_back(lfn);
407  }
408  }
409  }
410 
411  // keep track of file level metadata
412  unsigned long experiment = m_eventMetaData->getExperiment();
413  unsigned long run = m_eventMetaData->getRun();
414  unsigned long event = m_eventMetaData->getEvent();
415  if (m_experimentLow > m_experimentHigh) { //starting condition
416  m_experimentLow = m_experimentHigh = experiment;
417  m_runLow = m_runHigh = run;
418  m_eventLow = m_eventHigh = event;
419  } else {
420  if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
421  && (event < m_eventLow))))) {
422  m_experimentLow = experiment;
423  m_runLow = run;
424  m_eventLow = event;
425  }
426  if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
427  && (event > m_eventHigh))))) {
428  m_experimentHigh = experiment;
429  m_runHigh = run;
430  m_eventHigh = event;
431  }
432  }
433 
434  // check if the event is a full event or not: if yes, increase the counter
435  if (m_eventMetaData->getErrorFlag() == 0) // no error flag -> this is a full event
436  m_nFullEvents++;
437 
438  // check if we need to split the file
439  if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
440  // close file and open new one
441  B2INFO(getName() << ": Output size limit reached, closing file ...");
442  closeFile();
443  // Introduce while for online storage (can be removed?)
444  while (m_file) closeFile();
445  }
446 }
447 
448 void StorageRootOutputModule::fillFileMetaData()
449 {
450  bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() : true;
451 
452  // For online storage, force to declareRealData()
453  // I wonder why the no file meta data is associated with isMC == true
454  isMC = false;
455 
456  m_fileMetaData.create(true);
457  if (!isMC) m_fileMetaData->declareRealData();
458 
459  if (m_tree[DataStore::c_Event]) {
460  //create an index for the event tree
461  TTree* tree = m_tree[DataStore::c_Event];
462  unsigned long numEntries = tree->GetEntries();
463  m_fileMetaData->setNFullEvents(m_nFullEvents);
464  if (m_buildIndex && numEntries > 0) {
465  if (numEntries > 10000000) {
466  //10M events correspond to about 240MB for the TTreeIndex object. for more than ~45M entries this causes crashes, broken files :(
467  B2WARNING("Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
468  } else if (tree->GetBranch("EventMetaData")) {
469  tree->SetBranchAddress("EventMetaData", nullptr);
470  RootIOUtilities::buildIndex(tree);
471  }
472  }
473 
474  m_fileMetaData->setNEvents(numEntries);
475  if (m_experimentLow > m_experimentHigh) {
476  //starting condition so apparently no events at all
477  m_fileMetaData->setLow(-1, -1, 0);
478  m_fileMetaData->setHigh(-1, -1, 0);
479  } else {
480  m_fileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
481  m_fileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
482  }
483  }
484 
485  //fill more file level metadata
486  m_fileMetaData->setParents(m_parentLfns);
487  RootIOUtilities::setCreationData(*m_fileMetaData);
488  m_fileMetaData->setRandomSeed(RandomNumbers::getSeed());
489  m_fileMetaData->setSteering(Environment::Instance().getSteering());
490  auto mcEvents = Environment::Instance().getNumberOfMCEvents();
491  if(m_outputSplitSize and mcEvents > 0) {
492  if(m_fileIndex == 0) B2WARNING("Number of MC Events cannot be saved when splitting output files by size, setting to 0");
493  mcEvents = 0;
494  }
495  m_fileMetaData->setMcEvents(mcEvents);
496  m_fileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
497  for (const auto& item : m_additionalDataDescription) {
498  m_fileMetaData->setDataDescription(item.first, item.second);
499  }
500  // Set the LFN to the filename: if it's a URL to directly, otherwise make sure it's absolute
501  std::string lfn = m_file->GetName();
502  if(m_regularFile) {
503  lfn = std::filesystem::absolute(lfn).string();
504  }
505  // Format LFN if BELLE2_LFN_FORMATSTRING is set
506  std::string format = EnvironmentVariables::get("BELLE2_LFN_FORMATSTRING", "");
507  if (!format.empty()) {
508  auto format_filename = boost::python::import("B2Tools.format").attr("format_filename");
509  lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_fileMetaData->getJsonStr()));
510  }
511  m_fileMetaData->setLfn(lfn);
512  //register the file in the catalog
513  if (m_updateFileCatalog) {
514  FileCatalog::Instance().registerFile(m_file->GetName(), *m_fileMetaData);
515  }
516  m_outputFileMetaData = *m_fileMetaData;
517 }
518 
519 
520 void StorageRootOutputModule::terminate()
521 {
522  closeFile();
523  // Introduce while for online storage (can be removed?)
524  while (m_file) closeFile();
525 }
526 
527 void StorageRootOutputModule::closeFile()
528 {
529  if(!m_file) return;
530  //get pointer to file level metadata
531  std::unique_ptr<FileMetaData> old;
532  if (m_fileMetaData) old = std::make_unique<FileMetaData>(*m_fileMetaData);
533 
534  fillFileMetaData();
535 
536  //fill Persistent data
537  fillTree(DataStore::c_Persistent);
538 
539  // restore old file meta data if it existed
540  if (old) *m_fileMetaData = *old;
541  old.reset();
542 
543  //write the trees
544  TDirectory* dir = gDirectory;
545  m_file->cd();
546  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
547  if (m_tree[durability]) {
548  B2DEBUG(30, "Write TTree " << c_treeNames[durability]);
549  m_tree[durability]->Write(c_treeNames[durability].c_str(), TObject::kWriteDelete);
550  delete m_tree[durability];
551  }
552  m_tree[durability] = nullptr;
553  }
554  dir->cd();
555 
556  const std::string filename = m_file->GetName();
557  if (m_outputSplitSize) {
558  B2INFO(getName() << ": Finished writing file." << LogVar("filename", filename));
559  }
560 
561  // Before deleting m_file, store file list in online storage file list DB table
562  const std::filesystem::path filename_path{filename};
563  std::string filename_notmp = m_file->GetName();
564  filename_notmp.erase(0, 7); // remove "/buffer" in front of full path
565  const std::filesystem::path filename_notmp_path{filename_notmp};
566  try {
567  m_db->connect();
568  m_db->execute("UPDATE datafiles_root SET "
569  "correct_close = TRUE, "
570  "will_merge = %s ,"
571  "nevents = %lu, "
572  "nfullevents = %lu, "
573  "size = %lu, "
574  "time_close = '%s' "
575  "WHERE name = '%s' AND host = '%s';",
576  ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ? "true" : "false",
577  m_outputFileMetaData.getNEvents(), m_outputFileMetaData.getNFullEvents(), m_file->GetSize(),
578  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str(),
579  filename_path.filename().c_str(), m_HLTName.c_str());
580  } catch (const DBHandlerException &e) {
581  B2WARNING(e.what());
582  }
583  m_db->close();
584 
585  delete m_file;
586  m_file = nullptr;
587 
588  // and now add it to the metadata service as it's fully written
589  MetadataService::Instance().addRootOutputFile(filename, &m_outputFileMetaData);
590 
591  // reset some variables
592  for (auto & entry : m_entries) {
593  entry.clear();
594  }
595  m_parentLfns.clear();
596  m_experimentLow = 1;
597  m_experimentHigh = 0;
598  m_runLow = 0;
599  m_runHigh = 0;
600  m_eventLow = 0;
601  m_eventHigh = 0;
602  // and increase index of next file
603  ++m_fileIndex;
604 
605  // Call system(/usr/bin/rsync) for online storage ramdisk buffer cleanup
606  if (m_ramdiskBuffer) {
607  const std::string rsync_cmd = std::string("/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(" ") + filename_notmp_path.parent_path().generic_string() + std::string("/ &");
608  B2INFO(getName() << ": system(" << rsync_cmd << ")");
609  system(rsync_cmd.c_str());
610  }
611 }
612 
613 
614 void StorageRootOutputModule::fillTree(DataStore::EDurability durability)
615 {
616  if (!m_tree[durability]) return;
617 
618  TTree& tree = *m_tree[durability];
619  for(auto* entry: m_entries[durability]) {
620  // Check for entries whose object was not created and mark them as invalid.
621  // We still have to write them in the file due to the structure we have. This could be done better
622  if (!entry->ptr) {
623  entry->object->SetBit(kInvalidObject);
624  }
625  //FIXME: Do we need this? in theory no but it crashes in parallel processing otherwise ¯\_(ツ)_/¯
626  tree.SetBranchAddress(entry->name.c_str(), &entry->object);
627  }
628  tree.Fill();
629  for (auto* entry: m_entries[durability]) {
630  entry->object->ResetBit(kInvalidObject);
631  }
632 
633  const bool writeError = m_file->TestBit(TFile::kWriteError);
634  if (writeError) {
635  //m_file deleted first so we have a chance of closing it (though that will probably fail)
636  const std::string filename = m_file->GetName();
637  delete m_file;
638  B2FATAL("A write error occured while saving '" << filename << "', please check if enough disk space is available.");
639  }
640 }
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Base class for Modules.
Definition: Module.h:72
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.