Belle II Software  release-06-02-00
RootOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <boost/python.hpp>
10 
11 #include <framework/modules/rootio/RootOutputModule.h>
12 
13 #include <framework/io/RootIOUtilities.h>
14 #include <framework/core/FileCatalog.h>
15 #include <framework/core/MetadataService.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/database/Database.h>
18 // needed for complex module parameter
19 #include <framework/core/ModuleParam.templateDetails.h>
20 #include <framework/utilities/EnvironmentVariables.h>
21 
22 #include <boost/filesystem/path.hpp>
23 #include <boost/filesystem/operations.hpp>
24 #include <boost/format.hpp>
25 #include <boost/algorithm/string.hpp>
26 
27 #include <TClonesArray.h>
28 
29 #include <nlohmann/json.hpp>
30 
31 #include <memory>
32 #include <regex>
33 
34 
35 using namespace std;
36 using namespace Belle2;
37 using namespace RootIOUtilities;
38 
39 //-----------------------------------------------------------------
40 // Register the Module
41 //-----------------------------------------------------------------
42 REG_MODULE(RootOutput)
43 
44 //-----------------------------------------------------------------
45 // Implementation
46 //-----------------------------------------------------------------
47 
48 RootOutputModule::RootOutputModule() : Module(), m_file(nullptr), m_tree{0}, m_experimentLow(1), m_runLow(0),
49  m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
50 {
51  //Set module properties
52  setDescription("Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
53  setPropertyFlags(c_Output);
54 
55  //Parameter definition
56  addParam("outputFileName" , m_outputFileName, "Name of the output file. Can be overridden using the -o argument to basf2.",
57  string("RootOutput.root"));
58  addParam("ignoreCommandLineOverride" , m_ignoreCommandLineOverride,
59  "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.", false);
60  addParam("compressionLevel", m_compressionLevel,
61  "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
62  m_compressionLevel);
63  addParam("compressionAlgorithm", m_compressionAlgorithm,
64  "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
65  ".. versionadded:: release-03-00-00" , m_compressionAlgorithm);
66  addParam("splitLevel", m_splitLevel,
67  "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
68  99);
69  addParam("updateFileCatalog", m_updateFileCatalog, R"DOC(
70 Flag that specifies whether the file metadata catalog is updated or created.
71 This is only necessary in special cases and can always be done afterwards using
72 ``b2file-catalog-add filename.root``"
73 
74 (You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
75 the same effect as setting this to false))DOC", false);
76 
77  vector<string> emptyvector;
78  addParam(c_SteerBranchNames[0], m_branchNames[0],
79  "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
80  emptyvector);
81  addParam(c_SteerBranchNames[1], m_branchNames[1],
82  "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
83  emptyvector);
84  addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
85  "Add additional event branch names without the need to specify all branchnames.",
86  emptyvector);
87  addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
88  "Add additional persistent branch names without the need to specify all branchnames.",
89  emptyvector);
90  addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
91  "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
92  addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
93  "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
94  addParam("autoFlushSize", m_autoflush,
95  "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
96  -10000000);
97  addParam("autoSaveSize", m_autosave,
98  "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
99  -10000000);
100  addParam("basketSize", m_basketsize, "Basketsize for Branches in the Tree in bytes", 32000);
101  addParam("additionalDataDescription", m_additionalDataDescription, "Additional dictionary of "
102  "name->value pairs to be added to the file metadata to describe the data",
103  m_additionalDataDescription);
104  addParam("buildIndex", m_buildIndex, "Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
105  addParam("keepParents", m_keepParents, "Keep parents files of input files, input files will not be added as output file's parents",
106  m_keepParents);
107  addParam("outputSplitSize", m_outputSplitSize, R"DOC(
108 If given split the output file once the file has reached the given size in MB.
109 If set the filename will end in ``.f{index:05d}.root``. So if for example
110 ``outputFileName`` is set to "RootOutput.root" then the files will be named
111 ``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
112 ``RootOutput.f00002.root``, ...
113 
114 All created output files are complete and independent files and can
115 subsequently processed completely independent.
116 
117 Note:
118  The output files will be approximately of the size given by
119  ``outputSplitSize`` but they will be slightly larger since
120  additional information has to be written at the end of the file. If necessary
121  please account for this. Also, using ``buildIndex=False`` might be beneficial
122  to reduce the overshoot.
123 
124 Warning:
125  This will set the amount of generated events stored in the file metadata to
126  zero as it is not possible to determine which fraction ends up in which
127  output file.
128 
129 .. versionadded:: release-03-00-00
130 )DOC", m_outputSplitSize);
131 }
132 
133 
134 RootOutputModule::~RootOutputModule() = default;
135 
136 void RootOutputModule::initialize()
137 {
138  //ROOT has a default maximum size of 100GB for trees??? For larger trees it creates a new file and does other things that finally produce crashes.
139  //Let's set this to 100PB, that should last a bit longer.
140  TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
141 
142  //create a file level metadata object in the data store
143  m_fileMetaData.registerInDataStore();
144  //and make sure we have event meta data
145  m_eventMetaData.isRequired();
146 
147  //check outputSplitSize
148  if (m_outputSplitSize) {
149  if (*m_outputSplitSize == 0) B2ERROR("outputSplitSize must be set to a positive value");
150  // Warn is splitsize is >= 1TB ... because this seems weirdly like size was given in bytes
151  if (*m_outputSplitSize >= 1024*1024) B2WARNING("outputSplitSize set to " << *m_outputSplitSize << " MB, please make sure the units are correct");
152  // convert to bytes
153  *m_outputSplitSize *= 1024 * 1024;
154  }
155 
156  getFileNames();
157 
158  // Now check if the file has a protocol like file:// or http:// in front
159  std::regex protocol("^([A-Za-z]*)://");
160  if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
161  if(m[1] == "file") {
162  // file protocol: treat as local and just remove it from the filename
163  m_outputFileName = std::regex_replace(m_outputFileName, protocol, "");
164  } else {
165  // any other protocol: not local, don't create directories
166  m_regularFile = false;
167  }
168  }
169  openFile();
170 }
171 
172 void RootOutputModule::openFile()
173 {
174  TDirectory* dir = gDirectory;
175  boost::filesystem::path out{m_outputFileName};
176  if (m_outputSplitSize) {
177  // Mangle the filename to add the fNNNNN part. However we need to be
178  // careful since the file name could be non-local and have some options or
179  // anchor information attached (like
180  // http://mydomain.org/filename.root?foo=bar#baz). So use "TUrl" *sigh* to
181  // do the parsing and only replace the extension of the file part.
182  TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
183  boost::filesystem::path file{fileUrl.GetFile()};
184  file.replace_extension((boost::format("f%05d.root") % m_fileIndex).str());
185  fileUrl.SetFile(file.c_str());
186  // In case of regular files we don't want the protocol or anything, just the file
187  out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
188  }
189  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
190  if ((!m_file || m_file->IsZombie()) && m_regularFile) {
191  //try creating necessary directories since this is a local file
192  auto dirpath = out.parent_path();
193 
194  if (boost::filesystem::create_directories(dirpath)) {
195  B2INFO("Created missing directory " << dirpath << ".");
196  //try again
197  m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
198  }
199 
200  }
201  if (!m_file || m_file->IsZombie()) {
202  B2FATAL("Couldn't open file " << out << " for writing!");
203  }
204  m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
205  m_file->SetCompressionLevel(m_compressionLevel);
206 
207  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
208  DataStore::StoreEntryMap& map = DataStore::Instance().getStoreEntryMap(DataStore::EDurability(durability));
209  set<string> branchList;
210  for (const auto& pair : map)
211  branchList.insert(pair.first);
212  //skip branches the user doesn't want
213  branchList = filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
214 
215  //create the tree and branches
216  m_tree[durability] = new TTree(c_treeNames[durability].c_str(), c_treeNames[durability].c_str());
217  m_tree[durability]->SetAutoFlush(m_autoflush);
218  m_tree[durability]->SetAutoSave(m_autosave);
219  for (auto & iter : map) {
220  const std::string& branchName = iter.first;
221  //skip transient entries (allow overriding via branchNames)
222  if (iter.second.dontWriteOut
223  && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
224  && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
225  branchName) == m_additionalBranchNames[durability].end())
226  continue;
227  //skip branches the user doesn't want
228  if (branchList.count(branchName) == 0) {
229  //make sure FileMetaData and EventMetaData are always included in the output
230  if (((branchName != "FileMetaData") || (durability == DataStore::c_Event)) &&
231  ((branchName != "EventMetaData") || (durability == DataStore::c_Persistent))) {
232  continue;
233  }
234  }
235 
236  // Warn for anything other than FileMetaData and ProcessStatistics ...
237  if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
238  (branchName != "FileMetaData" and branchName != "ProcessStatistics")) {
239  B2WARNING("Persistent branches might not be stored as expected when splitting the output by size" << LogVar("branch", branchName));
240  }
241 
242  TClass* entryClass = iter.second.objClass;
243 
244  //I want to do this in the input module, but I apparently I cannot disable reading those branches.
245  //isabling reading the branch by not calling SetBranchAddress() for it results in the following crashes. Calling SetBranchStatus(..., 0) doesn't help, either.
246  //reported to ROOT devs, let's see if it gets fixed.
247  //
248  //HasDictionary() is a new function in root 6
249  //using it instead of GetClassInfo() avoids having to parse header files (and
250  //the associated memory cost)
251  if (!entryClass->HasDictionary()) {
252  if (m_fileIndex == 0) {
253  B2WARNING("No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
254  << LogVar("class", entryClass->GetName()) << LogVar("branch", branchName));
255  }
256  continue;
257  }
258 
259  if (!hasStreamer(entryClass)) {
260  B2ERROR("The version number in the ClassDef() macro must be at least 1 to enable I/O!" << LogVar("class", entryClass->GetName()));
261  }
262 
263  int splitLevel = m_splitLevel;
264  if (hasCustomStreamer(entryClass)) {
265  B2DEBUG(38, "Class has custom streamer, setting split level -1 for this branch." << LogVar("class", entryClass->GetName()));
266 
267  splitLevel = -1;
268  if (iter.second.isArray) {
269  //for arrays, we also don't want TClonesArray to go around our streamer
270  static_cast<TClonesArray*>(iter.second.object)->BypassStreamer(kFALSE);
271  }
272  }
273  m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
274  m_entries[durability].push_back(&iter.second);
275  B2DEBUG(39, "The branch " << branchName << " was created.");
276 
277  //Tell DataStore that we are using this entry
278  if (m_fileIndex == 0) {
279  DataStore::Instance().optionalInput(StoreAccessorBase(branchName, (DataStore::EDurability)durability, entryClass,
280  iter.second.isArray));
281  }
282  }
283  }
284 
285  dir->cd();
286  if (m_outputSplitSize) {
287  B2INFO(getName() << ": Opened " << (m_fileIndex > 0 ? "new " : "") << "file for writing" << LogVar("filename", out));
288  }
289 }
290 
291 
292 void RootOutputModule::event()
293 {
294  // if we closed after last event ... make a new one
295  if (!m_file) openFile();
296 
297  if (!m_keepParents) {
298  if (m_fileMetaData) {
299  m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
300  }
301  }
302 
303  //fill Event data
304  fillTree(DataStore::c_Event);
305 
306  if (m_fileMetaData) {
307  if (m_keepParents) {
308  for (int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
309  string lfn = m_fileMetaData->getParent(iparent);
310  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
311  m_parentLfns.push_back(lfn);
312  }
313  }
314  } else {
315  string lfn = m_fileMetaData->getLfn();
316  if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
317  m_parentLfns.push_back(lfn);
318  }
319  }
320  }
321 
322  // keep track of file level metadata
323  unsigned long experiment = m_eventMetaData->getExperiment();
324  unsigned long run = m_eventMetaData->getRun();
325  unsigned long event = m_eventMetaData->getEvent();
326  if (m_experimentLow > m_experimentHigh) { //starting condition
327  m_experimentLow = m_experimentHigh = experiment;
328  m_runLow = m_runHigh = run;
329  m_eventLow = m_eventHigh = event;
330  } else {
331  if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
332  && (event < m_eventLow))))) {
333  m_experimentLow = experiment;
334  m_runLow = run;
335  m_eventLow = event;
336  }
337  if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
338  && (event > m_eventHigh))))) {
339  m_experimentHigh = experiment;
340  m_runHigh = run;
341  m_eventHigh = event;
342  }
343  }
344 
345  // check if we need to split the file
346  if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
347  // close file and open new one
348  B2INFO(getName() << ": Output size limit reached, closing file ...");
349  closeFile();
350  }
351 }
352 
353 void RootOutputModule::fillFileMetaData()
354 {
355  bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() : true;
356  m_fileMetaData.create(true);
357  if (!isMC) m_fileMetaData->declareRealData();
358 
359  if (m_tree[DataStore::c_Event]) {
360  //create an index for the event tree
361  TTree* tree = m_tree[DataStore::c_Event];
362  unsigned long numEntries = tree->GetEntries();
363  if (m_buildIndex && numEntries > 0) {
364  if (numEntries > 10000000) {
365  //10M events correspond to about 240MB for the TTreeIndex object. for more than ~45M entries this causes crashes, broken files :(
366  B2WARNING("Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
367  } else if (tree->GetBranch("EventMetaData")) {
368  tree->SetBranchAddress("EventMetaData", nullptr);
369  RootIOUtilities::buildIndex(tree);
370  }
371  }
372 
373  m_fileMetaData->setNEvents(numEntries);
374  if (m_experimentLow > m_experimentHigh) {
375  //starting condition so apparently no events at all
376  m_fileMetaData->setLow(-1, -1, 0);
377  m_fileMetaData->setHigh(-1, -1, 0);
378  } else {
379  m_fileMetaData->setLow(m_experimentLow, m_runLow, m_eventLow);
380  m_fileMetaData->setHigh(m_experimentHigh, m_runHigh, m_eventHigh);
381  }
382  }
383 
384  //fill more file level metadata
385  m_fileMetaData->setParents(m_parentLfns);
386  RootIOUtilities::setCreationData(*m_fileMetaData);
387  m_fileMetaData->setRandomSeed(RandomNumbers::getSeed());
388  m_fileMetaData->setSteering(Environment::Instance().getSteering());
389  auto mcEvents = Environment::Instance().getNumberOfMCEvents();
390  if(m_outputSplitSize and mcEvents > 0) {
391  if(m_fileIndex == 0) B2WARNING("Number of MC Events cannot be saved when splitting output files by size, setting to 0");
392  mcEvents = 0;
393  }
394  m_fileMetaData->setMcEvents(mcEvents);
395  m_fileMetaData->setDatabaseGlobalTag(Database::Instance().getGlobalTags());
396  for (const auto& item : m_additionalDataDescription) {
397  m_fileMetaData->setDataDescription(item.first, item.second);
398  }
399  // Set the LFN to the filename: if it's a URL to directly, otherwise make sure it's absolute
400  std::string lfn = m_file->GetName();
401  if(m_regularFile) {
402  lfn = boost::filesystem::absolute(lfn, boost::filesystem::initial_path()).string();
403  }
404  // Format LFN if BELLE2_LFN_FORMATSTRING is set
405  std::string format = EnvironmentVariables::get("BELLE2_LFN_FORMATSTRING", "");
406  if (!format.empty()) {
407  auto format_filename = boost::python::import("B2Tools.format").attr("format_filename");
408  lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_fileMetaData->getJsonStr()));
409  }
410  m_fileMetaData->setLfn(lfn);
411  //register the file in the catalog
412  if (m_updateFileCatalog) {
413  FileCatalog::Instance().registerFile(m_file->GetName(), *m_fileMetaData);
414  }
415  m_outputFileMetaData = *m_fileMetaData;
416 }
417 
418 
419 void RootOutputModule::terminate()
420 {
421  closeFile();
422 }
423 
424 void RootOutputModule::closeFile()
425 {
426  if(!m_file) return;
427  //get pointer to file level metadata
428  std::unique_ptr<FileMetaData> old;
429  if (m_fileMetaData) old = std::make_unique<FileMetaData>(*m_fileMetaData);
430 
431  fillFileMetaData();
432 
433  //fill Persistent data
434  fillTree(DataStore::c_Persistent);
435 
436  // restore old file meta data if it existed
437  if (old) *m_fileMetaData = *old;
438  old.reset();
439 
440  //write the trees
441  TDirectory* dir = gDirectory;
442  m_file->cd();
443  for (int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
444  if (m_tree[durability]) {
445  B2DEBUG(30, "Write TTree " << c_treeNames[durability]);
446  m_tree[durability]->Write(c_treeNames[durability].c_str(), TObject::kWriteDelete);
447  delete m_tree[durability];
448  }
449  m_tree[durability] = nullptr;
450  }
451  dir->cd();
452 
453  const std::string filename = m_file->GetName();
454  if (m_outputSplitSize) {
455  B2INFO(getName() << ": Finished writing file." << LogVar("filename", filename));
456  }
457  delete m_file;
458  m_file = nullptr;
459 
460  // and now add it to the metadata service as it's fully written
461  MetadataService::Instance().addRootOutputFile(filename, &m_outputFileMetaData);
462 
463  // reset some variables
464  for (auto & entry : m_entries) {
465  entry.clear();
466  }
467  m_parentLfns.clear();
468  m_experimentLow = 1;
469  m_experimentHigh = 0;
470  m_runLow = 0;
471  m_runHigh = 0;
472  m_eventLow = 0;
473  m_eventHigh = 0;
474  // and increase index of next file
475  ++m_fileIndex;
476 }
477 
478 
479 void RootOutputModule::fillTree(DataStore::EDurability durability)
480 {
481  if (!m_tree[durability]) return;
482 
483  TTree& tree = *m_tree[durability];
484  for(auto* entry: m_entries[durability]) {
485  // Check for entries whose object was not created and mark them as invalid.
486  // We still have to write them in the file due to the structure we have. This could be done better
487  if (!entry->ptr) {
488  entry->object->SetBit(kInvalidObject);
489  }
490  //FIXME: Do we need this? in theory no but it crashes in parallel processing otherwise ¯\_(ツ)_/¯
491  tree.SetBranchAddress(entry->name.c_str(), &entry->object);
492  }
493  tree.Fill();
494  for (auto* entry: m_entries[durability]) {
495  entry->object->ResetBit(kInvalidObject);
496  }
497 
498  const bool writeError = m_file->TestBit(TFile::kWriteError);
499  if (writeError) {
500  //m_file deleted first so we have a chance of closing it (though that will probably fail)
501  const std::string filename = m_file->GetName();
502  delete m_file;
503  B2FATAL("A write error occured while saving '" << filename << "', please check if enough disk space is available.");
504  }
505 }
EDurability
Durability types.
Definition: DataStore.h:58
std::map< std::string, StoreEntry > StoreEntryMap
Map for StoreEntries.
Definition: DataStore.h:87
Base class for Modules.
Definition: Module.h:72
Write objects from DataStore into a ROOT file.
Base class for StoreObjPtr and StoreArray for easier common treatment.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
bool hasCustomStreamer(const TClass *cl)
Returns true if and only if 'cl' has a user-defined streamer.
const std::string c_treeNames[]
Names of trees.
std::set< std::string > filterBranches(const std::set< std::string > &branchesToFilter, const std::vector< std::string > &branches, const std::vector< std::string > &excludeBranches, int durability, bool quiet=false)
Given a list of input branches and lists of branches to include/exclude, returns a list of branches t...
bool hasStreamer(const TClass *cl)
Returns true if and only if 'cl' or one of its bases has I/O streamers.
Abstract base class for different kinds of events.