9 #include <framework/pcore/SeqFile.h>
10 #include <framework/logging/Logger.h>
16 #include <boost/iostreams/device/file_descriptor.hpp>
17 #include <boost/iostreams/filter/gzip.hpp>
18 #include <boost/iostreams/filtering_stream.hpp>
19 #include <boost/format.hpp>
23 namespace io = boost::iostreams;
25 SeqFile::SeqFile(
const std::string& filename,
const std::string& rwflag,
char* streamerinfo,
int streamerinfo_size,
26 bool filenameIsPattern):
29 if (filename.empty()) {
30 B2ERROR(
"SeqFile: Empty filename given");
33 bool readonly = rwflag.find(
'w') == std::string::npos;
35 m_compressed = filename.size() > 3 && filename.compare(filename.size() - 3, 3,
".gz") == 0;
38 m_filename = filename.substr(0, filename.size() - 3);
41 if (filenameIsPattern) {
45 }
catch (std::exception& e) {
53 if (streamerinfo !=
nullptr && streamerinfo_size > 0) {
63 B2WARNING(
"SeqFile: error opening '" << filename <<
"': " << strerror(errno)
64 <<
", trying again with '.gz'");
70 B2ERROR(
"SeqFile: error opening '" << filename <<
"': " << strerror(errno));
72 B2INFO(
"SeqFile: " <<
m_filename <<
" opened (fd=" <<
m_fd <<
")");
84 m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
85 auto filter =
new io::filtering_ostream();
87 filter->push(io::file_descriptor_sink(
m_fd, io::close_handle));
88 filter->exceptions(ios_base::badbit | ios_base::failbit);
98 auto* out =
dynamic_cast<std::ostream*
>(
m_stream.get());
100 B2FATAL(
"SeqFile::write() called on a file opened in read mode");
104 B2INFO(
"Wrote StreamerInfo at the begenning of the file. : " <<
m_streamerinfo_size <<
"bytes");
105 }
catch (ios_base::failure& e) {
107 B2ERROR(
"SeqFile::openFile() error: " << e.what() <<
", " << strerror(errno));
113 m_fd = open(filename.c_str(), O_RDONLY);
114 auto filter =
new io::filtering_istream();
116 filter->push(io::file_descriptor_source(
m_fd, io::close_handle));
117 filter->exceptions(ios_base::badbit | ios_base::failbit);
127 B2INFO(
"Closing SeqFile " <<
m_nfile);
139 auto* out =
dynamic_cast<std::ostream*
>(
m_stream.get());
141 B2FATAL(
"SeqFile::write() called on a file opened in read mode");
143 int insize = *((
int*)buf);
145 B2INFO(
"SeqFile: previous file closed (size=" <<
m_nb <<
" bytes)");
153 B2FATAL(
"SeqFile::write() error opening file '" << file <<
"': " << strerror(errno));
156 B2INFO(
"SeqFile::write() opened '" << file <<
"'");
158 out =
dynamic_cast<std::ostream*
>(
m_stream.get());
161 out->write(buf, insize);
164 }
catch (ios_base::failure& e) {
165 B2ERROR(
"SeqFile::write() error: " << e.what() <<
", " << strerror(errno));
173 auto* in =
dynamic_cast<std::istream*
>(
m_stream.get());
175 B2FATAL(
"SeqFile::read() called on a file opened in write mode");
180 }
catch (ios_base::failure& e) {
181 B2ERROR(
"SeqFile::read() cannot read file: " << e.what());
193 if (
m_fd < 0)
return 0;
195 in =
dynamic_cast<std::istream*
>(
m_stream.get());
196 B2INFO(
"SeqFile::read() opened '" << nextfile <<
"'");
200 in->read(buf,
sizeof(
int));
201 }
catch (ios_base::failure& e) {
202 B2ERROR(
"SeqFile::read() " << e.what() <<
": couldn't read next record size");
206 int recsize = *((
int*)buf);
207 if (recsize > size) {
208 B2ERROR(
"SeqFile::read() buffer too small, need at least " << recsize <<
" bytes");
212 in->read(buf +
sizeof(
int), recsize -
sizeof(
int));
213 }
catch (ios_base::failure& e) {
214 B2ERROR(
"SeqFile::read() " << e.what() <<
": could only read " << in->gcount() <<
" bytes, expected " << recsize);
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
int m_nb
when saving a file, the total number of bytes written, 0 when reading.
void openFile(std::string filename, bool readonly)
actually open the file
int m_streamerinfo_size
size(bytes) of StreamerInfo
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
bool m_compressed
is file gzipped compressed?
int status() const
Returns status after constructor call.
static const int c_MaxFileSize
maximal size of one file (in Bytes).
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
std::string m_filename
Name of the opened file.
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
int write(const char *buf)
Write a record to a file.
char * m_streamerinfo
StreamerInfo.
int read(char *buf, int max)
Read a record from a file.
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double >> &runs, double cut, std::map< ExpRun, std::pair< double, double >> &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Abstract base class for different kinds of events.