Belle II Software light-2406-ragdoll
SeqFile.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
#include <framework/pcore/SeqFile.h>
#include <framework/logging/Logger.h>

#include <ios>
#include <fcntl.h>
#include <cerrno>
#include <cstring>

#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/format.hpp>
20
21using namespace Belle2;
22using namespace std;
23namespace io = boost::iostreams;
24
25SeqFile::SeqFile(const std::string& filename, const std::string& rwflag, char* streamerinfo, int streamerinfo_size,
26 bool filenameIsPattern):
27 m_filename(filename)
28{
29 if (m_filename.empty()) {
30 B2ERROR("SeqFile: Empty filename given");
31 return;
32 }
33 bool readonly = rwflag.find('w') == std::string::npos;
34 // CAF use URL input style and add a prefix that need to be removed for sroot files
35 if (m_filename.compare(0, 7, "file://") == 0) {
36 m_filename = m_filename.substr(7, m_filename.size() - 7);
37 }
38 // is the file already compressed?
39 m_compressed = m_filename.size() > 3 && m_filename.compare(m_filename.size() - 3, 3, ".gz") == 0;
40 // strip .gz suffix to add it at the end automatically and correctly for subsequent files
41 if (m_compressed) {
42 m_filename = m_filename.substr(0, m_filename.size() - 3);
43 }
44 // check if we want different naming scheme using boost::format
45 if (filenameIsPattern) {
47 try {
48 m_filename = (boost::format(m_filenamePattern) % m_nfile).str();
49 } catch (std::exception& e) {
50 B2FATAL("Cannot use filename pattern" << m_filenamePattern << ": " << e.what());
51 }
52 }
53
54 // Store StreamerInfo 2017.5.8
55 m_streamerinfo = nullptr;
57 if (streamerinfo != nullptr && streamerinfo_size > 0) {
58 m_streamerinfo_size = streamerinfo_size;
60 memcpy(m_streamerinfo, streamerinfo, m_streamerinfo_size);
61 }
62
63 // open the file
64 openFile(m_filename, readonly);
65 // if that fails and it's not already assumed to be compressed try again adding .gz to the name
66 if (m_fd < 0 && !m_compressed) {
67 B2WARNING("SeqFile: error opening '" << m_filename << "': " << strerror(errno)
68 << ", trying again with '.gz'");
69 m_compressed = true;
70 openFile(m_filename, readonly);
71 }
72 // is the file open now?
73 if (m_fd < 0) {
74 B2ERROR("SeqFile: error opening '" << m_filename << "': " << strerror(errno));
75 } else {
76 B2INFO("SeqFile: " << m_filename << " opened (fd=" << m_fd << ")");
77 }
78
79}
80
81void SeqFile::openFile(std::string filename, bool readonly)
82{
83
84 // add compression suffix if file is supposed to be compressed
85 if (m_compressed) filename += ".gz";
86 if (!readonly) {
87 //open file in create mode and set stream correctly
88 m_fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0664);
89 auto filter = new io::filtering_ostream();
90 if (m_compressed) filter->push(io::gzip_compressor());
91 filter->push(io::file_descriptor_sink(m_fd, io::close_handle));
92 filter->exceptions(ios_base::badbit | ios_base::failbit);
93 m_stream.reset(filter);
94
95 //
96 // Write StreamerInfo (2017.5.8)
97 //
98 if (m_streamerinfo == nullptr || m_streamerinfo_size <= 0) {
99 // If you want to use SeqFile for non-sroot file type, please skip this B2FATAL
100 B2FATAL("Invalid size of StreamerInfo : " << m_streamerinfo_size << "bytes");
101 } else {
102 auto* out = dynamic_cast<std::ostream*>(m_stream.get());
103 if (!out) {
104 B2FATAL("SeqFile::write() called on a file opened in read mode");
105 }
106 try {
108 B2INFO("Wrote StreamerInfo at the begenning of the file. : " << m_streamerinfo_size << "bytes");
109 } catch (ios_base::failure& e) {
110
111 B2ERROR("SeqFile::openFile() error: " << e.what() << ", " << strerror(errno));
112 }
113 }
114
115 } else {
116 //open file in read mode and set stream correctly
117 m_fd = open(filename.c_str(), O_RDONLY);
118 auto filter = new io::filtering_istream();
119 if (m_compressed) filter->push(io::gzip_decompressor());
120 filter->push(io::file_descriptor_source(m_fd, io::close_handle));
121 filter->exceptions(ios_base::badbit | ios_base::failbit);
122 m_stream.reset(filter);
123 }
124 // reset number of written bytes (does not include streamerinfo )
125 m_nb = 0;
126}
127
129{
130 if (m_streamerinfo != nullptr) delete m_streamerinfo;
131 B2INFO("Closing SeqFile " << m_nfile);
132 //closed automatically by m_stream.
133}
134
136{
137 return m_fd;
138}
139
140int SeqFile::write(const char* buf)
141{
142 // cast stream object
143 auto* out = dynamic_cast<std::ostream*>(m_stream.get());
144 if (!out) {
145 B2FATAL("SeqFile::write() called on a file opened in read mode");
146 }
147 int insize = *((int*)buf); // nbytes in the buffer at the beginning
148 if (insize + m_nb >= c_MaxFileSize && m_filename != "/dev/null") {
149 B2INFO("SeqFile: previous file closed (size=" << m_nb << " bytes)");
150 m_nfile++;
151 auto file = m_filename + '-' + std::to_string(m_nfile);
152 if (!m_filenamePattern.empty()) {
153 file = (boost::format(m_filenamePattern) % m_nfile).str();
154 }
155 openFile(file, false);
156 if (m_fd < 0) {
157 B2FATAL("SeqFile::write() error opening file '" << file << "': " << strerror(errno));
158 }
159 m_nb = 0;
160 B2INFO("SeqFile::write() opened '" << file << "'");
161 // update stream pointer since we reopened the file
162 out = dynamic_cast<std::ostream*>(m_stream.get());
163 }
164 try {
165 out->write(buf, insize);
166 m_nb += insize;
167 return insize;
168 } catch (ios_base::failure& e) {
169 B2ERROR("SeqFile::write() error: " << e.what() << ", " << strerror(errno));
170 return 0;
171 }
172}
173
174int SeqFile::read(char* buf, int size)
175{
176 // cast stream object
177 auto* in = dynamic_cast<std::istream*>(m_stream.get());
178 if (!in) {
179 B2FATAL("SeqFile::read() called on a file opened in write mode");
180 }
181 //trigger eof if there's nothing left int the file. Could throw an error on decompress failure
182 try {
183 in->peek();
184 } catch (ios_base::failure& e) {
185 B2ERROR("SeqFile::read() cannot read file: " << e.what());
186 return -1;
187 }
188 //ok, now we can open the next file reliably
189 if (in->eof()) {
190 // EOF of current file, search for next file
191 m_nfile++;
192 auto nextfile = m_filename + '-' + std::to_string(m_nfile);
193 if (!m_filenamePattern.empty()) {
194 nextfile = (boost::format(m_filenamePattern) % m_nfile).str();
195 }
196 openFile(nextfile, true);
197 if (m_fd < 0) return 0; // End of all files
198 // update the stream pointer
199 in = dynamic_cast<std::istream*>(m_stream.get());
200 B2INFO("SeqFile::read() opened '" << nextfile << "'");
201 }
202 try {
203 // Obtain new header
204 in->read(buf, sizeof(int));
205 } catch (ios_base::failure& e) {
206 B2ERROR("SeqFile::read() " << e.what() << ": couldn't read next record size");
207 return -1;
208 }
209 // Normal processing, extract the record size from the first 4 bytes
210 int recsize = *((int*)buf);
211 if (recsize > size) {
212 B2ERROR("SeqFile::read() buffer too small, need at least " << recsize << " bytes");
213 return -1;
214 }
215 try {
216 in->read(buf + sizeof(int), recsize - sizeof(int));
217 } catch (ios_base::failure& e) {
218 B2ERROR("SeqFile::read() " << e.what() << ": could only read " << in->gcount() << " bytes, expected " << recsize);
219 return -1;
220 }
221 return recsize;
222}
~SeqFile()
Destructor.
Definition: SeqFile.cc:128
std::string m_filenamePattern
Pattern for creating the file from the sequence number.
Definition: SeqFile.h:60
int m_nb
when saving a file, the total number of bytes written, 0 when reading.
Definition: SeqFile.h:62
void openFile(std::string filename, bool readonly)
actually open the file
Definition: SeqFile.cc:81
int m_streamerinfo_size
size(bytes) of StreamerInfo
Definition: SeqFile.h:71
std::unique_ptr< std::ios > m_stream
pointer to the filtering input or output stream
Definition: SeqFile.h:65
bool m_compressed
Is the file gzip-compressed?
Definition: SeqFile.h:64
int status() const
Returns status after constructor call.
Definition: SeqFile.cc:135
static const int c_MaxFileSize
maximal size of one file (in Bytes).
Definition: SeqFile.h:57
SeqFile(const std::string &filename, const std::string &rwflag, char *streamerinfo=nullptr, int streamerinfo_size=0, bool filenameIsPattern=false)
Constructor.
Definition: SeqFile.cc:25
std::string m_filename
Name of the opened file.
Definition: SeqFile.h:59
int m_fd
file descriptor.
Definition: SeqFile.h:61
int m_nfile
file counter, starting at 0 (files are split after c_MaxFileSize bytes).
Definition: SeqFile.h:63
int write(const char *buf)
Write a record to a file.
Definition: SeqFile.cc:140
char * m_streamerinfo
StreamerInfo.
Definition: SeqFile.h:68
int read(char *buf, int max)
Read a record from a file.
Definition: SeqFile.cc:174
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:24
STL namespace.