Belle II Software  release-08-01-10
StorageRootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <boost/python.hpp>
10 
11 #include <daq/storage/modules/StorageRootOutputModule.h>
12 
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
18 // needed for complex module parameter
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
21 
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string.hpp>
24 
25 #include <TClonesArray.h>
26 
27 #include <nlohmann/json.hpp>
28 
29 #include <memory>
30 #include <regex>
31 #include <filesystem>
32 
33 // For online storage
34 #include <boost/date_time/posix_time/posix_time.hpp>
35 #include <daq/slc/base/ConfigFile.h>
36 #include <daq/slc/psql/PostgreSQLInterface.h>
37 #include <daq/slc/database/DBHandlerException.h>
38 #include <daq/slc/base/StringUtil.h>
39 
40 
41 using namespace std;
42 using namespace Belle2;
43 using namespace RootIOUtilities;
44 
45 //-----------------------------------------------------------------
46 // Register the Module
47 //-----------------------------------------------------------------
48 REG_MODULE(StorageRootOutput)
49 
50 //-----------------------------------------------------------------
51 // Implementation
52 //-----------------------------------------------------------------
53 
54 StorageRootOutputModule::StorageRootOutputModule() : Module(), m_file(nullptr), m_tree{0}, m_experimentLow(1), m_runLow(0),
55  m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
56 {
57  //Set module properties
58  setDescription("Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59  setPropertyFlags(c_Output);
60 
61  //Parameter definition
62  addParam("outputFileName", m_outputFileName, "Name of the output file. Can be overridden using the -o argument to basf2.",
63  string("RootOutput.root"));
64  addParam("ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65  "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.", false);
66  addParam("compressionLevel", m_compressionLevel,
67  "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
68  m_compressionLevel);
69  addParam("compressionAlgorithm", m_compressionAlgorithm,
70  "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71  ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72  addParam("splitLevel", m_splitLevel,
73  "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
74  99);
75  addParam("updateFileCatalog", m_updateFileCatalog, R"DOC(
76 Flag that specifies whether the file metadata catalog is updated or created.
77 This is only necessary in special cases and can always be done afterwards using
78 ``b2file-catalog-add filename.root``"
79 
80 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81 the same effect as setting this to false))DOC", false);
82 
83  vector<string> emptyvector;
84  addParam(c_SteerBranchNames[0], m_branchNames[0],
85  "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
86  emptyvector);
87  addParam(c_SteerBranchNames[1], m_branchNames[1],
88  "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
89  emptyvector);
90  addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91  "Add additional event branch names without the need to specify all branchnames.",
92  emptyvector);
93  addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94  "Add additional persistent branch names without the need to specify all branchnames.",
95  emptyvector);
96  addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97  "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98  addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99  "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100  addParam("autoFlushSize", m_autoflush,
101  "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
102  -10000000);
103  addParam("autoSaveSize", m_autosave,
104  "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
105  -10000000);
106  addParam("basketSize", m_basketsize, "Basketsize for Branches in the Tree in bytes", 32000);
107  addParam("additionalDataDescription", m_additionalDataDescription, "Additional dictionary of "
108  "name->value pairs to be added to the file metadata to describe the data",
109  m_additionalDataDescription);
110  addParam("buildIndex", m_buildIndex, "Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111  addParam("keepParents", m_keepParents, "Keep parents files of input files, input files will not be added as output file's parents",
112  m_keepParents);
113  addParam("outputSplitSize", m_outputSplitSize, R"DOC(
114 If given split the output file once the file has reached the given size in MB.
115 If set the filename will end in ``.f{index:05d}.root``. So if for example
116 ``outputFileName`` is set to "RootOutput.root" then the files will be named
117 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118 ``RootOutput.f00002.root``, ...
119 
120 All created output files are complete and independent files and can
121 subsequently processed completely independent.
122 
123 Note:
124  The output files will be approximately of the size given by
125  ``outputSplitSize`` but they will be slightly larger since
126  additional information has to be written at the end of the file. If necessary
127  please account for this. Also, using ``buildIndex=False`` might be beneficial
128  to reduce the overshoot.
129 
130 Warning:
131  This will set the amount of generated events stored in the file metadata to
132  zero as it is not possible to determine which fraction ends up in which
133  output file.
134 
135 .. versionadded:: release-03-00-00
136 )DOC", m_outputSplitSize);
137 
138  m_outputFileMetaData = new FileMetaData;
139 
140  // Parameters for online storage
141  addParam("runType", m_runType, "Run type", string("null"));
142  addParam("HLTName", m_HLTName, "HLT name", string("HLT00"));
143  addParam("nDisk", m_nDisk, "The number of paratitions", 3);
144  addParam("skipFirstEvent", m_firstEvent, "Boolean to skip the first event or not. "
145  "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
146  addParam("ramdiskBuffer", m_ramdiskBuffer, "Boolean to make small ramdisk buffer setup. "
147  "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
148 }
149 
150 
151 StorageRootOutputModule::~StorageRootOutputModule()
152 {
153  delete m_outputFileMetaData;
154 }
155 
156 void StorageRootOutputModule::initialize()
157 {
158  //ROOT has a default maximum size of 100GB for trees??? For larger trees it creates a new file and does other things that finally produce crashes.
159  //Let's set this to 100PB, that should last a bit longer.
160  TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
161 
162  //make sure we have event meta data
163  m_eventMetaData.isRequired();
164 
165  //check outputSplitSize
166  if (m_outputSplitSize) {
167  if (*m_outputSplitSize == 0) B2ERROR("outputSplitSize must be set to a positive value");
168  // Warn is splitsize is >= 1TB ... because this seems weirdly like size was given in bytes
169  if (*m_outputSplitSize >= 1024*1024) B2WARNING("outputSplitSize set to " << *m_outputSplitSize << " MB, please make sure the units are correct");
170  // convert to bytes
171  *m_outputSplitSize *= 1024 * 1024;
172  }
173 
174  getFileNames();
175 
176  // Now check if the file has a protocol like file:// or http:// in front
177  std::regex protocol("^([A-Za-z]*)://");
178  if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
179  if(m[1] == "file") {
180  // file protocol: treat as local and just remove it from the filename
181  m_outputFileName = std::regex_replace(m_outputFileName, protocol, "");
182  } else {
183  // any other protocol: not local, don't create directories
184  m_regularFile = false;
185  }
186  }
187  // For online storage
188  // Do not open file in basf2 initialize
189  // openFile();
190 
191  ConfigFile config("slowcontrol");
192  PostgreSQLInterface *db = new PostgreSQLInterface(config.get("database.host"),
193  config.get("database.dbname"),
194  config.get("database.user"),
195  config.get("database.password"),
196  config.getInt("database.port"));
197  m_db = db;
198 }
199 
200 // For online storage
201 void StorageRootOutputModule::endRun() {
202  if (m_file) {
203  closeFile();
204  while (m_file) closeFile(); // I hope that this will not be failed
205  m_fileIndex = 0;
206  }
207  m_runno++;
208 }
209 
210 void StorageRootOutputModule::openFile()
211 {
212  // Since we open a new file, we also have to reset the number of full events
213  m_nFullEvents = 0;
214  // Continue with opening the file
215  TDirectory* dir = gDirectory;
216  std::filesystem::path out{m_outputFileName};
217  if (m_outputSplitSize) {
218  // Mangle the filename to add the fNNNNN part. However we need to be
219  // careful since the file name could be non-local and have some options or
220  // anchor information attached (like
221  // http://mydomain.org/filename.root?foo=bar#baz). So use "TUrl" *sigh* to
222  // do the parsing and only replace the extension of the file part.
223  TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
224  std::filesystem::path file{fileUrl.GetFile()};
225  file.replace_extension((boost::format("f%05d.root") % m_fileIndex).str());
226  fileUrl.SetFile(file.c_str());
227  // In case of regular files we don't want the protocol or anything, just the file
228  out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
229  }
230 
231  // For online storage buffer disks
232  std::filesystem::path out_notmp = out;
233  out = std::filesystem::path{std::string("/buffer") + out.generic_string()};
234 
235  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
236  if ((!m_file || m_file->IsZombie()) && m_regularFile) {
237  //try creating necessary directories since this is a local file
238  auto dirpath = out.parent_path();
239 
240  if (std::filesystem::create_directories(dirpath)) {
241  B2INFO("Created missing directory " << dirpath << ".");
242  //try again
243  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
244  }
245 
246  }
247 
248  // Retry file open for online storage, with ramdisk, may not be needed
249  int openCounter = 0;
250  while (!m_file || m_file->IsZombie()) {
251  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
252  B2INFO("process(" << m_processNumber << "), Try open: " << ++openCounter);
253  }
254 
255  if (!m_file || m_file->IsZombie()) {
256  B2FATAL("Couldn't open file " << out << " for writing!");
257  }
258  m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
259  m_file->SetCompressionLevel(m_compressionLevel);
260 
261  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
262  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
263  set<string> branchList;
264  for (const auto& pair : map)
265  branchList.insert(pair.first);
266  //skip branches the user doesn't want
267  branchList = filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
268 
269  //create the tree and branches
270  m_tree[durability] = new TTree(c_treeNames[durability].c_str(), c_treeNames[durability].c_str());
271  m_tree[durability]->SetAutoFlush(m_autoflush);
272  m_tree[durability]->SetAutoSave(m_autosave);
273  for (auto & iter : map) {
274  const std::string& branchName = iter.first;
275  //skip transient entries (allow overriding via branchNames)
276  if (iter.second.dontWriteOut
277  && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
278  && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
279  branchName) == m_additionalBranchNames[durability].end())
280  continue;
281  //skip branches the user doesn't want
282  if (branchList.count(branchName) == 0) {
283  //make sure FileMetaData and EventMetaData are always included in the output
284  if (((branchName != "FileMetaData") || (durability == DataStore::c_Event)) &&
285  ((branchName != "EventMetaData") || (durability == DataStore::c_Persistent))) {
286  continue;
287  }
288  }
289 
290  // Warn for anything other than FileMetaData and ProcessStatistics ...
291  if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
292  (branchName != "FileMetaData" and branchName != "ProcessStatistics")) {
293  B2WARNING("Persistent branches might not be stored as expected when splitting the output by size" << LogVar("branch", branchName));
294  }
295 
296  TClass* entryClass = iter.second.objClass;
297 
298  //I want to do this in the input module, but I apparently I cannot disable reading those branches.
299  //isabling reading the branch by not calling SetBranchAddress() for it results in the following crashes. Calling SetBranchStatus(..., 0) doesn't help, either.
300  //reported to ROOT devs, let's see if it gets fixed.
301  //
302  //HasDictionary() is a new function in root 6
303  //using it instead of GetClassInfo() avoids having to parse header files (and
304  //the associated memory cost)
305  if (!entryClass->HasDictionary()) {
306  if (m_fileIndex == 0) {
307  B2WARNING("No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
308  << LogVar("class", entryClass->GetName()) << LogVar("branch", branchName));
309  }
310  continue;
311  }
312 
313  if (!hasStreamer(entryClass)) {
314  B2ERROR("The version number in the ClassDef() macro must be at least 1 to enable I/O!" << LogVar("class", entryClass->GetName()));
315  }
316 
317  int splitLevel = m_splitLevel;
318  if (hasCustomStreamer(entryClass)) {
319  B2DEBUG(38, "Class has custom streamer, setting split level -1 for this branch." << LogVar("class", entryClass->GetName()));
320 
321  splitLevel = -1;
322  if (iter.second.isArray) {
323  //for arrays, we also don't want TClonesArray to go around our streamer
324  static_cast<TClonesArray*>(iter.second.object)->BypassStreamer(kFALSE);
325  }
326  }
327  m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
328  m_entries[durability].push_back(&iter.second);
329  B2DEBUG(39, "The branch " << branchName << " was created.");
330 
331  //Tell DataStore that we are using this entry
332  if (m_fileIndex == 0) {
333  DataStore::Instance().optionalInput(StoreAccessorBase(branchName, (DataStore::EDurability)durability, entryClass,
334  iter.second.isArray));
335  }
336  }
337  }
338 
339  // set the address of the FileMetaData branch for the output to a separate one from the input
340  TBranch* fileMetaDataBranch = m_tree[DataStore::c_Persistent]->GetBranch("FileMetaData");
341  if (fileMetaDataBranch) {
342  fileMetaDataBranch->SetAddress(&m_outputFileMetaData);
343  } else {
344  m_tree[DataStore::c_Persistent]->Branch("FileMetaData", &m_outputFileMetaData, m_basketsize, m_splitLevel);
345  }
346 
347  dir->cd();
348  if (m_outputSplitSize) {
349  B2INFO(getName() << ": Opened " << (m_fileIndex > 0 ? "new " : "") << "file for writing" << LogVar("filename", out));
350  }
351 
352  // Insert file entry into the DAQ DB while open new file, online storage feature
353  try {
354  m_db->connect();
355  m_db->execute("INSERT INTO datafiles_root "
356  "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
357  "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
358  "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
359  out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
360  m_runType.c_str(), m_expno, m_runno,
361  m_compressionLevel, m_compressionAlgorithm,
362  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str());
363  } catch (const DBHandlerException &e) {
364  B2WARNING(e.what());
365  }
366 }
367 
368 
369 void StorageRootOutputModule::event()
370 {
371  // Many safety guards and features for online storage
372  // Skip the first event which is the ZMQ init message
373  if (m_firstEvent) {
374  m_firstEvent = false;
375  return;
376  }
377 
378  // Exp/run number should be increased
379  if ((m_expno > m_eventMetaData->getExperiment()) ||
380  (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
381  return;
382  }
383 
384  // Close file and set file index to 0 if exp/run number is changed
385  if (m_file) {
386  if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
387  closeFile();
388  while (m_file) closeFile(); // I hope that this will not be failed
389  m_fileIndex = 0;
390  }
391  }
392 
393  // Open file with automatic naming
394  if (!m_file) {
395  m_expno = m_eventMetaData->getExperiment();
396  m_runno = m_eventMetaData->getRun();
397  m_processNumber = atoi(getName().substr(0, getName().find(std::string("_"))).c_str());
398  m_disk = StringUtil::form("disk%02d", (m_processNumber%m_nDisk)+1);
399  m_outputFileName = StringUtil::form("/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
400  m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
401  m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
402  }
403 
404  // if we closed after last event ... make a new one
405  if (!m_file) openFile();
406 
407  if (!m_keepParents) {
408  if (m_fileMetaData) {
409  m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
410  }
411  }
412 
413  //fill Event data
414  fillTree(DataStore::c_Event);
415 
416  if (m_fileMetaData) {
417  if (m_keepParents) {
418  for (int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
419  string lfn = m_fileMetaData->getParent(iparent);
420  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
421  m_parentLfns.push_back(lfn);
422  }
423  }
424  } else {
425  string lfn = m_fileMetaData->getLfn();
426  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
427  m_parentLfns.push_back(lfn);
428  }
429  }
430  }
431 
432  // keep track of file level metadata
433  unsigned long experiment = m_eventMetaData->getExperiment();
434  unsigned long run = m_eventMetaData->getRun();
435  unsigned long event = m_eventMetaData->getEvent();
436  if (m_experimentLow > m_experimentHigh) { //starting condition
437  m_experimentLow = m_experimentHigh = experiment;
438  m_runLow = m_runHigh = run;
439  m_eventLow = m_eventHigh = event;
440  } else {
441  if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
442  && (event < m_eventLow))))) {
443  m_experimentLow = experiment;
444  m_runLow = run;
445  m_eventLow = event;
446  }
447  if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
448  && (event > m_eventHigh))))) {
449  m_experimentHigh = experiment;
450  m_runHigh = run;
451  m_eventHigh = event;
452  }
453  }
454 
455  // check if the event is a full event or not: if yes, increase the counter
456  if (m_eventMetaData->getErrorFlag() == 0) // no error flag -> this is a full event
457  m_nFullEvents++;
458 
459  // check if we need to split the file
460  if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
461  // close file and open new one
462  B2INFO(getName() << ": Output size limit reached, closing file ...");
463  closeFile();
464  // Introduce while for online storage (can be removed?)
465  while (m_file) closeFile();
466  }
467 }
468 
469 void StorageRootOutputModule::fillFileMetaData()
470 {
471  bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() : true;
472 
473  // For online storage, force to declareRealData()
474  // I wonder why the no file meta data is associated with isMC == true
475  isMC = false;
476 
477  new(m_outputFileMetaData) FileMetaData;
478  if (!isMC) m_outputFileMetaData->declareRealData();
479 
480  if (m_tree[DataStore::c_Event]) {
481  //create an index for the event tree
482  TTree* tree = m_tree[DataStore::c_Event];
483  unsigned long numEntries = tree->GetEntries();
484  m_outputFileMetaData->setNFullEvents(m_nFullEvents);
485  if (m_buildIndex && numEntries > 0) {
486  if (numEntries > 10000000) {
487  //10M events correspond to about 240MB for the TTreeIndex object. for more than ~45M entries this causes crashes, broken files :(
488  B2WARNING("Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
489  } else if (tree->GetBranch("EventMetaData")) {
490  tree->SetBranchAddress("EventMetaData", nullptr);
491  RootIOUtilities::buildIndex(tree);
492  }
493  }
494 
495  m_outputFileMetaData->setNEvents(numEntries);
496  if (m_experimentLow > m_experimentHigh) {
497  //starting condition so apparently no events at all
498  m_outputFileMetaData->setLow(-1, -1, 0);
499  m_outputFileMetaData->setHigh(-1, -1, 0);
500  } else {
501  m_outputFileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
502  m_outputFileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
503  }
504  }
505 
506  //fill more file level metadata
507  m_outputFileMetaData->setParents(m_parentLfns);
508  RootIOUtilities::setCreationData(*m_outputFileMetaData);
509  m_outputFileMetaData->setRandomSeed(RandomNumbers::getSeed());
510  m_outputFileMetaData->setSteering(Environment::Instance().getSteering());
511  auto mcEvents = Environment::Instance().getNumberOfMCEvents();
512  if(m_outputSplitSize and mcEvents > 0) {
513  if(m_fileIndex == 0) B2WARNING("Number of MC Events cannot be saved when splitting output files by size, setting to 0");
514  mcEvents = 0;
515  }
516  m_outputFileMetaData->setMcEvents(mcEvents);
517  m_outputFileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
518  for (const auto& item : m_additionalDataDescription) {
519  m_outputFileMetaData->setDataDescription(item.first, item.second);
520  }
521  // Set the LFN to the filename: if it's a URL to directly, otherwise make sure it's absolute
522  std::string lfn = m_file->GetName();
523  if(m_regularFile) {
524  lfn = std::filesystem::absolute(lfn).string();
525  }
526  // Format LFN if BELLE2_LFN_FORMATSTRING is set
527  std::string format = EnvironmentVariables::get("BELLE2_LFN_FORMATSTRING", "");
528  if (!format.empty()) {
529  auto format_filename = boost::python::import("B2Tools.format").attr("format_filename");
530  lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_outputFileMetaData->getJsonStr()));
531  }
532  m_outputFileMetaData->setLfn(lfn);
533  //register the file in the catalog
534  if (m_updateFileCatalog) {
535  FileCatalog::Instance().registerFile(m_file->GetName(), *m_outputFileMetaData);
536  }
537 }
538 
539 
540 void StorageRootOutputModule::terminate()
541 {
542  closeFile();
543  // Introduce while for online storage (can be removed?)
544  while (m_file) closeFile();
545 }
546 
547 void StorageRootOutputModule::closeFile()
548 {
549  if(!m_file) return;
550 
551  fillFileMetaData();
552 
553  //fill Persistent data
554  fillTree(DataStore::c_Persistent);
555 
556  //write the trees
557  TDirectory* dir = gDirectory;
558  m_file->cd();
559  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
560  if (m_tree[durability]) {
561  B2DEBUG(30, "Write TTree " << c_treeNames[durability]);
562  m_tree[durability]->Write(c_treeNames[durability].c_str(), TObject::kWriteDelete);
563  delete m_tree[durability];
564  }
565  m_tree[durability] = nullptr;
566  }
567  dir->cd();
568 
569  const std::string filename = m_file->GetName();
570  if (m_outputSplitSize) {
571  B2INFO(getName() << ": Finished writing file." << LogVar("filename", filename));
572  }
573 
574  // Before deleting m_file, store file list in online storage file list DB table
575  const std::filesystem::path filename_path{filename};
576  std::string filename_notmp = m_file->GetName();
577  filename_notmp.erase(0, 7); // remove "/buffer" in front of full path
578  const std::filesystem::path filename_notmp_path{filename_notmp};
579  try {
580  m_db->connect();
581  m_db->execute("UPDATE datafiles_root SET "
582  "correct_close = TRUE, "
583  "will_merge = %s ,"
584  "nevents = %lu, "
585  "nfullevents = %lu, "
586  "size = %lu, "
587  "time_close = '%s' "
588  "WHERE name = '%s' AND host = '%s';",
589  ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ? "true" : "false",
590  m_outputFileMetaData->getNEvents(), m_outputFileMetaData->getNFullEvents(), m_file->GetSize(),
591  (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str(),
592  filename_path.filename().c_str(), m_HLTName.c_str());
593  } catch (const DBHandlerException &e) {
594  B2WARNING(e.what());
595  }
596  m_db->close();
597 
598  delete m_file;
599  m_file = nullptr;
600 
601  // and now add it to the metadata service as it's fully written
602  MetadataService::Instance().addRootOutputFile(filename, m_outputFileMetaData);
603 
604  // reset some variables
605  for (auto & entry : m_entries) {
606  entry.clear();
607  }
608  m_parentLfns.clear();
609  m_experimentLow = 1;
610  m_experimentHigh = 0;
611  m_runLow = 0;
612  m_runHigh = 0;
613  m_eventLow = 0;
614  m_eventHigh = 0;
615  // and increase index of next file
616  ++m_fileIndex;
617 
618  // Call system(/usr/bin/rsync) for online storage ramdisk buffer cleanup
619  if (m_ramdiskBuffer) {
620  const std::string rsync_cmd = std::string("/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(" ") + filename_notmp_path.parent_path().generic_string() + std::string("/ &");
621  B2INFO(getName() << ": system(" << rsync_cmd << ")");
622  system(rsync_cmd.c_str());
623  }
624 }
625 
626 
627 void StorageRootOutputModule::fillTree(DataStore::EDurability durability)
628 {
629  if (!m_tree[durability]) return;
630 
631  TTree& tree = *m_tree[durability];
632  for(auto* entry: m_entries[durability]) {
633  // Check for entries whose object was not created and mark them as invalid.
634  // We still have to write them in the file due to the structure we have. This could be done better
635  if (!entry->ptr) {
636  entry->object->SetBit(kInvalidObject);
637  }
638  //FIXME: Do we need this? in theory no but it crashes in parallel processing otherwise ¯\_(ツ)_/¯
639  if (entry->name == "FileMetaData") {
640  tree.SetBranchAddress(entry->name.c_str(), &m_outputFileMetaData);
641  } else {
642  tree.SetBranchAddress(entry->name.c_str(), &entry->object);
643  }
644  }
645  tree.Fill();
646  for (auto* entry: m_entries[durability]) {
647  entry->object->ResetBit(kInvalidObject);
648  }
649 
650  const bool writeError = m_file->TestBit(TFile::kWriteError);
651  if (writeError) {
652  //m_file deleted first so we have a chance of closing it (though that will probably fail)
653  const std::string filename = m_file->GetName();
654  delete m_file;
655  B2FATAL("A write error occured while saving '" << filename << "', please check if enough disk space is available.");
656  }
657 }
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Metadata information about a file.
Definition: FileMetaData.h:29
void declareRealData()
Declare that this is not generated, but real data.
Definition: FileMetaData.h:294
Base class for Modules.
Definition: Module.h:72
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.