9 #include <framework/pcore/SeqFile.h> 
   10 #include <framework/logging/Logger.h> 
   16 #include <boost/iostreams/device/file_descriptor.hpp> 
   17 #include <boost/iostreams/filter/gzip.hpp> 
   18 #include <boost/iostreams/filtering_stream.hpp> 
   19 #include <boost/format.hpp> 
   23 namespace io = boost::iostreams;
 
   25 SeqFile::SeqFile(
const std::string& filename, 
const std::string& rwflag, 
char* streamerinfo, 
int streamerinfo_size,
 
   26                  bool filenameIsPattern):
 
   29   if (filename.empty()) {
 
   30     B2ERROR(
"SeqFile: Empty filename given");
 
   33   bool readonly = rwflag.find(
'w') == std::string::npos;
 
   35   m_compressed = filename.size() > 3 && filename.compare(filename.size() - 3, 3, 
".gz") == 0;
 
   38     m_filename = filename.substr(0, filename.size() - 3);
 
   41   if (filenameIsPattern) {
 
   45     } 
catch (std::exception& e) {
 
   53   if (streamerinfo != 
nullptr && streamerinfo_size > 0) {
 
   63     B2WARNING(
"SeqFile: error opening '" << filename << 
"': " << strerror(errno)
 
   64               << 
", trying again with '.gz'");
 
   70     B2ERROR(
"SeqFile: error opening '" << filename << 
"': " << strerror(errno));
 
   72     B2INFO(
"SeqFile: " << 
m_filename << 
" opened (fd=" << 
m_fd << 
")");
 
   84     m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
 
   85     auto filter = 
new io::filtering_ostream();
 
   87     filter->push(io::file_descriptor_sink(
m_fd, io::close_handle));
 
   88     filter->exceptions(ios_base::badbit | ios_base::failbit);
 
   98       auto* out = 
dynamic_cast<std::ostream*
>(
m_stream.get());
 
  100         B2FATAL(
"SeqFile::write() called on a file opened in read mode");
 
  104         B2INFO(
"Wrote StreamerInfo at the begenning of the file. : " << 
m_streamerinfo_size << 
"bytes");
 
  105       } 
catch (ios_base::failure& e) {
 
  107         B2ERROR(
"SeqFile::openFile() error: " << e.what() << 
", " << strerror(errno));
 
  113     m_fd = open(filename.c_str(), O_RDONLY);
 
  114     auto filter = 
new io::filtering_istream();
 
  115     if (
m_compressed) filter->push(io::gzip_decompressor());
 
  116     filter->push(io::file_descriptor_source(
m_fd, io::close_handle));
 
  117     filter->exceptions(ios_base::badbit | ios_base::failbit);
 
  127   B2INFO(
"Closing SeqFile " << 
m_nfile);
 
  139   auto* out = 
dynamic_cast<std::ostream*
>(
m_stream.get());
 
  141     B2FATAL(
"SeqFile::write() called on a file opened in read mode");
 
  143   int insize = *((
int*)buf); 
 
  145     B2INFO(
"SeqFile: previous file closed (size=" << 
m_nb << 
" bytes)");
 
  153       B2FATAL(
"SeqFile::write() error opening file '" << file << 
"': " << strerror(errno));
 
  156     B2INFO(
"SeqFile::write() opened '" << file << 
"'");
 
  158     out = 
dynamic_cast<std::ostream*
>(
m_stream.get());
 
  161     out->write(buf, insize);
 
  164   } 
catch (ios_base::failure& e) {
 
  165     B2ERROR(
"SeqFile::write() error: " << e.what() << 
", " << strerror(errno));
 
  173   auto* in = 
dynamic_cast<std::istream*
>(
m_stream.get());
 
  175     B2FATAL(
"SeqFile::read() called on a file opened in write mode");
 
  180   } 
catch (ios_base::failure& e) {
 
  181     B2ERROR(
"SeqFile::read() cannot read file: " << e.what());
 
  193     if (
m_fd < 0) 
return 0;   
 
  195     in = 
dynamic_cast<std::istream*
>(
m_stream.get());
 
  196     B2INFO(
"SeqFile::read() opened '" << nextfile << 
"'");
 
  200     in->read(buf, 
sizeof(
int));
 
  201   } 
catch (ios_base::failure& e) {
 
  202     B2ERROR(
"SeqFile::read() " << e.what() << 
": couldn't read next record size");
 
  206   int recsize = *((
int*)buf);
 
  207   if (recsize > size) {
 
  208     B2ERROR(
"SeqFile::read() buffer too small, need at least " << recsize << 
" bytes");
 
  212     in->read(buf + 
sizeof(
int), recsize - 
sizeof(
int));
 
  213   } 
catch (ios_base::failure& e) {
 
  214     B2ERROR(
"SeqFile::read() " << e.what() << 
": could only read " << in->gcount() << 
" bytes, expected " << recsize);
 
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
int m_nb
When saving a file, the total number of bytes written; 0 when reading.
void openFile(std::string filename, bool readonly)
Actually opens the file.
int m_streamerinfo_size
Size (in bytes) of the StreamerInfo.
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
bool m_compressed
Is the file gzip-compressed?
int status() const
Returns status after constructor call.
static const int c_MaxFileSize
Maximum size of one file (in bytes).
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
std::string m_filename
Name of the opened file.
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
int write(const char *buf)
Write a record to a file.
char * m_streamerinfo
StreamerInfo.
int read(char *buf, int max)
Read a record from a file.
Abstract base class for different kinds of events.