Belle II Software  release-08-01-10
SeqFile.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/SeqFile.h>
10 #include <framework/logging/Logger.h>
11 
12 #include <ios>
13 #include <fcntl.h>
14 #include <cerrno>
15 
16 #include <boost/iostreams/device/file_descriptor.hpp>
17 #include <boost/iostreams/filter/gzip.hpp>
18 #include <boost/iostreams/filtering_stream.hpp>
19 #include <boost/format.hpp>
20 
21 using namespace Belle2;
22 using namespace std;
23 namespace io = boost::iostreams;
24 
25 SeqFile::SeqFile(const std::string& filename, const std::string& rwflag, char* streamerinfo, int streamerinfo_size,
26  bool filenameIsPattern):
27  m_filename(filename)
28 {
29  if (m_filename.empty()) {
30  B2ERROR("SeqFile: Empty filename given");
31  return;
32  }
33  bool readonly = rwflag.find('w') == std::string::npos;
34  // CAF use URL input style and add a prefix that need to be removed for sroot files
35  if (m_filename.compare(0, 7, "file://") == 0) {
36  m_filename = m_filename.substr(7, m_filename.size() - 7);
37  }
38  // is the file already compressed?
39  m_compressed = m_filename.size() > 3 && m_filename.compare(m_filename.size() - 3, 3, ".gz") == 0;
40  // strip .gz suffix to add it at the end automatically and correctly for subsequent files
41  if (m_compressed) {
42  m_filename = m_filename.substr(0, m_filename.size() - 3);
43  }
44  // check if we want different naming scheme using boost::format
45  if (filenameIsPattern) {
47  try {
48  m_filename = (boost::format(m_filenamePattern) % m_nfile).str();
49  } catch (std::exception& e) {
50  B2FATAL("Cannot use filename pattern" << m_filenamePattern << ": " << e.what());
51  }
52  }
53 
54  // Store StreamerInfo 2017.5.8
55  m_streamerinfo = nullptr;
57  if (streamerinfo != nullptr && streamerinfo_size > 0) {
58  m_streamerinfo_size = streamerinfo_size;
59  m_streamerinfo = new char[ m_streamerinfo_size ];
60  memcpy(m_streamerinfo, streamerinfo, m_streamerinfo_size);
61  }
62 
63  // open the file
64  openFile(m_filename, readonly);
65  // if that fails and it's not already assumed to be compressed try again adding .gz to the name
66  if (m_fd < 0 && !m_compressed) {
67  B2WARNING("SeqFile: error opening '" << m_filename << "': " << strerror(errno)
68  << ", trying again with '.gz'");
69  m_compressed = true;
70  openFile(m_filename, readonly);
71  }
72  // is the file open now?
73  if (m_fd < 0) {
74  B2ERROR("SeqFile: error opening '" << m_filename << "': " << strerror(errno));
75  } else {
76  B2INFO("SeqFile: " << m_filename << " opened (fd=" << m_fd << ")");
77  }
78 
79 }
80 
81 void SeqFile::openFile(std::string filename, bool readonly)
82 {
83 
84  // add compression suffix if file is supposed to be compressed
85  if (m_compressed) filename += ".gz";
86  if (!readonly) {
87  //open file in create mode and set stream correctly
88  m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
89  auto filter = new io::filtering_ostream();
90  if (m_compressed) filter->push(io::gzip_compressor());
91  filter->push(io::file_descriptor_sink(m_fd, io::close_handle));
92  filter->exceptions(ios_base::badbit | ios_base::failbit);
93  m_stream.reset(filter);
94 
95  //
96  // Write StreamerInfo (2017.5.8)
97  //
98  if (m_streamerinfo == nullptr || m_streamerinfo_size <= 0) {
99  // If you want to use SeqFile for non-sroot file type, please skip this B2FATAL
100  B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
101  } else {
102  auto* out = dynamic_cast<std::ostream*>(m_stream.get());
103  if (!out) {
104  B2FATAL("SeqFile::write() called on a file opened in read mode");
105  }
106  try {
107  out->write(m_streamerinfo, m_streamerinfo_size);
108  B2INFO("Wrote StreamerInfo at the begenning of the file. : " << m_streamerinfo_size << "bytes");
109  } catch (ios_base::failure& e) {
110 
111  B2ERROR("SeqFile::openFile() error: " << e.what() << ", " << strerror(errno));
112  }
113  }
114 
115  } else {
116  //open file in read mode and set stream correctly
117  m_fd = open(filename.c_str(), O_RDONLY);
118  auto filter = new io::filtering_istream();
119  if (m_compressed) filter->push(io::gzip_decompressor());
120  filter->push(io::file_descriptor_source(m_fd, io::close_handle));
121  filter->exceptions(ios_base::badbit | ios_base::failbit);
122  m_stream.reset(filter);
123  }
124  // reset number of written bytes (does not include streamerinfo )
125  m_nb = 0;
126 }
127 
129 {
130  if (m_streamerinfo != nullptr) delete m_streamerinfo;
131  B2INFO("Closing SeqFile " << m_nfile);
132  //closed automatically by m_stream.
133 }
134 
135 int SeqFile::status() const
136 {
137  return m_fd;
138 }
139 
140 int SeqFile::write(const char* buf)
141 {
142  // cast stream object
143  auto* out = dynamic_cast<std::ostream*>(m_stream.get());
144  if (!out) {
145  B2FATAL("SeqFile::write() called on a file opened in read mode");
146  }
147  int insize = *((int*)buf); // nbytes in the buffer at the beginning
148  if (insize + m_nb >= c_MaxFileSize && m_filename != "/dev/null") {
149  B2INFO("SeqFile: previous file closed (size=" << m_nb << " bytes)");
150  m_nfile++;
151  auto file = m_filename + '-' + std::to_string(m_nfile);
152  if (!m_filenamePattern.empty()) {
153  file = (boost::format(m_filenamePattern) % m_nfile).str();
154  }
155  openFile(file, false);
156  if (m_fd < 0) {
157  B2FATAL("SeqFile::write() error opening file '" << file << "': " << strerror(errno));
158  }
159  m_nb = 0;
160  B2INFO("SeqFile::write() opened '" << file << "'");
161  // update stream pointer since we reopened the file
162  out = dynamic_cast<std::ostream*>(m_stream.get());
163  }
164  try {
165  out->write(buf, insize);
166  m_nb += insize;
167  return insize;
168  } catch (ios_base::failure& e) {
169  B2ERROR("SeqFile::write() error: " << e.what() << ", " << strerror(errno));
170  return 0;
171  }
172 }
173 
174 int SeqFile::read(char* buf, int size)
175 {
176  // cast stream object
177  auto* in = dynamic_cast<std::istream*>(m_stream.get());
178  if (!in) {
179  B2FATAL("SeqFile::read() called on a file opened in write mode");
180  }
181  //trigger eof if there's nothing left int the file. Could throw an error on decompress failure
182  try {
183  in->peek();
184  } catch (ios_base::failure& e) {
185  B2ERROR("SeqFile::read() cannot read file: " << e.what());
186  return -1;
187  }
188  //ok, now we can open the next file reliably
189  if (in->eof()) {
190  // EOF of current file, search for next file
191  m_nfile++;
192  auto nextfile = m_filename + '-' + std::to_string(m_nfile);
193  if (!m_filenamePattern.empty()) {
194  nextfile = (boost::format(m_filenamePattern) % m_nfile).str();
195  }
196  openFile(nextfile, true);
197  if (m_fd < 0) return 0; // End of all files
198  // update the stream pointer
199  in = dynamic_cast<std::istream*>(m_stream.get());
200  B2INFO("SeqFile::read() opened '" << nextfile << "'");
201  }
202  try {
203  // Obtain new header
204  in->read(buf, sizeof(int));
205  } catch (ios_base::failure& e) {
206  B2ERROR("SeqFile::read() " << e.what() << ": couldn't read next record size");
207  return -1;
208  }
209  // Normal processing, extract the record size from the first 4 bytes
210  int recsize = *((int*)buf);
211  if (recsize > size) {
212  B2ERROR("SeqFile::read() buffer too small, need at least " << recsize << " bytes");
213  return -1;
214  }
215  try {
216  in->read(buf + sizeof(int), recsize - sizeof(int));
217  } catch (ios_base::failure& e) {
218  B2ERROR("SeqFile::read() " << e.what() << ": could only read " << in->gcount() << " bytes, expected " << recsize);
219  return -1;
220  }
221  return recsize;
222 }
~SeqFile()
Destructor.
Definition: SeqFile.cc:128
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
Definition: SeqFile.h:60
int m_nb
When saving a file, the total number of bytes written; 0 when reading.
Definition: SeqFile.h:62
void openFile(std::string filename, bool readonly)
Actually open the file.
Definition: SeqFile.cc:81
int m_streamerinfo_size
Size (in bytes) of the StreamerInfo.
Definition: SeqFile.h:71
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
Definition: SeqFile.h:65
bool m_compressed
Is the file gzip-compressed?
Definition: SeqFile.h:64
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:135
static const int c_MaxFileSize
Maximum size of one file (in bytes).
Definition: SeqFile.h:57
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
Definition: SeqFile.cc:25
std::string m_filename
Name of the opened file.
Definition: SeqFile.h:59
int m_fd
file descriptor.
Definition: SeqFile.h:61
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
Definition: SeqFile.h:63
int write(const char *buf)
Write a record to a file.
Definition: SeqFile.cc:140
char * m_streamerinfo
StreamerInfo.
Definition: SeqFile.h:68
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:174
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:38
Abstract base class for different kinds of events.