Belle II Software  release-05-01-25
MsgHandler.cc
1 //+
2 // File : MsgHandler.cc
3 // Description : Encode/Decode EvtMessage
4 //
5 // Author : SooHyung Lee, Ryosuke Itoh
6 // Date : 31 - Jul - 2008
7 // Modified : 4 - Jun - 2010
8 //-
9 
10 #include <framework/pcore/MsgHandler.h>
11 #include <framework/logging/Logger.h>
12 
13 #include <TObject.h>
14 #include <TMessage.h>
15 #include <RZip.h>
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 
namespace {
  /// Serialised objects larger than this many bytes make MsgHandler::add() print a warning (50 MB).
  constexpr int c_maxObjectSizeBytes = 50000000;
}
25 
26 MsgHandler::MsgHandler(int complevel):
27  m_buf(100000),
28  m_compBuf(0),
29  m_msg(new TMessage(kMESS_OBJECT))
30 {
31  m_complevel = complevel;
32 
33  //Schema evolution is needed to stream genfit tracks
34  //If disabled, streamers will crash when reading data.
35  TMessage::EnableSchemaEvolutionForAll();
36  m_msg->SetWriteMode();
37 }
38 
// Defaulted destructor: no manual cleanup is done here; m_msg is held in a std::unique_ptr.
39 MsgHandler::~MsgHandler() = default;
40 
42 {
43  m_buf.clear();
44  m_compBuf.clear();
45 }
46 
47 void MsgHandler::add(const TObject* obj, const string& name)
48 {
49  m_msg->WriteObject(obj);
50 
51  int len = m_msg->Length();
52  char* buf = m_msg->Buffer();
53 
54  if (len > c_maxObjectSizeBytes) {
55  B2WARNING("MsgHandler: Object " << name << " is very large (" << len << " bytes), parallel processing may be slow.");
56  }
57 
58  // Put name of object in output buffer including a final 0-byte
59  UInt_t nameLength = name.size() + 1;
60  m_buf.add(&nameLength, sizeof(nameLength));
61  m_buf.add(name.c_str(), nameLength);
62  // Copy object into buffer
63  m_buf.add(&len, sizeof(len));
64  m_buf.add(buf, len);
65  m_msg->Reset();
66 }
67 
69 {
70  if (rectype == MSG_TERMINATE) {
71  auto* eod = new EvtMessage(nullptr, 0, rectype);
72  return eod;
73  }
74 
75  // which buffer to send? defaults to uncompressed
76  auto buf = &m_buf;
77  unsigned int flags = 0;
78  // but if we have compression enabled then please compress.
79  if (m_complevel > 0) {
80  // make sure buffer for the compression is big enough.
82  // And call the root compression function
83  const int algorithm = m_complevel / 100;
84  const int level = m_complevel % 100;
85  int irep{0}, nin{(int)m_buf.size()}, nout{nin};
86  R__zipMultipleAlgorithm(level, &nin, m_buf.data(), &nout, m_compBuf.data(), &irep,
87  (ROOT::RCompressionSetting::EAlgorithm::EValues) algorithm);
88  // it returns the number of bytes of the output in irep. If that is zero or
89  // to big compression failed and we transmit uncompressed.
90  if (irep > 0 && irep <= nin) {
91  //set correct size of compressed message
92  m_compBuf.resize(irep);
93  // and set pointer to correct buffer for creating message
94  buf = &m_compBuf;
95  // also add a flag indicating it's compressed
97  }
98  }
99 
100  auto* evtmsg = new EvtMessage(buf->data(), buf->size(), rectype);
101  evtmsg->setMsgFlags(flags);
102  clear();
103 
104  return evtmsg;
105 }
106 
107 void MsgHandler::decode_msg(EvtMessage* msg, vector<TObject*>& objlist,
108  vector<string>& namelist)
109 {
110  const char* msgptr = msg->msg();
111  const char* end = msgptr + msg->msg_size();
112 
114  // apparently message is compressed, let's decompress
115  m_compBuf.clear();
116  int nzip{0}, nout{0};
117  // ROOT wants unsigned char so make a new pointer to the data
118  auto* zipptr = (unsigned char*) msgptr;
119  // and uncompress everything
120  while (zipptr < (unsigned char*)end) {
121  // first get a header of the next block so we know how big the output will be
122  if (R__unzip_header(&nzip, zipptr, &nout) != 0) {
123  B2FATAL("Cannot uncompress message header");
124  }
125  // no more output? fine
126  if (!nout) break;
127  if (std::distance(zipptr, (unsigned char*) end) > nzip) {
128  B2FATAL("Not enough bytes left to uncompress");
129  }
130  // otherwise make sure output buffer is large enough
131  int old_size = m_compBuf.size();
132  m_compBuf.resize(old_size + nout);
133  // and uncompress, the amount of bytes will be returned as irep
134  int irep{0};
135  R__unzip(&nzip, zipptr, &nout, (unsigned char*)(m_compBuf.data() + old_size), &irep);
136  // if that is not positive an error happend, bail
137  if (irep <= 0) {
138  B2FATAL("Cannot uncompress message");
139  }
140  // otherwise advance pointer by the amount of bytes compressed bytes in the block
141  zipptr += nzip;
142  }
143  // ok, decompressed succesfully, set msg pointer to the correct area
144  msgptr = m_compBuf.data();
145  end = msgptr + m_compBuf.size();
146  }
147 
148  while (msgptr < end) {
149  // Restore object name
150  UInt_t nameLength;
151  memcpy(&nameLength, msgptr, sizeof(nameLength));
152  msgptr += sizeof(nameLength);
153  if (nameLength == 0 || std::distance(msgptr, end) < nameLength)
154  B2FATAL("Buffer overrun while decoding object name, check length fields!");
155 
156  // read full string but omit final 0-byte. This safeguards against strings containing 0-bytes
157  namelist.emplace_back(msgptr, nameLength - 1);
158  msgptr += nameLength;
159 
160  // Restore object
161  UInt_t objlen;
162  memcpy(&objlen, msgptr, sizeof(objlen));
163  msgptr += sizeof(objlen);
164  if (objlen == 0 || std::distance(msgptr, end) < objlen)
165  B2FATAL("Buffer overrun while decoding object, check length fields!");
166 
167  m_inMsg.SetBuffer(msgptr, objlen);
168  objlist.push_back(m_inMsg.readTObject());
169  msgptr += objlen;
170  //no need to call InMessage::Reset() here (done in SetBuffer())
171  }
172 }
Belle2::MsgHandler::m_compBuf
CharBuffer m_compBuf
EvtMessage character buffer for compressing/decompressing.
Definition: MsgHandler.h:132
Belle2::MsgHandler::m_msg
std::unique_ptr< TMessage > m_msg
Used for serialising objects into m_buf.
Definition: MsgHandler.h:133
Belle2::InMessage::SetBuffer
void SetBuffer(const void *ptr, UInt_t bufsize)
Replace buffer (doesn't take ownership).
Definition: MsgHandler.h:86
Belle2::EvtMessage
Class to manage streamed object.
Definition: EvtMessage.h:60
Belle2::MsgHandler::~MsgHandler
virtual ~MsgHandler()
Destructor.
Belle2::CharBuffer::resize
void resize(size_t size)
resize, similar to std::vector<char>::resize in that it will copy the existing buffer to a new,...
Definition: MsgHandler.h:61
Belle2::MsgHandler::encode_msg
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:68
Belle2::CharBuffer::data
char * data()
return raw pointer.
Definition: MsgHandler.h:53
Belle2::MsgHandler::clear
virtual void clear()
Clear object list.
Definition: MsgHandler.cc:41
Belle2::MsgHandler::m_inMsg
InMessage m_inMsg
Used for deserializing in decode_msg()
Definition: MsgHandler.h:134
Belle2::EvtMessage::hasMsgFlags
bool hasMsgFlags(unsigned int flags) const
Check if the message has the given flags.
Definition: EvtMessage.h:116
Belle2::EvtMessage::msg
char * msg()
Get pointer to message body.
Definition: EvtMessage.cc:173
Belle2::MsgHandler::m_buf
CharBuffer m_buf
EvtMessage character buffer for encode_msg().
Definition: MsgHandler.h:131
Belle2::CharBuffer::clear
void clear()
reset (without deallocating)
Definition: MsgHandler.h:57
Belle2::CharBuffer::add
void add(const void *data, size_t len)
copy data to end of buffer, expanding buffer if needed.
Definition: MsgHandler.h:44
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::MsgHandler::m_complevel
int m_complevel
compression algorithm * 100 + compression level.
Definition: MsgHandler.h:135
Belle2::ERecordType
ERecordType
What type of message is this?
Definition: EvtMessage.h:26
Belle2::MsgHandler::decode_msg
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
Definition: MsgHandler.cc:107
Belle2::EvtMessage::c_MsgCompressed
@ c_MsgCompressed
indicates that the message body is compressed and should be uncompressed using ROOT R__unzip_header and R__unzip before use.
Definition: EvtMessage.h:70
Belle2::InMessage::readTObject
TObject * readTObject()
Read one object from the message assuming it inherits from TObject.
Definition: MsgHandler.h:100
Belle2::CharBuffer::size
size_t size() const
return buffer size (do not access data() beyond this)
Definition: MsgHandler.h:55
Belle2::EvtMessage::msg_size
int msg_size() const
Get size of message body.
Definition: EvtMessage.cc:109
Belle2::MsgHandler::add
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:47