9#include <framework/pcore/SeqFile.h>
10#include <framework/logging/Logger.h>
16#include <boost/iostreams/device/file_descriptor.hpp>
17#include <boost/iostreams/filter/gzip.hpp>
18#include <boost/iostreams/filtering_stream.hpp>
19#include <boost/format.hpp>
23namespace io = boost::iostreams;
25SeqFile::SeqFile(
const std::string& filename,
const std::string& rwflag,
char* streamerinfo,
int streamerinfo_size,
26 bool filenameIsPattern):
30 B2ERROR(
"SeqFile: Empty filename given");
33 bool readonly = rwflag.find(
'w') == std::string::npos;
35 if (
m_filename.compare(0, 7,
"file://") == 0) {
45 if (filenameIsPattern) {
49 }
catch (std::exception& e) {
57 if (streamerinfo !=
nullptr && streamerinfo_size > 0) {
67 B2WARNING(
"SeqFile: error opening '" <<
m_filename <<
"': " << strerror(errno)
68 <<
", trying again with '.gz'");
74 B2ERROR(
"SeqFile: error opening '" <<
m_filename <<
"': " << strerror(errno));
76 B2INFO(
"SeqFile: " <<
m_filename <<
" opened (fd=" <<
m_fd <<
")");
88 m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
89 auto filter =
new io::filtering_ostream();
91 filter->push(io::file_descriptor_sink(
m_fd, io::close_handle));
92 filter->exceptions(ios_base::badbit | ios_base::failbit);
102 auto* out =
dynamic_cast<std::ostream*
>(
m_stream.get());
104 B2FATAL(
"SeqFile::write() called on a file opened in read mode");
108 B2INFO(
"Wrote StreamerInfo at the begenning of the file. : " <<
m_streamerinfo_size <<
"bytes");
109 }
catch (ios_base::failure& e) {
111 B2ERROR(
"SeqFile::openFile() error: " << e.what() <<
", " << strerror(errno));
117 m_fd = open(filename.c_str(), O_RDONLY);
118 auto filter =
new io::filtering_istream();
119 if (
m_compressed) filter->push(io::gzip_decompressor());
120 filter->push(io::file_descriptor_source(
m_fd, io::close_handle));
121 filter->exceptions(ios_base::badbit | ios_base::failbit);
131 B2INFO(
"Closing SeqFile " <<
m_nfile);
143 auto* out =
dynamic_cast<std::ostream*
>(
m_stream.get());
145 B2FATAL(
"SeqFile::write() called on a file opened in read mode");
147 int insize = *((
int*)buf);
149 B2INFO(
"SeqFile: previous file closed (size=" <<
m_nb <<
" bytes)");
157 B2FATAL(
"SeqFile::write() error opening file '" << file <<
"': " << strerror(errno));
160 B2INFO(
"SeqFile::write() opened '" << file <<
"'");
162 out =
dynamic_cast<std::ostream*
>(
m_stream.get());
165 out->write(buf, insize);
168 }
catch (ios_base::failure& e) {
169 B2ERROR(
"SeqFile::write() error: " << e.what() <<
", " << strerror(errno));
177 auto* in =
dynamic_cast<std::istream*
>(
m_stream.get());
179 B2FATAL(
"SeqFile::read() called on a file opened in write mode");
184 }
catch (ios_base::failure& e) {
185 B2ERROR(
"SeqFile::read() cannot read file: " << e.what());
197 if (
m_fd < 0)
return 0;
199 in =
dynamic_cast<std::istream*
>(
m_stream.get());
200 B2INFO(
"SeqFile::read() opened '" << nextfile <<
"'");
204 in->read(buf,
sizeof(
int));
205 }
catch (ios_base::failure& e) {
206 B2ERROR(
"SeqFile::read() " << e.what() <<
": couldn't read next record size");
210 int recsize = *((
int*)buf);
211 if (recsize > size) {
212 B2ERROR(
"SeqFile::read() buffer too small, need at least " << recsize <<
" bytes");
216 in->read(buf +
sizeof(
int), recsize -
sizeof(
int));
217 }
catch (ios_base::failure& e) {
218 B2ERROR(
"SeqFile::read() " << e.what() <<
": could only read " << in->gcount() <<
" bytes, expected " << recsize);
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
int m_nb
when saving a file, the total number of bytes written, 0 when reading.
void openFile(std::string filename, bool readonly)
actually open the file
int m_streamerinfo_size
Size (in bytes) of the StreamerInfo.
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
bool m_compressed
Is the file gzip-compressed?
int status() const
Returns status after constructor call.
static const int c_MaxFileSize
Maximum size of one file (in bytes).
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
std::string m_filename
Name of the opened file.
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
int write(const char *buf)
Write a record to a file.
char * m_streamerinfo
StreamerInfo.
int read(char *buf, int max)
Read a record from a file.
Abstract base class for different kinds of events.