Belle II Software development
MsgHandler.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/MsgHandler.h>
10#include <framework/logging/Logger.h>
11
12#include <TObject.h>
13#include <TMessage.h>
14#include <RZip.h>
15
16using namespace std;
17using namespace Belle2;
18
19
namespace {
  /** Serialised size in bytes (50 MB) above which add() warns that an object is very large. */
  constexpr int c_maxObjectSizeBytes = 50000000;
}
24
26 m_buf(100000),
27 m_compBuf(0),
28 m_msg(new TMessage(kMESS_OBJECT))
29{
30 m_complevel = complevel;
31
32 //Schema evolution is needed to stream genfit tracks
33 //If disabled, streamers will crash when reading data.
34 TMessage::EnableSchemaEvolutionForAll();
35 m_msg->SetWriteMode();
36}
37
38MsgHandler::~MsgHandler() = default;
39
41{
42 m_buf.clear();
44}
45
46void MsgHandler::add(const TObject* obj, const string& name)
47{
48 m_msg->WriteObject(obj);
49
50 int len = m_msg->Length();
51 char* buf = m_msg->Buffer();
52
53 if (len > c_maxObjectSizeBytes) {
54 B2WARNING("MsgHandler: Object " << name << " is very large (" << len << " bytes), parallel processing may be slow.");
55 }
56
57 // Put name of object in output buffer including a final 0-byte
58 UInt_t nameLength = name.size() + 1;
59 m_buf.add(&nameLength, sizeof(nameLength));
60 m_buf.add(name.c_str(), nameLength);
61 // Copy object into buffer
62 m_buf.add(&len, sizeof(len));
63 m_buf.add(buf, len);
64 m_msg->Reset();
65}
66
68{
69 if (rectype == MSG_TERMINATE) {
70 auto* eod = new EvtMessage(nullptr, 0, rectype);
71 return eod;
72 }
73
74 // which buffer to send? defaults to uncompressed
75 auto buf = &m_buf;
76 unsigned int flags = 0;
77 // but if we have compression enabled then please compress.
78 if (m_complevel > 0) {
79 // make sure buffer for the compression is big enough.
81 // And call the root compression function
82 const int algorithm = m_complevel / 100;
83 const int level = m_complevel % 100;
84 int irep{0}, nin{(int)m_buf.size()}, nout{nin};
85 R__zipMultipleAlgorithm(level, &nin, m_buf.data(), &nout, m_compBuf.data(), &irep,
86 (ROOT::RCompressionSetting::EAlgorithm::EValues) algorithm);
87 // it returns the number of bytes of the output in irep. If that is zero or
88 // to big compression failed and we transmit uncompressed.
89 if (irep > 0 && irep <= nin) {
90 //set correct size of compressed message
91 m_compBuf.resize(irep);
92 // and set pointer to correct buffer for creating message
93 buf = &m_compBuf;
94 // also add a flag indicating it's compressed
96 }
97 }
98
99 auto* evtmsg = new EvtMessage(buf->data(), buf->size(), rectype);
100 evtmsg->setMsgFlags(flags);
101 clear();
102
103 return evtmsg;
104}
105
106void MsgHandler::decode_msg(EvtMessage* msg, vector<TObject*>& objlist,
107 vector<string>& namelist)
108{
109 const char* msgptr = msg->msg();
110 const char* end = msgptr + msg->msg_size();
111
113 // apparently message is compressed, let's decompress
115 int nzip{0}, nout{0};
116 // ROOT wants unsigned char so make a new pointer to the data
117 auto* zipptr = (unsigned char*) msgptr;
118 // and uncompress everything
119 while (zipptr < (unsigned char*)end) {
120 // first get a header of the next block so we know how big the output will be
121 if (R__unzip_header(&nzip, zipptr, &nout) != 0) {
122 B2FATAL("Cannot uncompress message header");
123 }
124 // no more output? fine
125 if (!nout) break;
126 if (std::distance(zipptr, (unsigned char*) end) > nzip) {
127 B2FATAL("Not enough bytes left to uncompress");
128 }
129 // otherwise make sure output buffer is large enough
130 int old_size = m_compBuf.size();
131 m_compBuf.resize(old_size + nout);
132 // and uncompress, the amount of bytes will be returned as irep
133 int irep{0};
134 R__unzip(&nzip, zipptr, &nout, (unsigned char*)(m_compBuf.data() + old_size), &irep);
135 // if that is not positive an error happend, bail
136 if (irep <= 0) {
137 B2FATAL("Cannot uncompress message");
138 }
139 // otherwise advance pointer by the amount of bytes compressed bytes in the block
140 zipptr += nzip;
141 }
142 // ok, decompressed succesfully, set msg pointer to the correct area
143 msgptr = m_compBuf.data();
144 end = msgptr + m_compBuf.size();
145 }
146
147 while (msgptr < end) {
148 // Restore object name
149 UInt_t nameLength;
150 memcpy(&nameLength, msgptr, sizeof(nameLength));
151 msgptr += sizeof(nameLength);
152 if (nameLength == 0 || std::distance(msgptr, end) < nameLength)
153 B2FATAL("Buffer overrun while decoding object name, check length fields!");
154
155 // read full string but omit final 0-byte. This safeguards against strings containing 0-bytes
156 namelist.emplace_back(msgptr, nameLength - 1);
157 msgptr += nameLength;
158
159 // Restore object
160 UInt_t objlen;
161 memcpy(&objlen, msgptr, sizeof(objlen));
162 msgptr += sizeof(objlen);
163 if (objlen == 0 || std::distance(msgptr, end) < objlen)
164 B2FATAL("Buffer overrun while decoding object, check length fields!");
165
166 m_inMsg.SetBuffer(msgptr, objlen);
167 objlist.push_back(m_inMsg.readTObject());
168 msgptr += objlen;
169 //no need to call InMessage::Reset() here (done in SetBuffer())
170 }
171}
size_t size() const
return buffer size (do not access data() beyond this)
Definition: MsgHandler.h:54
char * data()
return raw pointer.
Definition: MsgHandler.h:52
void resize(size_t size)
resize, similar to std::vector<char>::resize in that it will copy the existing buffer to a new,...
Definition: MsgHandler.h:60
void clear()
reset (without deallocating)
Definition: MsgHandler.h:56
void add(const void *data, size_t len)
copy data to end of buffer, expanding buffer if needed.
Definition: MsgHandler.h:43
Class to manage streamed object.
Definition: EvtMessage.h:59
int msg_size() const
Get size of message body.
Definition: EvtMessage.cc:108
bool hasMsgFlags(unsigned int flags) const
Check if the message has the given flags.
Definition: EvtMessage.h:115
char * msg()
Get pointer to message body.
Definition: EvtMessage.cc:172
@ c_MsgCompressed
indicates that the message body is compressed and should be uncompressed using ROOT R__unzip_header a...
Definition: EvtMessage.h:69
void SetBuffer(const void *ptr, UInt_t bufsize)
Replace buffer (doesn't take ownership).
Definition: MsgHandler.h:85
TObject * readTObject()
Read one object from the message assuming it inherits from TObject.
Definition: MsgHandler.h:99
virtual ~MsgHandler()
Destructor.
int m_complevel
compression algorithm * 100 + compression level.
Definition: MsgHandler.h:134
InMessage m_inMsg
Used for deserializing in decode_msg()
Definition: MsgHandler.h:133
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
Definition: MsgHandler.cc:106
MsgHandler(int complevel=0)
Constructor.
Definition: MsgHandler.cc:25
CharBuffer m_buf
EvtMessage character buffer for encode_msg().
Definition: MsgHandler.h:130
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
CharBuffer m_compBuf
EvtMessage character buffer for compressing/decompressing.
Definition: MsgHandler.h:131
std::unique_ptr< TMessage > m_msg
Used for serialising objects into m_buf.
Definition: MsgHandler.h:132
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
virtual void clear()
Clear object list.
Definition: MsgHandler.cc:40
ERecordType
What type of message is this?
Definition: EvtMessage.h:25
Abstract base class for different kinds of events.
STL namespace.