Belle II Software  release-06-01-15
SeqFile.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/SeqFile.h>
10 #include <framework/logging/Logger.h>
11 
12 #include <ios>
13 #include <fcntl.h>
14 #include <cerrno>
15 
16 #include <boost/iostreams/device/file_descriptor.hpp>
17 #include <boost/iostreams/filter/gzip.hpp>
18 #include <boost/iostreams/filtering_stream.hpp>
19 #include <boost/format.hpp>
20 
21 using namespace Belle2;
22 using namespace std;
23 namespace io = boost::iostreams;
24 
25 SeqFile::SeqFile(const std::string& filename, const std::string& rwflag, char* streamerinfo, int streamerinfo_size,
26  bool filenameIsPattern):
27  m_filename(filename)
28 {
29  if (filename.empty()) {
30  B2ERROR("SeqFile: Empty filename given");
31  return;
32  }
33  bool readonly = rwflag.find('w') == std::string::npos;
34  // is the file already compressed?
35  m_compressed = filename.size() > 3 && filename.compare(filename.size() - 3, 3, ".gz") == 0;
36  // strip .gz suffix to add it at the end automatically and correctly for subsequent files
37  if (m_compressed) {
38  m_filename = filename.substr(0, filename.size() - 3);
39  }
40  // check if we want different naming scheme using boost::format
41  if (filenameIsPattern) {
43  try {
44  m_filename = (boost::format(m_filenamePattern) % m_nfile).str();
45  } catch (std::exception& e) {
46  B2FATAL("Cannot use filename pattern" << m_filenamePattern << ": " << e.what());
47  }
48  }
49 
50  // Store StreamerInfo 2017.5.8
51  m_streamerinfo = nullptr;
53  if (streamerinfo != nullptr && streamerinfo_size > 0) {
54  m_streamerinfo_size = streamerinfo_size;
55  m_streamerinfo = new char[ m_streamerinfo_size ];
56  memcpy(m_streamerinfo, streamerinfo, m_streamerinfo_size);
57  }
58 
59  // open the file
60  openFile(m_filename, readonly);
61  // if that fails and it's not already assumed to be compressed try again adding .gz to the name
62  if (m_fd < 0 && !m_compressed) {
63  B2WARNING("SeqFile: error opening '" << filename << "': " << strerror(errno)
64  << ", trying again with '.gz'");
65  m_compressed = true;
66  openFile(m_filename, readonly);
67  }
68  // is the file open now?
69  if (m_fd < 0) {
70  B2ERROR("SeqFile: error opening '" << filename << "': " << strerror(errno));
71  } else {
72  B2INFO("SeqFile: " << m_filename << " opened (fd=" << m_fd << ")");
73  }
74 
75 }
76 
77 void SeqFile::openFile(std::string filename, bool readonly)
78 {
79 
80  // add compression suffix if file is supposed to be compressed
81  if (m_compressed) filename += ".gz";
82  if (!readonly) {
83  //open file in create mode and set stream correctly
84  m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
85  auto filter = new io::filtering_ostream();
86  if (m_compressed) filter->push(io::gzip_compressor());
87  filter->push(io::file_descriptor_sink(m_fd, io::close_handle));
88  filter->exceptions(ios_base::badbit | ios_base::failbit);
89  m_stream.reset(filter);
90 
91  //
92  // Write StreamerInfo (2017.5.8)
93  //
94  if (m_streamerinfo == nullptr || m_streamerinfo_size <= 0) {
95  // If you want to use SeqFile for non-sroot file type, please skip this B2FATAL
96  B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
97  } else {
98  auto* out = dynamic_cast<std::ostream*>(m_stream.get());
99  if (!out) {
100  B2FATAL("SeqFile::write() called on a file opened in read mode");
101  }
102  try {
103  out->write(m_streamerinfo, m_streamerinfo_size);
104  B2INFO("Wrote StreamerInfo at the begenning of the file. : " << m_streamerinfo_size << "bytes");
105  } catch (ios_base::failure& e) {
106 
107  B2ERROR("SeqFile::openFile() error: " << e.what() << ", " << strerror(errno));
108  }
109  }
110 
111  } else {
112  //open file in read mode and set stream correctly
113  m_fd = open(filename.c_str(), O_RDONLY);
114  auto filter = new io::filtering_istream();
115  if (m_compressed) filter->push(io::gzip_decompressor());
116  filter->push(io::file_descriptor_source(m_fd, io::close_handle));
117  filter->exceptions(ios_base::badbit | ios_base::failbit);
118  m_stream.reset(filter);
119  }
120  // reset number of written bytes (does not include streamerinfo )
121  m_nb = 0;
122 }
123 
125 {
126  if (m_streamerinfo != nullptr) delete m_streamerinfo;
127  B2INFO("Closing SeqFile " << m_nfile);
128  //closed automatically by m_stream.
129 }
130 
131 int SeqFile::status() const
132 {
133  return m_fd;
134 }
135 
136 int SeqFile::write(const char* buf)
137 {
138  // cast stream object
139  auto* out = dynamic_cast<std::ostream*>(m_stream.get());
140  if (!out) {
141  B2FATAL("SeqFile::write() called on a file opened in read mode");
142  }
143  int insize = *((int*)buf); // nbytes in the buffer at the beginning
144  if (insize + m_nb >= c_MaxFileSize && m_filename != "/dev/null") {
145  B2INFO("SeqFile: previous file closed (size=" << m_nb << " bytes)");
146  m_nfile++;
147  auto file = m_filename + '-' + std::to_string(m_nfile);
148  if (!m_filenamePattern.empty()) {
149  file = (boost::format(m_filenamePattern) % m_nfile).str();
150  }
151  openFile(file, false);
152  if (m_fd < 0) {
153  B2FATAL("SeqFile::write() error opening file '" << file << "': " << strerror(errno));
154  }
155  m_nb = 0;
156  B2INFO("SeqFile::write() opened '" << file << "'");
157  // update stream pointer since we reopened the file
158  out = dynamic_cast<std::ostream*>(m_stream.get());
159  }
160  try {
161  out->write(buf, insize);
162  m_nb += insize;
163  return insize;
164  } catch (ios_base::failure& e) {
165  B2ERROR("SeqFile::write() error: " << e.what() << ", " << strerror(errno));
166  return 0;
167  }
168 }
169 
170 int SeqFile::read(char* buf, int size)
171 {
172  // cast stream object
173  auto* in = dynamic_cast<std::istream*>(m_stream.get());
174  if (!in) {
175  B2FATAL("SeqFile::read() called on a file opened in write mode");
176  }
177  //trigger eof if there's nothing left int the file. Could throw an error on decompress failure
178  try {
179  in->peek();
180  } catch (ios_base::failure& e) {
181  B2ERROR("SeqFile::read() cannot read file: " << e.what());
182  return -1;
183  }
184  //ok, now we can open the next file reliably
185  if (in->eof()) {
186  // EOF of current file, search for next file
187  m_nfile++;
188  auto nextfile = m_filename + '-' + std::to_string(m_nfile);
189  if (!m_filenamePattern.empty()) {
190  nextfile = (boost::format(m_filenamePattern) % m_nfile).str();
191  }
192  openFile(nextfile, true);
193  if (m_fd < 0) return 0; // End of all files
194  // update the stream pointer
195  in = dynamic_cast<std::istream*>(m_stream.get());
196  B2INFO("SeqFile::read() opened '" << nextfile << "'");
197  }
198  try {
199  // Obtain new header
200  in->read(buf, sizeof(int));
201  } catch (ios_base::failure& e) {
202  B2ERROR("SeqFile::read() " << e.what() << ": couldn't read next record size");
203  return -1;
204  }
205  // Normal processing, extract the record size from the first 4 bytes
206  int recsize = *((int*)buf);
207  if (recsize > size) {
208  B2ERROR("SeqFile::read() buffer too small, need at least " << recsize << " bytes");
209  return -1;
210  }
211  try {
212  in->read(buf + sizeof(int), recsize - sizeof(int));
213  } catch (ios_base::failure& e) {
214  B2ERROR("SeqFile::read() " << e.what() << ": could only read " << in->gcount() << " bytes, expected " << recsize);
215  return -1;
216  }
217  return recsize;
218 }
~SeqFile()
Destructor.
Definition: SeqFile.cc:124
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
Definition: SeqFile.h:60
int m_nb
when saving a file, the total number of bytes written, 0 when reading.
Definition: SeqFile.h:62
void openFile(std::string filename, bool readonly)
actually open the file
Definition: SeqFile.cc:77
int m_streamerinfo_size
size(bytes) of StreamerInfo
Definition: SeqFile.h:71
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
Definition: SeqFile.h:65
bool m_compressed
Is the file gzip-compressed?
Definition: SeqFile.h:64
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:131
static const int c_MaxFileSize
maximal size of one file (in Bytes).
Definition: SeqFile.h:57
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
Definition: SeqFile.cc:25
std::string m_filename
Name of the opened file.
Definition: SeqFile.h:59
int m_fd
file descriptor.
Definition: SeqFile.h:61
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
Definition: SeqFile.h:63
int write(const char *buf)
Write a record to a file.
Definition: SeqFile.cc:136
char * m_streamerinfo
StreamerInfo.
Definition: SeqFile.h:68
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:170
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
Filter events to remove runs shorter than cut; the removed runs are stored in runsRemoved.
Definition: Splitter.cc:40
Abstract base class for different kinds of events.