Belle II Software  release-08-01-10
MsgHandler.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/MsgHandler.h>
10 #include <framework/logging/Logger.h>
11 
12 #include <TObject.h>
13 #include <TMessage.h>
14 #include <RZip.h>
15 
16 using namespace std;
17 using namespace Belle2;
18 
19 
namespace {
  /** Serialized size in bytes (50 MB) above which MsgHandler::add() warns about a very large object. */
  constexpr int c_maxObjectSizeBytes = 50000000;
}
24 
25 MsgHandler::MsgHandler(int complevel):
26  m_buf(100000),
27  m_compBuf(0),
28  m_msg(new TMessage(kMESS_OBJECT))
29 {
30  m_complevel = complevel;
31 
32  //Schema evolution is needed to stream genfit tracks
33  //If disabled, streamers will crash when reading data.
34  TMessage::EnableSchemaEvolutionForAll();
35  m_msg->SetWriteMode();
36 }
37 
38 MsgHandler::~MsgHandler() = default;
39 
41 {
42  m_buf.clear();
43  m_compBuf.clear();
44 }
45 
46 void MsgHandler::add(const TObject* obj, const string& name)
47 {
48  m_msg->WriteObject(obj);
49 
50  int len = m_msg->Length();
51  char* buf = m_msg->Buffer();
52 
53  if (len > c_maxObjectSizeBytes) {
54  B2WARNING("MsgHandler: Object " << name << " is very large (" << len << " bytes), parallel processing may be slow.");
55  }
56 
57  // Put name of object in output buffer including a final 0-byte
58  UInt_t nameLength = name.size() + 1;
59  m_buf.add(&nameLength, sizeof(nameLength));
60  m_buf.add(name.c_str(), nameLength);
61  // Copy object into buffer
62  m_buf.add(&len, sizeof(len));
63  m_buf.add(buf, len);
64  m_msg->Reset();
65 }
66 
68 {
69  if (rectype == MSG_TERMINATE) {
70  auto* eod = new EvtMessage(nullptr, 0, rectype);
71  return eod;
72  }
73 
74  // which buffer to send? defaults to uncompressed
75  auto buf = &m_buf;
76  unsigned int flags = 0;
77  // but if we have compression enabled then please compress.
78  if (m_complevel > 0) {
79  // make sure buffer for the compression is big enough.
81  // And call the root compression function
82  const int algorithm = m_complevel / 100;
83  const int level = m_complevel % 100;
84  int irep{0}, nin{(int)m_buf.size()}, nout{nin};
85  R__zipMultipleAlgorithm(level, &nin, m_buf.data(), &nout, m_compBuf.data(), &irep,
86  (ROOT::RCompressionSetting::EAlgorithm::EValues) algorithm);
87  // it returns the number of bytes of the output in irep. If that is zero or
88  // to big compression failed and we transmit uncompressed.
89  if (irep > 0 && irep <= nin) {
90  //set correct size of compressed message
91  m_compBuf.resize(irep);
92  // and set pointer to correct buffer for creating message
93  buf = &m_compBuf;
94  // also add a flag indicating it's compressed
96  }
97  }
98 
99  auto* evtmsg = new EvtMessage(buf->data(), buf->size(), rectype);
100  evtmsg->setMsgFlags(flags);
101  clear();
102 
103  return evtmsg;
104 }
105 
106 void MsgHandler::decode_msg(EvtMessage* msg, vector<TObject*>& objlist,
107  vector<string>& namelist)
108 {
109  const char* msgptr = msg->msg();
110  const char* end = msgptr + msg->msg_size();
111 
113  // apparently message is compressed, let's decompress
114  m_compBuf.clear();
115  int nzip{0}, nout{0};
116  // ROOT wants unsigned char so make a new pointer to the data
117  auto* zipptr = (unsigned char*) msgptr;
118  // and uncompress everything
119  while (zipptr < (unsigned char*)end) {
120  // first get a header of the next block so we know how big the output will be
121  if (R__unzip_header(&nzip, zipptr, &nout) != 0) {
122  B2FATAL("Cannot uncompress message header");
123  }
124  // no more output? fine
125  if (!nout) break;
126  if (std::distance(zipptr, (unsigned char*) end) > nzip) {
127  B2FATAL("Not enough bytes left to uncompress");
128  }
129  // otherwise make sure output buffer is large enough
130  int old_size = m_compBuf.size();
131  m_compBuf.resize(old_size + nout);
132  // and uncompress, the amount of bytes will be returned as irep
133  int irep{0};
134  R__unzip(&nzip, zipptr, &nout, (unsigned char*)(m_compBuf.data() + old_size), &irep);
135  // if that is not positive an error happend, bail
136  if (irep <= 0) {
137  B2FATAL("Cannot uncompress message");
138  }
139  // otherwise advance pointer by the amount of bytes compressed bytes in the block
140  zipptr += nzip;
141  }
142  // ok, decompressed succesfully, set msg pointer to the correct area
143  msgptr = m_compBuf.data();
144  end = msgptr + m_compBuf.size();
145  }
146 
147  while (msgptr < end) {
148  // Restore object name
149  UInt_t nameLength;
150  memcpy(&nameLength, msgptr, sizeof(nameLength));
151  msgptr += sizeof(nameLength);
152  if (nameLength == 0 || std::distance(msgptr, end) < nameLength)
153  B2FATAL("Buffer overrun while decoding object name, check length fields!");
154 
155  // read full string but omit final 0-byte. This safeguards against strings containing 0-bytes
156  namelist.emplace_back(msgptr, nameLength - 1);
157  msgptr += nameLength;
158 
159  // Restore object
160  UInt_t objlen;
161  memcpy(&objlen, msgptr, sizeof(objlen));
162  msgptr += sizeof(objlen);
163  if (objlen == 0 || std::distance(msgptr, end) < objlen)
164  B2FATAL("Buffer overrun while decoding object, check length fields!");
165 
166  m_inMsg.SetBuffer(msgptr, objlen);
167  objlist.push_back(m_inMsg.readTObject());
168  msgptr += objlen;
169  //no need to call InMessage::Reset() here (done in SetBuffer())
170  }
171 }
size_t size() const
return buffer size (do not access data() beyond this)
Definition: MsgHandler.h:54
void resize(size_t size)
resize, similar to std::vector<char>::resize in that it will copy the existing buffer to a new,...
Definition: MsgHandler.h:60
char * data()
return raw pointer.
Definition: MsgHandler.h:52
void clear()
reset (without deallocating)
Definition: MsgHandler.h:56
void add(const void *data, size_t len)
copy data to end of buffer, expanding buffer if needed.
Definition: MsgHandler.h:43
Class to manage streamed object.
Definition: EvtMessage.h:59
int msg_size() const
Get size of message body.
Definition: EvtMessage.cc:108
bool hasMsgFlags(unsigned int flags) const
Check if the message has the given flags.
Definition: EvtMessage.h:115
char * msg()
Get pointer to message body.
Definition: EvtMessage.cc:172
@ c_MsgCompressed
indicates that the message body is compressed and should be uncompressed using ROOT R__unzip_header a...
Definition: EvtMessage.h:69
void SetBuffer(const void *ptr, UInt_t bufsize)
Replace buffer (doesn't take ownership).
Definition: MsgHandler.h:85
TObject * readTObject()
Read one object from the message assuming it inherits from TObject.
Definition: MsgHandler.h:99
virtual ~MsgHandler()
Destructor.
int m_complevel
compression algorithm * 100 + compression level.
Definition: MsgHandler.h:134
InMessage m_inMsg
Used for deserializing in decode_msg()
Definition: MsgHandler.h:133
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
Definition: MsgHandler.cc:106
CharBuffer m_buf
EvtMessage character buffer for encode_msg().
Definition: MsgHandler.h:130
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
CharBuffer m_compBuf
EvtMessage character buffer for compressing/decompressing.
Definition: MsgHandler.h:131
std::unique_ptr< TMessage > m_msg
Used for serialising objects into m_buf.
Definition: MsgHandler.h:132
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
virtual void clear()
Clear object list.
Definition: MsgHandler.cc:40
ERecordType
What type of message is this?
Definition: EvtMessage.h:25
Abstract base class for different kinds of events.