9 #include <framework/pcore/MsgHandler.h> 
   10 #include <framework/logging/Logger.h> 
   22   const static int c_maxObjectSizeBytes = 50000000; 
 
   25 MsgHandler::MsgHandler(
int complevel):
 
   28   m_msg(new TMessage(kMESS_OBJECT))
 
   34   TMessage::EnableSchemaEvolutionForAll();
 
   35   m_msg->SetWriteMode();
 
   48   m_msg->WriteObject(obj);
 
   50   int len = 
m_msg->Length();
 
   51   char* buf = 
m_msg->Buffer();
 
   53   if (len > c_maxObjectSizeBytes) {
 
   54     B2WARNING(
"MsgHandler: Object " << name << 
" is very large (" << len  << 
" bytes), parallel processing may be slow.");
 
   58   UInt_t nameLength = name.size() + 1;
 
   59   m_buf.
add(&nameLength, 
sizeof(nameLength));
 
   69   if (rectype == MSG_TERMINATE) {
 
   70     auto* eod = 
new EvtMessage(
nullptr, 0, rectype);
 
   76   unsigned int flags = 0;
 
   84     int irep{0}, nin{(int)
m_buf.
size()}, nout{nin};
 
   86                             (ROOT::RCompressionSetting::EAlgorithm::EValues) algorithm);
 
   89     if (irep > 0 && irep <= nin) {
 
   99   auto* evtmsg = 
new EvtMessage(buf->data(), buf->size(), rectype);
 
  100   evtmsg->setMsgFlags(flags);
 
  107                             vector<string>& namelist)
 
  109   const char* msgptr = msg->
msg();
 
  110   const char* end = msgptr + msg->
msg_size();
 
  115     int nzip{0}, nout{0};
 
  117     auto* zipptr = (
unsigned char*) msgptr;
 
  119     while (zipptr < (
unsigned char*)end) {
 
  121       if (R__unzip_header(&nzip, zipptr, &nout) != 0) {
 
  122         B2FATAL(
"Cannot uncompress message header");
 
  126       if (std::distance(zipptr, (
unsigned char*) end) > nzip) {
 
  127         B2FATAL(
"Not enough bytes left to uncompress");
 
  134       R__unzip(&nzip, zipptr, &nout, (
unsigned char*)(
m_compBuf.
data() + old_size), &irep);
 
  137         B2FATAL(
"Cannot uncompress message");
 
  147   while (msgptr < end) {
 
  150     memcpy(&nameLength, msgptr, 
sizeof(nameLength));
 
  151     msgptr += 
sizeof(nameLength);
 
  152     if (nameLength == 0 || std::distance(msgptr, end) < nameLength)
 
  153       B2FATAL(
"Buffer overrun while decoding object name, check length fields!");
 
  156     namelist.emplace_back(msgptr, nameLength - 1);
 
  157     msgptr += nameLength;
 
  161     memcpy(&objlen, msgptr, 
sizeof(objlen));
 
  162     msgptr += 
sizeof(objlen);
 
  163     if (objlen == 0 || std::distance(msgptr, end) < objlen)
 
  164       B2FATAL(
"Buffer overrun while decoding object, check length fields!");
 
size_t size() const
Return the buffer size (do not access data() beyond this).
void resize(size_t size)
Resize the buffer. Similar to std::vector<char>::resize in that it will copy the existing buffer to a new one [remainder of description truncated in this listing].
char * data()
return raw pointer.
void clear()
Reset the buffer (without deallocating).
void add(const void *data, size_t len)
Copy data to the end of the buffer, expanding the buffer if needed.
Class to manage streamed objects.
int msg_size() const
Get size of message body.
bool hasMsgFlags(unsigned int flags) const
Check if the message has the given flags.
char * msg()
Get pointer to message body.
@ c_MsgCompressed
Indicates that the message body is compressed and should be uncompressed using ROOT's R__unzip_header and R__unzip.
void SetBuffer(const void *ptr, UInt_t bufsize)
Replace buffer (doesn't take ownership).
TObject * readTObject()
Read one object from the message assuming it inherits from TObject.
virtual ~MsgHandler()
Destructor.
int m_complevel
Compression setting, encoded as compression algorithm * 100 + compression level.
InMessage m_inMsg
Used for deserializing in decode_msg()
virtual void decode_msg(EvtMessage *msg, std::vector< TObject * > &objlist, std::vector< std::string > &namelist)
Decode an EvtMessage into a vector list of objects with names.
CharBuffer m_buf
EvtMessage character buffer for encode_msg().
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
CharBuffer m_compBuf
EvtMessage character buffer for compressing/decompressing.
std::unique_ptr< TMessage > m_msg
Used for serialising objects into m_buf.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
virtual void clear()
Clear object list.
ERecordType
What type of message is this?
Abstract base class for different kinds of events.