9 #include <framework/pcore/MsgHandler.h>
10 #include <framework/logging/Logger.h>
22 const static int c_maxObjectSizeBytes = 50000000;
25 MsgHandler::MsgHandler(
int complevel):
28 m_msg(new TMessage(kMESS_OBJECT))
34 TMessage::EnableSchemaEvolutionForAll();
35 m_msg->SetWriteMode();
48 m_msg->WriteObject(obj);
50 int len =
m_msg->Length();
51 char* buf =
m_msg->Buffer();
53 if (len > c_maxObjectSizeBytes) {
54 B2WARNING(
"MsgHandler: Object " << name <<
" is very large (" << len <<
" bytes), parallel processing may be slow.");
58 UInt_t nameLength = name.size() + 1;
59 m_buf.
add(&nameLength,
sizeof(nameLength));
69 if (rectype == MSG_TERMINATE) {
70 auto* eod =
new EvtMessage(
nullptr, 0, rectype);
76 unsigned int flags = 0;
84 int irep{0}, nin{(int)
m_buf.
size()}, nout{nin};
86 (ROOT::RCompressionSetting::EAlgorithm::EValues) algorithm);
89 if (irep > 0 && irep <= nin) {
99 auto* evtmsg =
new EvtMessage(buf->data(), buf->size(), rectype);
100 evtmsg->setMsgFlags(flags);
107 vector<string>& namelist)
109 const char* msgptr = msg->
msg();
110 const char* end = msgptr + msg->
msg_size();
115 int nzip{0}, nout{0};
117 auto* zipptr = (
unsigned char*) msgptr;
119 while (zipptr < (
unsigned char*)end) {
121 if (R__unzip_header(&nzip, zipptr, &nout) != 0) {
122 B2FATAL(
"Cannot uncompress message header");
126 if (std::distance(zipptr, (
unsigned char*) end) > nzip) {
127 B2FATAL(
"Not enough bytes left to uncompress");
134 R__unzip(&nzip, zipptr, &nout, (
unsigned char*)(
m_compBuf.
data() + old_size), &irep);
137 B2FATAL(
"Cannot uncompress message");
147 while (msgptr < end) {
150 memcpy(&nameLength, msgptr,
sizeof(nameLength));
151 msgptr +=
sizeof(nameLength);
152 if (nameLength == 0 || std::distance(msgptr, end) < nameLength)
153 B2FATAL(
"Buffer overrun while decoding object name, check length fields!");
156 namelist.emplace_back(msgptr, nameLength - 1);
157 msgptr += nameLength;
161 memcpy(&objlen, msgptr,
sizeof(objlen));
162 msgptr +=
sizeof(objlen);
163 if (objlen == 0 || std::distance(msgptr, end) < objlen)
164 B2FATAL(
"Buffer overrun while decoding object, check length fields!");
size_t size() const
Return the buffer size (do not access data() beyond this).
void resize(size_t size)
Resize the buffer; similar to std::vector<char>::resize in that it will copy the existing buffer to a new buffer [remainder of description truncated in this listing].
char * data()
Return the raw pointer.
void clear()
Reset (without deallocating).
void add(const void *data, size_t len)
copy data to end of buffer, expanding buffer if needed.
Class to manage streamed objects.
int msg_size() const
Get size of message body.
bool hasMsgFlags(unsigned int flags) const
Check if the message has the given flags.
char * msg()
Get pointer to message body.
@ c_MsgCompressed
Indicates that the message body is compressed and should be uncompressed using ROOT R__unzip_header and R__unzip.
void SetBuffer(const void *ptr, UInt_t bufsize)
Replace buffer (doesn't take ownership).
TObject * readTObject()
Read one object from the message assuming it inherits from TObject.
virtual ~MsgHandler()
Destructor.
int m_complevel
compression algorithm * 100 + compression level.
InMessage m_inMsg
Used for deserializing in decode_msg()
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
CharBuffer m_buf
EvtMessage character buffer for encode_msg().
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
CharBuffer m_compBuf
EvtMessage character buffer for compressing/decompressing.
std::unique_ptr< TMessage > m_msg
Used for serialising objects into m_buf.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
virtual void clear()
Clear object list.
ERecordType
What type of message is this?
Abstract base class for different kinds of events.