Belle II Software development
SeqFile Class Reference

A class to manage I/O for a chain of blocked files. More...

#include <SeqFile.h>

Public Member Functions

 SeqFile (const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
 Constructor.
 
 ~SeqFile ()
 Destructor.
 
 SeqFile (const SeqFile &)=delete
 No copying.
 
SeqFileoperator= (const SeqFile &)=delete
 No assignment.
 
int status () const
 Returns status after constructor call.
 
int write (const char *buf)
 Write a record to a file.
 
int read (char *buf, int max)
 Read a record from a file.
 

Private Member Functions

void openFile (std::string filename, bool readonly)
 actually open the file
 

Private Attributes

std::string m_filename
 Name of the opened file.
 
std::string m_filenamePattern
 Pattern for creating the file from the sequence number.
 
int m_fd { -1}
 file descriptor.
 
int m_nb {0}
 when saving a file, the total number of bytes written, 0 when reading.
 
int m_nfile {0}
 file counter, starting at 0 (files are split after c_MaxFileSize bytes).
 
bool m_compressed {false}
 is file gzipped compressed?
 
std::unique_ptr< std::ios > m_stream
 pointer to the filtering input or output stream
 
char * m_streamerinfo
 StreamerInfo.
 
int m_streamerinfo_size
 size(bytes) of StreamerInfo
 

Static Private Attributes

static const int c_MaxFileSize {512000000 * 4}
 maximal size of one file (in Bytes).
 

Detailed Description

A class to manage I/O for a chain of blocked files.

Definition at line 22 of file SeqFile.h.

Constructor & Destructor Documentation

◆ SeqFile()

SeqFile ( const std::string &  filename,
const std::string &  rwflag,
char *  streamerinfo = nullptr,
int  streamerinfo_size = 0,
bool  filenameIsPattern = false 
)

Constructor.

Parameters
filenamename of the file
rwflagshould probably be r or rw
streamerinfoString containing the streamer info
streamerinfo_sizeSize of the string containing the streamer info
filenameIsPatternif true interpret the filename as a boost::format pattern which takes the sequence number as argument instead of producing .sroot-N files

Definition at line 25 of file SeqFile.cc.

26 :
27 m_filename(filename)
28{
29 if (m_filename.empty()) {
30 B2ERROR("SeqFile: Empty filename given");
31 return;
32 }
33 bool readonly = rwflag.find('w') == std::string::npos;
34 // CAF use URL input style and add a prefix that need to be removed for sroot files
35 if (m_filename.compare(0, 7, "file://") == 0) {
36 m_filename = m_filename.substr(7, m_filename.size() - 7);
37 }
38 // is the file already compressed?
39 m_compressed = m_filename.size() > 3 && m_filename.compare(m_filename.size() - 3, 3, ".gz") == 0;
40 // strip .gz suffix to add it at the end automatically and correctly for subsequent files
41 if (m_compressed) {
42 m_filename = m_filename.substr(0, m_filename.size() - 3);
43 }
44 // check if we want different naming scheme using boost::format
45 if (filenameIsPattern) {
47 try {
48 m_filename = (boost::format(m_filenamePattern) % m_nfile).str();
49 } catch (std::exception& e) {
50 B2FATAL("Cannot use filename pattern" << m_filenamePattern << ": " << e.what());
51 }
52 }
53
54 // Store StreamerInfo 2017.5.8
55 m_streamerinfo = nullptr;
57 if (streamerinfo != nullptr && streamerinfo_size > 0) {
58 m_streamerinfo_size = streamerinfo_size;
60 memcpy(m_streamerinfo, streamerinfo, m_streamerinfo_size);
61 }
62
63 // open the file
64 openFile(m_filename, readonly);
65 // if that fails and it's not already assumed to be compressed try again adding .gz to the name
66 if (m_fd < 0 && !m_compressed) {
67 B2WARNING("SeqFile: error opening '" << m_filename << "': " << strerror(errno)
68 << ", trying again with '.gz'");
69 m_compressed = true;
70 openFile(m_filename, readonly);
71 }
72 // is the file open now?
73 if (m_fd < 0) {
74 B2ERROR("SeqFile: error opening '" << m_filename << "': " << strerror(errno));
75 } else {
76 B2INFO("SeqFile: " << m_filename << " opened (fd=" << m_fd << ")");
77 }
78
79}
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
Definition: SeqFile.h:60
void openFile(std::string filename, bool readonly)
actually open the file
Definition: SeqFile.cc:81
int m_streamerinfo_size
size(bytes) of StreamerInfo
Definition: SeqFile.h:71
bool m_compressed
is file gzipped compressed?
Definition: SeqFile.h:64
std::string m_filename
Name of the opened file.
Definition: SeqFile.h:59
int m_fd
file descriptor.
Definition: SeqFile.h:61
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
Definition: SeqFile.h:63
char * m_streamerinfo
StreamerInfo.
Definition: SeqFile.h:68

◆ ~SeqFile()

~SeqFile ( )

Destructor.

Definition at line 128 of file SeqFile.cc.

129{
130 if (m_streamerinfo != nullptr) delete m_streamerinfo;
131 B2INFO("Closing SeqFile " << m_nfile);
132 //closed automatically by m_stream.
133}

Member Function Documentation

◆ openFile()

void openFile ( std::string  filename,
bool  readonly 
)
private

actually open the file

Definition at line 81 of file SeqFile.cc.

82{
83
84 // add compression suffix if file is supposed to be compressed
85 if (m_compressed) filename += ".gz";
86 if (!readonly) {
87 //open file in create mode and set stream correctly
88 m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
89 auto filter = new io::filtering_ostream();
90 if (m_compressed) filter->push(io::gzip_compressor());
91 filter->push(io::file_descriptor_sink(m_fd, io::close_handle));
92 filter->exceptions(ios_base::badbit | ios_base::failbit);
93 m_stream.reset(filter);
94
95 //
96 // Write StreamerInfo (2017.5.8)
97 //
98 if (m_streamerinfo == nullptr || m_streamerinfo_size <= 0) {
99 // If you want to use SeqFile for non-sroot file type, please skip this B2FATAL
100 B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
101 } else {
102 auto* out = dynamic_cast<std::ostream*>(m_stream.get());
103 if (!out) {
104 B2FATAL("SeqFile::write() called on a file opened in read mode");
105 }
106 try {
108 B2INFO("Wrote StreamerInfo at the begenning of the file. : " << m_streamerinfo_size << "bytes");
109 } catch (ios_base::failure& e) {
110
111 B2ERROR("SeqFile::openFile() error: " << e.what() << ", " << strerror(errno));
112 }
113 }
114
115 } else {
116 //open file in read mode and set stream correctly
117 m_fd = open(filename.c_str(), O_RDONLY);
118 auto filter = new io::filtering_istream();
119 if (m_compressed) filter->push(io::gzip_decompressor());
120 filter->push(io::file_descriptor_source(m_fd, io::close_handle));
121 filter->exceptions(ios_base::badbit | ios_base::failbit);
122 m_stream.reset(filter);
123 }
124 // reset number of written bytes (does not include streamerinfo )
125 m_nb = 0;
126}
int m_nb
when saving a file, the total number of bytes written, 0 when reading.
Definition: SeqFile.h:62
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
Definition: SeqFile.h:65
std::map< ExpRun, std::pair< double, double > > filter(const std::map< ExpRun, std::pair< double, double > > &runs, double cut, std::map< ExpRun, std::pair< double, double > > &runsRemoved)
filter events to remove runs shorter than cut, it stores removed runs in runsRemoved
Definition: Splitter.cc:38

◆ read()

int read ( char *  buf,
int  max 
)

Read a record from a file.

The record length is returned.

Definition at line 174 of file SeqFile.cc.

175{
176 // cast stream object
177 auto* in = dynamic_cast<std::istream*>(m_stream.get());
178 if (!in) {
179 B2FATAL("SeqFile::read() called on a file opened in write mode");
180 }
181 //trigger eof if there's nothing left int the file. Could throw an error on decompress failure
182 try {
183 in->peek();
184 } catch (ios_base::failure& e) {
185 B2ERROR("SeqFile::read() cannot read file: " << e.what());
186 return -1;
187 }
188 //ok, now we can open the next file reliably
189 if (in->eof()) {
190 // EOF of current file, search for next file
191 m_nfile++;
192 auto nextfile = m_filename + '-' + std::to_string(m_nfile);
193 if (!m_filenamePattern.empty()) {
194 nextfile = (boost::format(m_filenamePattern) % m_nfile).str();
195 }
196 openFile(nextfile, true);
197 if (m_fd < 0) return 0; // End of all files
198 // update the stream pointer
199 in = dynamic_cast<std::istream*>(m_stream.get());
200 B2INFO("SeqFile::read() opened '" << nextfile << "'");
201 }
202 try {
203 // Obtain new header
204 in->read(buf, sizeof(int));
205 } catch (ios_base::failure& e) {
206 B2ERROR("SeqFile::read() " << e.what() << ": couldn't read next record size");
207 return -1;
208 }
209 // Normal processing, extract the record size from the first 4 bytes
210 int recsize = *((int*)buf);
211 if (recsize > size) {
212 B2ERROR("SeqFile::read() buffer too small, need at least " << recsize << " bytes");
213 return -1;
214 }
215 try {
216 in->read(buf + sizeof(int), recsize - sizeof(int));
217 } catch (ios_base::failure& e) {
218 B2ERROR("SeqFile::read() " << e.what() << ": could only read " << in->gcount() << " bytes, expected " << recsize);
219 return -1;
220 }
221 return recsize;
222}

◆ status()

int status ( ) const

Returns status after constructor call.

If success, fd is returned. If not, -1

Definition at line 135 of file SeqFile.cc.

136{
137 return m_fd;
138}

◆ write()

int write ( const char *  buf)

Write a record to a file.

First word of the record should contain number of words.

Definition at line 140 of file SeqFile.cc.

141{
142 // cast stream object
143 auto* out = dynamic_cast<std::ostream*>(m_stream.get());
144 if (!out) {
145 B2FATAL("SeqFile::write() called on a file opened in read mode");
146 }
147 int insize = *((int*)buf); // nbytes in the buffer at the beginning
148 if (insize + m_nb >= c_MaxFileSize && m_filename != "/dev/null") {
149 B2INFO("SeqFile: previous file closed (size=" << m_nb << " bytes)");
150 m_nfile++;
151 auto file = m_filename + '-' + std::to_string(m_nfile);
152 if (!m_filenamePattern.empty()) {
153 file = (boost::format(m_filenamePattern) % m_nfile).str();
154 }
155 openFile(file, false);
156 if (m_fd < 0) {
157 B2FATAL("SeqFile::write() error opening file '" << file << "': " << strerror(errno));
158 }
159 m_nb = 0;
160 B2INFO("SeqFile::write() opened '" << file << "'");
161 // update stream pointer since we reopened the file
162 out = dynamic_cast<std::ostream*>(m_stream.get());
163 }
164 try {
165 out->write(buf, insize);
166 m_nb += insize;
167 return insize;
168 } catch (ios_base::failure& e) {
169 B2ERROR("SeqFile::write() error: " << e.what() << ", " << strerror(errno));
170 return 0;
171 }
172}
static const int c_MaxFileSize
maximal size of one file (in Bytes).
Definition: SeqFile.h:57

Member Data Documentation

◆ c_MaxFileSize

const int c_MaxFileSize {512000000 * 4}
staticprivate

maximal size of one file (in Bytes).

Definition at line 57 of file SeqFile.h.

◆ m_compressed

bool m_compressed {false}
private

is file gzipped compressed?

Definition at line 64 of file SeqFile.h.

◆ m_fd

int m_fd { -1}
private

file descriptor.

Definition at line 61 of file SeqFile.h.

◆ m_filename

std::string m_filename
private

Name of the opened file.

Definition at line 59 of file SeqFile.h.

◆ m_filenamePattern

std::string m_filenamePattern
private

Pattern for creating the file from the sequence number.

Definition at line 60 of file SeqFile.h.

◆ m_nb

int m_nb {0}
private

when saving a file, the total number of bytes written, 0 when reading.

Definition at line 62 of file SeqFile.h.

◆ m_nfile

int m_nfile {0}
private

file counter, starting at 0 (files are split after c_MaxFileSize bytes).

Definition at line 63 of file SeqFile.h.

◆ m_stream

std::unique_ptr<std::ios> m_stream
private

pointer to the filtering input or output stream

Definition at line 65 of file SeqFile.h.

◆ m_streamerinfo

char* m_streamerinfo
private

StreamerInfo.

Definition at line 68 of file SeqFile.h.

◆ m_streamerinfo_size

int m_streamerinfo_size
private

size(bytes) of StreamerInfo

Definition at line 71 of file SeqFile.h.


The documentation for this class was generated from the following files: