Belle II Software development
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see *
7 **************************************************************************/
9#include <boost/python.hpp>
11#include <daq/storage/modules/StorageRootOutputModule.h>
13#include <framework/io/RootIOUtilities.h>
14#include <framework/core/FileCatalog.h>
15#include <framework/core/MetadataService.h>
16#include <framework/core/RandomNumbers.h>
17#include <framework/database/Database.h>
18// needed for complex module parameter
19#include <framework/core/ModuleParam.templateDetails.h>
20#include <framework/utilities/EnvironmentVariables.h>
22#include <boost/format.hpp>
23#include <boost/algorithm/string.hpp>
25#include <TClonesArray.h>
27#include <nlohmann/json.hpp>
29#include <memory>
30#include <regex>
31#include <filesystem>
33// For online storage
34#include <boost/date_time/posix_time/posix_time.hpp>
35#include <daq/slc/base/ConfigFile.h>
36#include <daq/slc/psql/PostgreSQLInterface.h>
37#include <daq/slc/database/DBHandlerException.h>
38#include <daq/slc/base/StringUtil.h>
41using namespace std;
42using namespace Belle2;
43using namespace RootIOUtilities;
46// Register the Module
51// Implementation
// Constructor: starts with no output file and no trees, puts the
// experiment/run/event range trackers into their starting state
// (m_experimentLow = 1 > m_experimentHigh = 0 is later tested as the
// "starting condition"), sets the module description and the c_Output flag,
// and registers all steering parameters: the generic ROOT output options first,
// the online-storage specific ones at the end.
54StorageRootOutputModule::StorageRootOutputModule() : Module(), m_file(nullptr), m_tree{0}, m_experimentLow(1), m_runLow(0),
55 m_eventLow(0), m_experimentHigh(0), m_runHigh(0), m_eventHigh(0)
57 //Set module properties
58 setDescription("Writes DataStore objects into a .root file. Data is stored in a TTree 'tree' for event-dependent and in 'persistent' for peristent data. You can use RootInput to read the files back into basf2.");
59 setPropertyFlags(c_Output);
61 //Parameter definition
62 addParam("outputFileName", m_outputFileName, "Name of the output file. Can be overridden using the -o argument to basf2.",
63 string("RootOutput.root"));
64 addParam("ignoreCommandLineOverride", m_ignoreCommandLineOverride,
65 "Ignore override of file name via command line argument -o. Useful if you have multiple output modules in one path.", false);
66 addParam("compressionLevel", m_compressionLevel,
67 "0 for no, 1 for low, 9 for high compression. Level 1 usually reduces size by >50%, higher levels have no noticeable effect. On typical hard disks, disabling compression reduces write time by 10-20 %, but almost doubles read time, so you probably should leave this turned on.",
68 m_compressionLevel);
69 addParam("compressionAlgorithm", m_compressionAlgorithm,
70 "Set the Compression algorithm. Recommended values are 0 for default, 1 for zlib and 4 for lz4\n\n"
71 ".. versionadded:: release-03-00-00", m_compressionAlgorithm);
72 addParam("splitLevel", m_splitLevel,
73 "Branch split level: determines up to which depth object members will be saved in separate sub-branches in the tree. For arrays or objects with custom streamers, -1 is used instead to ensure the streamers are used. The default (99) usually gives the highest read performance with RootInput.",
74 99);
75 addParam("updateFileCatalog", m_updateFileCatalog, R"DOC(
76Flag that specifies whether the file metadata catalog is updated or created.
77This is only necessary in special cases and can always be done afterwards using
78``b2file-catalog-add filename.root``"
80(You can also set the ``BELLE2_FILECATALOG`` environment variable to NONE to get
81the same effect as setting this to false))DOC", false);
// Shared empty default for all branch-name list parameters registered below.
83 vector<string> emptyvector;
84 addParam(c_SteerBranchNames[0], m_branchNames[0],
85 "Names of event durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (EventMetaData is always saved)",
86 emptyvector);
87 addParam(c_SteerBranchNames[1], m_branchNames[1],
88 "Names of persistent durability branches to be saved. Empty means all branches. Objects with c_DontWriteOut flag added here will also be saved. (FileMetaData is always saved)",
89 emptyvector);
90 addParam(c_SteerAdditionalBranchNames[0], m_additionalBranchNames[0],
91 "Add additional event branch names without the need to specify all branchnames.",
92 emptyvector);
93 addParam(c_SteerAdditionalBranchNames[1], m_additionalBranchNames[1],
94 "Add additional persistent branch names without the need to specify all branchnames.",
95 emptyvector);
96 addParam(c_SteerExcludeBranchNames[0], m_excludeBranchNames[0],
97 "Names of event durability branches NOT to be saved. Branches also in branchNames are not saved.", emptyvector);
98 addParam(c_SteerExcludeBranchNames[1], m_excludeBranchNames[1],
99 "Names of persistent durability branches NOT to be saved. Branches also in branchNamesPersistent are not saved.", emptyvector);
100 addParam("autoFlushSize", m_autoflush,
101 "Value for TTree SetAutoFlush(): a positive value tells ROOT to flush all baskets to disk after n entries, a negative value to flush after -n bytes",
102 -10000000);
103 addParam("autoSaveSize", m_autosave,
104 "Value for TTree SetAutoSave(): a positive value tells ROOT to write the TTree metadata after n entries, a negative value to write the metadata after -n bytes",
105 -10000000);
106 addParam("basketSize", m_basketsize, "Basketsize for Branches in the Tree in bytes", 32000);
107 addParam("additionalDataDescription", m_additionalDataDescription, "Additional dictionary of "
108 "name->value pairs to be added to the file metadata to describe the data",
109 m_additionalDataDescription);
110 addParam("buildIndex", m_buildIndex, "Build Event Index for faster finding of events by exp/run/event number", m_buildIndex);
111 addParam("keepParents", m_keepParents, "Keep parents files of input files, input files will not be added as output file's parents",
112 m_keepParents);
113 addParam("outputSplitSize", m_outputSplitSize, R"DOC(
114If given split the output file once the file has reached the given size in MB.
115If set the filename will end in ``.f{index:05d}.root``. So if for example
116``outputFileName`` is set to "RootOutput.root" then the files will be named
117``RootOutput.f00000.root``, ``RootOutput.f00001.root``,
118``RootOutput.f00002.root``, ...
120All created output files are complete and independent files and can
121subsequently processed completely independent.
124 The output files will be approximately of the size given by
125 ``outputSplitSize`` but they will be slightly larger since
126 additional information has to be written at the end of the file. If necessary
127 please account for this. Also, using ``buildIndex=False`` might be beneficial
128 to reduce the overshoot.
131 This will set the amount of generated events stored in the file metadata to
132 zero as it is not possible to determine which fraction ends up in which
133 output file.
135.. versionadded:: release-03-00-00
136)DOC", m_outputSplitSize);
// Dedicated FileMetaData object for the output; the FileMetaData branch of the
// persistent tree is pointed at it when a file is opened and when the tree is filled.
// NOTE(review): allocated with a raw new; where it is deleted is not visible here.
138 m_outputFileMetaData = new FileMetaData;
140 // Parameters for online storage
141 addParam("runType", m_runType, "Run type", string("null"));
142 addParam("HLTName", m_HLTName, "HLT name", string("HLT00"));
143 addParam("nDisk", m_nDisk, "The number of paratitions", 3);
144 addParam("skipFirstEvent", m_firstEvent, "Boolean to skip the first event or not. "
145 "If the module is used inside the hbasf2, like HLT storage, the first event need to be skipped.", m_firstEvent);
146 addParam("ramdiskBuffer", m_ramdiskBuffer, "Boolean to make small ramdisk buffer setup. "
147 "If this is false, assuming the buffer disks are large SSD.", m_ramdiskBuffer);
// Initialisation steps: raise ROOT's maximum tree size, require EventMetaData,
// validate outputSplitSize and convert it from MB to bytes, strip or detect a
// protocol prefix in the output file name, and create the PostgreSQL interface
// that is stored in m_db. No file is opened here (see the comment further down).
155 //ROOT has a default maximum size of 100GB for trees??? For larger trees it creates a new file and does other things that finally produce crashes.
156 //Let's set this to 100PB, that should last a bit longer.
157 TTree::SetMaxTreeSize(1000 * 1000 * 100000000000LL);
159 //make sure we have event meta data
160 m_eventMetaData.isRequired();
162 //check outputSplitSize
163 if (m_outputSplitSize) {
164 if (*m_outputSplitSize == 0) B2ERROR("outputSplitSize must be set to a positive value");
165 // Warn if the split size is >= 1TB (1024*1024 MB) ... because this looks suspiciously as if the size was given in bytes
166 if (*m_outputSplitSize >= 1024*1024) B2WARNING("outputSplitSize set to " << *m_outputSplitSize << " MB, please make sure the units are correct");
167 // convert to bytes
168 *m_outputSplitSize *= 1024 * 1024;
169 }
// NOTE(review): getFileNames() is defined outside this excerpt; presumably it
// resolves the final m_outputFileName (e.g. command line override) - confirm.
171 getFileNames();
173 // Now check if the file has a protocol like file:// or http:// in front
174 std::regex protocol("^([A-Za-z]*)://");
175 if(std::smatch m; std::regex_search(m_outputFileName, m, protocol)) {
176 if(m[1] == "file") {
177 // file protocol: treat as local and just remove it from the filename
178 m_outputFileName = std::regex_replace(m_outputFileName, protocol, "");
179 } else {
180 // any other protocol: not local, don't create directories
181 m_regularFile = false;
182 }
183 }
184 // For online storage
185 // Do not open file in basf2 initialize
186 // openFile();
// Build the database interface from the "slowcontrol" configuration file.
// The raw pointer is kept in m_db; its release is not visible in this excerpt.
// NOTE(review): the first argument reads the empty key "" while all other
// arguments read "database.*" keys - confirm this is the intended host setting.
188 ConfigFile config("slowcontrol");
189 PostgreSQLInterface *db = new PostgreSQLInterface(config.get(""),
190 config.get("database.dbname"),
191 config.get("database.user"),
192 config.get("database.password"),
193 config.getInt("database.port"));
194 m_db = db;
197// For online storage
// If a file is still open, close it and restart the split-file numbering at 0.
// closeFile() is repeated for as long as m_file is still set afterwards.
199 if (m_file) {
200 closeFile();
201 while (m_file) closeFile(); // hopefully this does not fail
202 m_fileIndex = 0;
203 }
// Opens a new output file (presumably the body of openFile(), which the
// per-event code calls when no file is open): resets the full-event counter,
// derives the output path (with an fNNNNN index when splitting by size), opens
// it under the "/buffer" prefix, creates one TTree per durability with the
// selected branches, and inserts a record for the file into the DAQ database.
208 // Since we open a new file, we also have to reset the number of full events
209 m_nFullEvents = 0;
210 // Continue with opening the file
// Remember the current ROOT directory; it is restored with dir->cd() once the trees exist.
211 TDirectory* dir = gDirectory;
212 std::filesystem::path out{m_outputFileName};
213 if (m_outputSplitSize) {
214 // Mangle the filename to add the fNNNNN part. However we need to be
215 // careful since the file name could be non-local and have some options or
216 // anchor information attached (like
217 // So use "TUrl" *sigh* to
218 // do the parsing and only replace the extension of the file part.
219 TUrl fileUrl(m_outputFileName.c_str(), m_regularFile);
220 std::filesystem::path file{fileUrl.GetFile()};
221 file.replace_extension((boost::format("f%05d.root") % m_fileIndex).str());
222 fileUrl.SetFile(file.c_str());
223 // In case of regular files we don't want the protocol or anything, just the file
224 out = m_regularFile? fileUrl.GetFileAndOptions() : fileUrl.GetUrl();
225 }
227 // For online storage buffer disks
// out_notmp keeps the path without the "/buffer" prefix; it is what gets
// recorded in the database below, while the file itself is written to "/buffer" + path.
228 std::filesystem::path out_notmp = out;
229 out = std::filesystem::path{std::string("/buffer") + out.generic_string()};
231 m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
232 if ((!m_file || m_file->IsZombie()) && m_regularFile) {
233 //try creating necessary directories since this is a local file
234 auto dirpath = out.parent_path();
236 if (std::filesystem::create_directories(dirpath)) {
237 B2INFO("Created missing directory " << dirpath << ".");
238 //try again
239 m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
240 }
242 }
244 // Retry file open for online storage, with ramdisk, may not be needed
// NOTE(review): this loop has no retry limit and no delay, and a zombie TFile
// returned by a failed attempt is not deleted before the next attempt. Because
// the loop only ends once the file is open, the B2FATAL check that follows it
// cannot trigger - confirm that blocking forever is the intended behaviour.
245 int openCounter = 0;
246 while (!m_file || m_file->IsZombie()) {
247 m_file = TFile::Open(out.c_str(), "RECREATE", "basf2 Event File");
248 B2INFO("process(" << m_processNumber << "), Try open: " << ++openCounter);
249 }
251 if (!m_file || m_file->IsZombie()) {
252 B2FATAL("Couldn't open file " << out << " for writing!");
253 }
254 m_file->SetCompressionAlgorithm(m_compressionAlgorithm);
255 m_file->SetCompressionLevel(m_compressionLevel);
// One tree per durability type. NOTE(review): `map` is declared on a line that
// is not visible in this excerpt; presumably it is the DataStore entry map for
// the current durability - confirm.
257 for (int durability = 0; durability < DataStore::c_NDurabilityTypes; durability++) {
259 set<string> branchList;
260 for (const auto& pair : map)
261 branchList.insert(pair.first);
262 //skip branches the user doesn't want
263 branchList = filterBranches(branchList, m_branchNames[durability], m_excludeBranchNames[durability], durability);
265 //create the tree and branches
266 m_tree[durability] = new TTree(c_treeNames[durability].c_str(), c_treeNames[durability].c_str());
267 m_tree[durability]->SetAutoFlush(m_autoflush);
268 m_tree[durability]->SetAutoSave(m_autosave);
269 for (auto & iter : map) {
270 const std::string& branchName = iter.first;
271 //skip transient entries (allow overriding via branchNames)
272 if (iter.second.dontWriteOut
273 && find(m_branchNames[durability].begin(), m_branchNames[durability].end(), branchName) == m_branchNames[durability].end()
274 && find(m_additionalBranchNames[durability].begin(), m_additionalBranchNames[durability].end(),
275 branchName) == m_additionalBranchNames[durability].end())
276 continue;
277 //skip branches the user doesn't want
278 if (branchList.count(branchName) == 0) {
279 //make sure FileMetaData and EventMetaData are always included in the output
// i.e. a filtered-out branch is skipped unless it is "FileMetaData" outside the
// event durability or "EventMetaData" outside the persistent durability.
280 if (((branchName != "FileMetaData") || (durability == DataStore::c_Event)) &&
281 ((branchName != "EventMetaData") || (durability == DataStore::c_Persistent))) {
282 continue;
283 }
284 }
286 // Warn for anything other than FileMetaData and ProcessStatistics ...
// (only for the first file of a split sequence, so the warning is not repeated per file)
287 if(durability == DataStore::c_Persistent and m_outputSplitSize and m_fileIndex==0 and
288 (branchName != "FileMetaData" and branchName != "ProcessStatistics")) {
289 B2WARNING("Persistent branches might not be stored as expected when splitting the output by size" << LogVar("branch", branchName));
290 }
292 TClass* entryClass = iter.second.objClass;
294 //I want to do this in the input module, but apparently I cannot disable reading those branches.
295 //Disabling reading the branch by not calling SetBranchAddress() for it results in the following crashes. Calling SetBranchStatus(..., 0) doesn't help, either.
296 //reported to ROOT devs, let's see if it gets fixed.
297 //
298 //HasDictionary() is a new function in root 6
299 //using it instead of GetClassInfo() avoids having to parse header files (and
300 //the associated memory cost)
301 if (!entryClass->HasDictionary()) {
302 if (m_fileIndex == 0) {
303 B2WARNING("No dictionary found, object will not be saved (This is probably an obsolete class that is still present in the input file.)"
304 << LogVar("class", entryClass->GetName()) << LogVar("branch", branchName));
305 }
306 continue;
307 }
// A missing streamer is only reported as an error; the branch is still created below.
309 if (!hasStreamer(entryClass)) {
310 B2ERROR("The version number in the ClassDef() macro must be at least 1 to enable I/O!" << LogVar("class", entryClass->GetName()));
311 }
313 int splitLevel = m_splitLevel;
314 if (hasCustomStreamer(entryClass)) {
315 B2DEBUG(38, "Class has custom streamer, setting split level -1 for this branch." << LogVar("class", entryClass->GetName()));
317 splitLevel = -1;
318 if (iter.second.isArray) {
319 //for arrays, we also don't want TClonesArray to go around our streamer
320 static_cast<TClonesArray*>(iter.second.object)->BypassStreamer(kFALSE);
321 }
322 }
// Create the branch and remember the entry so the fill code can iterate over it later.
323 m_tree[durability]->Branch(branchName.c_str(), &iter.second.object, m_basketsize, splitLevel);
324 m_entries[durability].push_back(&iter.second);
325 B2DEBUG(39, "The branch " << branchName << " was created.");
327 //Tell DataStore that we are using this entry
328 if (m_fileIndex == 0) {
330 iter.second.isArray));
331 }
332 }
333 }
335 // set the address of the FileMetaData branch for the output to a separate one from the input
336 TBranch* fileMetaDataBranch = m_tree[DataStore::c_Persistent]->GetBranch("FileMetaData");
337 if (fileMetaDataBranch) {
338 fileMetaDataBranch->SetAddress(&m_outputFileMetaData);
339 } else {
341 }
343 dir->cd();
344 if (m_outputSplitSize) {
345 B2INFO(getName() << ": Opened " << (m_fileIndex > 0 ? "new " : "") << "file for writing" << LogVar("filename", out));
346 }
348 // Insert file entry into the DAQ DB while open new file, online storage feature
// The record uses the path without the "/buffer" prefix and is created with zero
// counters and all status flags FALSE. The timestamp is local time in ISO
// extended format with a literal "+09" appended (presumably the JST offset - confirm).
// A DBHandlerException is only logged as a warning, so a database failure does
// not stop data taking.
// NOTE(review): the values are passed through printf-style '%s' placeholders
// inside quoted SQL; presumably execute() formats them verbatim into the
// statement - confirm that none of them can contain a quote character.
349 try {
350 m_db->connect();
351 m_db->execute("INSERT INTO datafiles_root "
352 "(name, path, host, disk, runtype, expno, runno, compression_level, compression_algorithm, time_open, "
353 "size, nevents, nfullevents, correct_close, renamed, used_for_merge, removed) "
354 "VALUES ('%s', '%s', '%s', '%s','%s', %d, %d, %d, %d, '%s', 0, 0, 0, FALSE, FALSE, FALSE, FALSE);",
355 out_notmp.filename().c_str(), out_notmp.c_str(), m_HLTName.c_str(), m_disk.c_str(),
356 m_runType.c_str(), m_expno, m_runno,
358 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str());
359 } catch (const DBHandlerException &e) {
360 B2WARNING(e.what());
361 }
// Per-event processing for online storage: optionally skips the first event,
// drops events whose experiment/run lies before the one currently being
// written, closes the file when the experiment/run changes, derives the output
// file name from experiment, run, run type, HLT name and process number, opens
// a file if none is open, tracks parent LFNs and the exp/run/event range, and
// closes the file once it has grown beyond the split size.
367 // Many safety guards and features for online storage
368 // Skip the first event which is the ZMQ init message
369 if (m_firstEvent) {
370 m_firstEvent = false;
371 return;
372 }
374 // Exp/run number should be increased
// Events from an earlier experiment, or an earlier run of the same experiment,
// than the one currently being written are silently ignored.
375 if ((m_expno > m_eventMetaData->getExperiment()) ||
376 (m_expno == m_eventMetaData->getExperiment() && m_runno > m_eventMetaData->getRun())) {
377 return;
378 }
380 // Close file and set file index to 0 if exp/run number is changed
381 if (m_file) {
382 if (m_expno != m_eventMetaData->getExperiment() || m_runno != m_eventMetaData->getRun()) {
383 closeFile();
384 while (m_file) closeFile(); // hopefully this does not fail
385 m_fileIndex = 0;
386 }
387 }
389 // Open file with automatic naming
// The process number is parsed with atoi() from the part of the module name in
// front of the first "_"; atoi() yields 0 when that prefix is not numeric.
// The disk is chosen round-robin as (process number % nDisk) + 1.
// NOTE(review): an nDisk parameter of 0 would make the modulo below divide by zero.
390 if (!m_file) {
391 m_expno = m_eventMetaData->getExperiment();
392 m_runno = m_eventMetaData->getRun();
393 m_processNumber = atoi(getName().substr(0, getName().find(std::string("_"))).c_str());
394 m_disk = StringUtil::form("disk%02d", (m_processNumber%m_nDisk)+1);
395 m_outputFileName = StringUtil::form("/rawdata/%s/belle/Raw/%4.4d/%5.5d/%s.%4.4d.%5.5d.%s.p%02d.root",
396 m_disk.c_str(), m_expno, m_runno, m_runType.c_str(),
397 m_expno, m_runno, m_HLTName.c_str(), m_processNumber);
398 }
400 // if we closed after last event ... make a new one
401 if (!m_file) openFile();
// Unless parents are kept, the input file's LFN is recorded in the event's metadata.
403 if (!m_keepParents) {
404 if (m_fileMetaData) {
405 m_eventMetaData->setParentLfn(m_fileMetaData->getLfn());
406 }
407 }
409 //fill Event data
// Collect the parent LFNs for the output file: either the parents of the input
// file (keepParents) or the input file itself. Only a repeat of the most
// recently added LFN is suppressed (comparison against m_parentLfns.back()).
412 if (m_fileMetaData) {
413 if (m_keepParents) {
414 for (int iparent = 0; iparent < m_fileMetaData->getNParents(); iparent++) {
415 string lfn = m_fileMetaData->getParent(iparent);
416 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
417 m_parentLfns.push_back(lfn);
418 }
419 }
420 } else {
421 string lfn = m_fileMetaData->getLfn();
422 if (!lfn.empty() && (m_parentLfns.empty() || (m_parentLfns.back() != lfn))) {
423 m_parentLfns.push_back(lfn);
424 }
425 }
426 }
428 // keep track of file level metadata
// Lexicographic (experiment, run, event) comparison against the lowest and
// highest values seen so far for the current file.
429 unsigned long experiment = m_eventMetaData->getExperiment();
430 unsigned long run = m_eventMetaData->getRun();
431 unsigned long event = m_eventMetaData->getEvent();
432 if (m_experimentLow > m_experimentHigh) { //starting condition
433 m_experimentLow = m_experimentHigh = experiment;
434 m_runLow = m_runHigh = run;
436 } else {
437 if ((experiment < m_experimentLow) || ((experiment == m_experimentLow) && ((run < m_runLow) || ((run == m_runLow)
438 && (event < m_eventLow))))) {
439 m_experimentLow = experiment;
440 m_runLow = run;
442 }
443 if ((experiment > m_experimentHigh) || ((experiment == m_experimentHigh) && ((run > m_runHigh) || ((run == m_runHigh)
444 && (event > m_eventHigh))))) {
445 m_experimentHigh = experiment;
446 m_runHigh = run;
448 }
449 }
451 // check if the event is a full event or not: if yes, increase the counter
452 if (m_eventMetaData->getErrorFlag() == 0) // no error flag -> this is a full event
455 // check if we need to split the file
// The size check uses m_file->GetEND() and runs after the event was handled, so
// a file can exceed outputSplitSize before it is closed.
456 if (m_outputSplitSize and (uint64_t)m_file->GetEND() > *m_outputSplitSize) {
457 // close file and open new one
458 B2INFO(getName() << ": Output size limit reached, closing file ...");
459 closeFile();
460 // Introduce while for online storage (can be removed?)
461 while (m_file) closeFile();
462 }
// Fills the file-level metadata of the output file (presumably the body of
// fillFileMetaData() - the signature is not visible in this excerpt): decides
// the MC/real-data flag, handles the event index of the event tree, stores the
// number of events and the event range, applies the additional data
// descriptions and computes the LFN.
467 bool isMC = (m_fileMetaData) ? m_fileMetaData->isMC() : true;
469 // For online storage, force to declareRealData()
470 // I wonder why the no file meta data is associated with isMC == true
// The value computed above is therefore always overridden with false.
471 isMC = false;
477 //create an index for the event tree
478 TTree* tree = m_tree[DataStore::c_Event];
479 unsigned long numEntries = tree->GetEntries();
481 if (m_buildIndex && numEntries > 0) {
482 if (numEntries > 10000000) {
483 //10M events correspond to about 240MB for the TTreeIndex object. for more than ~45M entries this causes crashes, broken files :(
484 B2WARNING("Not building TTree index because of large number of events. The index object would conflict with ROOT limits on object size and cause problems.");
485 } else if (tree->GetBranch("EventMetaData")) {
486 tree->SetBranchAddress("EventMetaData", nullptr);
488 }
489 }
491 m_outputFileMetaData->setNEvents(numEntries);
493 //starting condition so apparently no events at all
494 m_outputFileMetaData->setLow(-1, -1, 0);
495 m_outputFileMetaData->setHigh(-1, -1, 0);
496 } else {
499 }
500 }
502 //fill more file level metadata
// When splitting by size the number of generated MC events cannot be attributed
// to a single output file, so it is zeroed (warning only for the first file).
507 auto mcEvents = Environment::Instance().getNumberOfMCEvents();
508 if(m_outputSplitSize and mcEvents > 0) {
509 if(m_fileIndex == 0) B2WARNING("Number of MC Events cannot be saved when splitting output files by size, setting to 0");
510 mcEvents = 0;
511 }
514 for (const auto& item : m_additionalDataDescription) {
515 m_outputFileMetaData->setDataDescription(item.first, item.second);
516 }
517 // Set the LFN to the filename: if it's a URL use it directly, otherwise make sure it's absolute
518 std::string lfn = m_file->GetName();
519 if(m_regularFile) {
520 lfn = std::filesystem::absolute(lfn).string();
521 }
522 // Format LFN if BELLE2_LFN_FORMATSTRING is set
// The formatting is delegated to the Python function B2Tools.format.format_filename,
// which receives the format string, the output file name and the metadata as JSON.
523 std::string format = EnvironmentVariables::get("BELLE2_LFN_FORMATSTRING", "");
524 if (!format.empty()) {
525 auto format_filename = boost::python::import("B2Tools.format").attr("format_filename");
526 lfn = boost::python::extract<std::string>(format_filename(format, m_outputFileName, m_outputFileMetaData->getJsonStr()));
527 }
529 //register the file in the catalog
532 }
// Close the output file; closeFile() is repeated for as long as m_file is still set afterwards.
538 closeFile();
539 // Introduce while for online storage (can be removed?)
540 while (m_file) closeFile();
// Closes the current output file: writes and deletes the trees, marks the file
// as correctly closed in the DAQ database, deletes the TFile, resets the
// per-file bookkeeping, advances the split-file index and, in ramdisk-buffer
// mode, starts an rsync that moves the file out of the buffer.
// Does nothing when no file is open.
545 if(!m_file) return;
549 //fill Persistent data
552 //write the trees
// Write into the output file, then restore the previously current ROOT directory.
553 TDirectory* dir = gDirectory;
554 m_file->cd();
555 for (int durability = 0; durability < DataStore::c_NDurabilityTypes; ++durability) {
556 if (m_tree[durability]) {
557 B2DEBUG(30, "Write TTree " << c_treeNames[durability]);
558 m_tree[durability]->Write(c_treeNames[durability].c_str(), TObject::kWriteDelete);
559 delete m_tree[durability];
560 }
561 m_tree[durability] = nullptr;
562 }
563 dir->cd();
565 const std::string filename = m_file->GetName();
566 if (m_outputSplitSize) {
567 B2INFO(getName() << ": Finished writing file." << LogVar("filename", filename));
568 }
570 // Before deleting m_file, store file list in online storage file list DB table
// NOTE(review): erase(0, 7) unconditionally drops the first seven characters,
// i.e. it relies on the name starting with "/buffer" as set up when the file
// was opened; there is no check that the prefix is really present.
571 const std::filesystem::path filename_path{filename};
572 std::string filename_notmp = m_file->GetName();
573 filename_notmp.erase(0, 7); // remove "/buffer" in front of full path
574 const std::filesystem::path filename_notmp_path{filename_notmp};
// will_merge is set to true when twice the file size is below the split size
// (compared as boost::optional values) or when the ramdisk buffer is used.
// A DBHandlerException is only logged as a warning.
575 try {
576 m_db->connect();
577 m_db->execute("UPDATE datafiles_root SET "
578 "correct_close = TRUE, "
579 "will_merge = %s ,"
580 "nevents = %lu, "
581 "nfullevents = %lu, "
582 "size = %lu, "
583 "time_close = '%s' "
584 "WHERE name = '%s' AND host = '%s';",
585 ((boost::optional<uint64_t>)(m_file->GetSize()*2) < m_outputSplitSize || m_ramdiskBuffer) ? "true" : "false",
587 (boost::posix_time::to_iso_extended_string(boost::posix_time::microsec_clock::local_time())+std::string("+09")).c_str(),
588 filename_path.filename().c_str(), m_HLTName.c_str());
589 } catch (const DBHandlerException &e) {
590 B2WARNING(e.what());
591 }
592 m_db->close();
594 delete m_file;
595 m_file = nullptr;
597 // and now add it to the metadata service as it's fully written
600 // reset some variables
// m_experimentLow = 1 together with an m_experimentHigh of 0 is the
// "starting condition" that the per-event range tracking checks for.
601 for (auto & entry : m_entries) {
602 entry.clear();
603 }
604 m_parentLfns.clear();
605 m_experimentLow = 1;
607 m_runLow = 0;
608 m_runHigh = 0;
609 m_eventLow = 0;
610 m_eventHigh = 0;
611 // and increase index of next file
612 ++m_fileIndex;
614 // Call system(/usr/bin/rsync) for online storage ramdisk buffer cleanup
// The command ends in "&", so the shell runs rsync in the background and this
// call does not wait for the transfer to finish.
// NOTE(review): the command line is built by plain string concatenation, so the
// file name is not shell-quoted, and the return value of system() is ignored.
615 if (m_ramdiskBuffer) {
616 const std::string rsync_cmd = std::string("/usr/bin/rsync -a --remove-source-files --recursive ") + filename + std::string(" ") + filename_notmp_path.parent_path().generic_string() + std::string("/ &");
617 B2INFO(getName() << ": system(" << rsync_cmd << ")");
618 system(rsync_cmd.c_str());
619 }
// Fills the tree of the given durability with one entry built from the
// registered DataStore entries and aborts with B2FATAL if ROOT flagged a write
// error on the output file. Does nothing when the tree does not exist.
625 if (!m_tree[durability]) return;
627 TTree& tree = *m_tree[durability];
628 for(auto* entry: m_entries[durability]) {
629 // Check for entries whose object was not created and mark them as invalid.
630 // We still have to write them in the file due to the structure we have. This could be done better
631 if (!entry->ptr) {
632 entry->object->SetBit(kInvalidObject);
633 }
634 //FIXME: Do we need this? in theory no but it crashes in parallel processing otherwise ¯\_(ツ)_/¯
// FileMetaData is always written from the module's own m_outputFileMetaData
// object rather than from the DataStore entry.
635 if (entry->name == "FileMetaData") {
636 tree.SetBranchAddress(entry->name.c_str(), &m_outputFileMetaData);
637 } else {
638 tree.SetBranchAddress(entry->name.c_str(), &entry->object);
639 }
640 }
641 tree.Fill();
// Clear the invalid marker again on every entry so the bit does not persist past this fill.
642 for (auto* entry: m_entries[durability]) {
643 entry->object->ResetBit(kInvalidObject);
644 }
646 const bool writeError = m_file->TestBit(TFile::kWriteError);
647 if (writeError) {
648 //m_file deleted first so we have a chance of closing it (though that will probably fail)
// The file name is copied before the delete because it is still needed for the message.
649 const std::string filename = m_file->GetName();
650 delete m_file;
651 B2FATAL("A write error occured while saving '" << filename << "', please check if enough disk space is available.");
652 }
