Belle II Software  release-08-01-10
HLTStreamHelper.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/utils/HLTStreamHelper.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/MsgHandler.h>
11 #include <daq/dataobjects/SendHeader.h>
12 #include <daq/dataobjects/SendTrailer.h>
13 
14 #include <TH1F.h>
15 #include <TFile.h>
16 #include <TDirectory.h>
17 #include <TKey.h>
18 #include <TClass.h>
19 #include <TSystem.h>
20 #include <TROOT.h>
21 #include <TBufferJSON.h>
22 
23 #include <lz4.h>
24 #include <zmq.hpp>
25 
26 using namespace Belle2;
27 
28 namespace {
29  void streamHistogramImpl(TDirectory* curdir, Belle2::MsgHandler& msg, const std::string& dirName = "")
30  {
31  TList* keylist = curdir->GetList();
32 
33  TIter nextkey(keylist);
34  TKey* key = nullptr;
35 
36  while ((key = (TKey*)nextkey())) {
37  TObject* obj = curdir->Get(key->GetName());
38  TClass* objectClass = obj->IsA();
39  std::string objectName = dirName;
40  if (not objectName.empty()) {
41  objectName += "/";
42  }
43  objectName += obj->GetName();
44 
45  if (objectClass->InheritsFrom(TH1::Class())) {
46  auto* h1 = dynamic_cast<TH1*>(obj);
47  msg.add(h1, objectName);
48  } else if (objectClass->InheritsFrom(TDirectory::Class())) {
49  auto* tdir = dynamic_cast<TDirectory*>(obj);
50  // FIXME: Currently the dqm server does not understand multi-layer directory structures
51  // therefore I break this down to only show the last directory
52  streamHistogramImpl(tdir, msg, obj->GetName());
53  }
54  }
55  }
56 }
57 
59 {
60  m_streamHelper.initialize(0, true);
61 }
62 
63 void HLTStreamHelper::registerStoreObjects(bool addExpressRecoObjects)
64 {
65  m_eventMetaData.registerInDataStore();
66  m_rawSVDs.registerInDataStore();
67  m_rawCDCs.registerInDataStore();
68  m_rawTOPs.registerInDataStore();
69  m_rawARICHs.registerInDataStore();
70  m_rawECLs.registerInDataStore();
71  m_rawKLMs.registerInDataStore();
72  m_rawTRGs.registerInDataStore();
73  m_rawFTSWs.registerInDataStore();
74 
75  if (addExpressRecoObjects) {
76  m_randomGenerator.registerInDataStore(DataStore::c_DontWriteOut);
77  m_softwareTriggerResult.registerInDataStore();
78  m_softwareTriggerVariables.registerInDataStore();
79  m_triggerSummary.registerInDataStore();
80  m_rawPXDs.registerInDataStore();
81  m_roiPayload.registerInDataStore();
82  m_rois.registerInDataStore("ROIs");
83  }
84 }
85 
86 std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::streamRaw()
87 {
88  const auto eventMessage = m_streamHelper.stream(false, false);
89 
90  // Fill Header and Trailer
91  SendHeader hdr;
92  SendTrailer trl;
93 
94  // Number of total words
95  int msgsize = (eventMessage->size() - 1) / sizeof(int) + 1;
96  int total_nwrds = msgsize + hdr.GetHdrNwords() + trl.GetTrlNwords();
97 
98  // Fill header and trailer
99  hdr.SetNwords(total_nwrds);
100  hdr.SetNumEventsinPacket(1);
101  hdr.SetNumNodesinPacket(1);
102  hdr.SetEventNumber(m_eventMetaData->getEvent());
103  // hdr.SetExpRunWord(evtmeta->getRun());
104  hdr.SetSubRunNum(m_eventMetaData->getSubrun()); // modified on Apr. 20, 2016 by SY
105  hdr.SetRunNum(m_eventMetaData->getRun());
106  hdr.SetExpNum(m_eventMetaData->getExperiment());
107  hdr.SetNodeID(300);
108 
109  zmq::message_t rawMessage(total_nwrds * sizeof(int));
110  int* buffer = rawMessage.data<int>();
111 
112  // Fill header
113  memcpy(buffer, hdr.GetBuffer(), hdr.GetHdrNwords()*sizeof(int));
114 
115  // Fill EvtMessage
116  memcpy(buffer + hdr.GetHdrNwords(), eventMessage->buffer(), eventMessage->size());
117 
118  // Fill trailer
119  memcpy(buffer + hdr.GetHdrNwords() + msgsize, trl.GetBuffer(),
120  trl.GetTrlNwords()*sizeof(int));
121 
122  zmq::message_t roiMessage = getROIMessageIfViable();
123  auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_rawDataMessage, std::move(rawMessage), std::move(roiMessage));
124 
125  return zmqMessage;
126 }
127 
128 std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::stream(bool addPersistentDurability, bool streamTransientObjects)
129 {
130  const auto eventMessage = m_streamHelper.stream(addPersistentDurability, streamTransientObjects);
131 
132  zmq::message_t roiMessage = getROIMessageIfViable();
133  auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, eventMessage, std::move(roiMessage));
134  return zmqMessage;
135 }
136 
138 {
139  zmq::message_t roiMessage;
140 
141  if (m_roiPayload.isValid()) {
142  const size_t length = m_roiPayload->getPacketLengthByte();
143  const char* data = reinterpret_cast<const char*>(m_roiPayload->getRootdata());
144 
145  roiMessage = zmq::message_t(length);
146  char* tbuffer = roiMessage.data<char>();
147  memcpy(tbuffer, data, length);
148  }
149 
150  return roiMessage;
151 }
152 
153 std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::streamHistograms(bool compressed)
154 {
155  B2ASSERT("Event Meta Data not set!", m_eventMetaData.isValid());
156 
157  Belle2::MsgHandler msgHandler;
158  streamHistogramImpl(gDirectory, msgHandler);
159 
160  auto evtMessage = std::unique_ptr<EvtMessage>(msgHandler.encode_msg(Belle2::ERecordType::MSG_EVENT));
161 
162  EventMetaData& eventMetaData = *m_eventMetaData;
163  auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
164  zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
165 
166  if (not compressed) {
167  return Belle2::ZMQMessageFactory::createMessage(Belle2::EMessageTypes::c_rawDataMessage, evtMessage,
168  std::move(additionalEventMessage));
169  }
170 
171  // TODO: do I want to use compression everywhere? That is probably not worth it! -> test this!
172  if (m_outputBuffer.empty()) {
174  }
175 
176  B2DEBUG(10, "Size before compression " << evtMessage->size());
177  int size = m_maximalCompressedSize;
178  size = LZ4_compress_default(evtMessage->buffer(), &m_outputBuffer[0], evtMessage->size(), size);
179  B2ASSERT("Compression failed", size > 0);
180  B2DEBUG(10, "Size after compression " << size);
181 
182  zmq::message_t message(&m_outputBuffer[0], size);
183 
184  return Belle2::ZMQMessageFactory::createMessage(Belle2::EMessageTypes::c_compressedDataMessage,
185  std::move(message), std::move(additionalEventMessage));
186 }
187 
188 void HLTStreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
189 {
190  if (message->isMessage(EMessageTypes::c_eventMessage)) {
191  m_streamHelper.read(std::move(message));
192  } else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
193  int* eventBuffer = message->getMessagePart<1>().data<int>();
194 
195  SendHeader sndhdr;
196  sndhdr.SetBuffer(eventBuffer);
197  int npackedevts = sndhdr.GetNumEventsinPacket();
198  if (npackedevts != 1) {
199  B2WARNING("Strange SendHeader : ");
200  // for (int i = 0; i < sndhdr.SENDHDR_NWORDS; i++) {
201  for (int i = 0; i < 10; i++) {
202  B2WARNING(std::hex << * (sndhdr.GetBuffer() + i));
203  }
204 
205  B2WARNING("Raw2DsModule::number of events in packet is not 1. This process gets stuck here. Please ABORT the system. (Please see discussion of daqcore channel in https://b2rc.kek.jp/ on 2017. Nov. 30. about why this is not FATAL message.");
206  sleep(86400);
207  }
208  int ncprs = sndhdr.GetNumNodesinPacket();
209  int nwords = sndhdr.GetTotalNwords() - SendHeader::SENDHDR_NWORDS - SendTrailer::SENDTRL_NWORDS;
210 
211  // Get buffer header
212  int* bufbody = eventBuffer + SendHeader::SENDHDR_NWORDS;
213 
214  // Unpack buffer
215  RawDataBlock tempdblk;
216  tempdblk.SetBuffer(bufbody, nwords, false, npackedevts, ncprs);
217 
218  unsigned int utime = 0;
219  unsigned int ctime = 0;
220  unsigned long long int mtime = 0;
221 
222  int store_time_flag = 0;
223  unsigned int error_flag = 0;
224 
225  // Store data contents in Corresponding RawXXXX
226  for (int cprid = 0; cprid < ncprs * npackedevts; cprid++) {
227  // Pick up one COPPER and copy data in a temporary buffer
228  int nwds_buf = tempdblk.GetBlockNwords(cprid);
229  int* cprbuf = new int[nwds_buf];
230  memcpy(cprbuf, tempdblk.GetBuffer(cprid), nwds_buf * 4);
231 
232  // Check FTSW
233  if (tempdblk.CheckFTSWID(cprid)) {
234  RawFTSW* ftsw = m_rawFTSWs.appendNew();
235  ftsw->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
236 
237  // Tentative for DESY TB 2017
238  utime = (unsigned int)(ftsw->GetTTUtime(0));
239  ctime = (unsigned int)(ftsw->GetTTCtime(0));
240  mtime = 1000000000 * (unsigned long long int)utime + (unsigned long long int)(std::round(ctime / 0.127216));
241  store_time_flag = 1;
242  continue;
243  } else if (store_time_flag == 0) {
244  // Tentative until RawFTSW data stream is established. 2018.5.28
245  // Not store RawCOPPER here. 2018.11.23
246  RawCOPPER tempcpr_time;
247  tempcpr_time.SetBuffer(cprbuf, nwds_buf, false, 1, 1);
248  utime = (unsigned int)(tempcpr_time.GetTTUtime(0));
249  ctime = (unsigned int)(tempcpr_time.GetTTCtime(0));
250  mtime = 1000000000 * (unsigned long long int)utime + (unsigned long long int)(std::round(ctime / 0.127216));
251  store_time_flag = 1;
252  }
253 
254  RawCOPPER tempcpr;
255  tempcpr.SetBuffer(cprbuf, nwds_buf, false, 1, 1);
256  int subsysid = tempcpr.GetNodeID(0);
257  error_flag |= (unsigned int)(tempcpr.GetDataType(0));
258 
259  // Switch to each detector and register RawXXX
260  if ((subsysid & DETECTOR_MASK) == CDC_ID) {
261  (m_rawCDCs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
262  } else if ((subsysid & DETECTOR_MASK) == SVD_ID) {
263  (m_rawSVDs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
264  } else if ((subsysid & DETECTOR_MASK) == BECL_ID) {
265  (m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
266  } else if ((subsysid & DETECTOR_MASK) == EECL_ID) {
267  (m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
268  } else if ((subsysid & DETECTOR_MASK) == TOP_ID) {
269  (m_rawTOPs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
270  } else if ((subsysid & DETECTOR_MASK) == ARICH_ID) {
271  (m_rawARICHs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
272  } else if ((subsysid & DETECTOR_MASK) == BKLM_ID) {
273  (m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
274  } else if ((subsysid & DETECTOR_MASK) == EKLM_ID) {
275  (m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
276  } else if (((subsysid & DETECTOR_MASK) & 0xF0000000) == TRGDATA_ID) {
277  (m_rawTRGs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
278  } else {
279  // Do not store Unknown RawCOPPER object. 2018.11.25
280  B2WARNING("Unknown COPPER ID : ");
281  for (int i = 0; i < 12; i++) {
282  B2WARNING(std::hex << cprbuf[i]);
283  }
284  B2FATAL("Unknown COPPER ID is found. CPRID = " << std::hex << subsysid << " Please check. Exiting...");
285  }
286  }
287 
288  if (store_time_flag != 1) {
289  B2FATAL("No time information could be extracted from Data. That should not happen. Exiting...");
290  }
291 
292  m_eventMetaData.create();
293  m_eventMetaData->setExperiment(sndhdr.GetExpNum());
294  m_eventMetaData->setRun(sndhdr.GetRunNum());
295  m_eventMetaData->setSubrun(sndhdr.GetSubRunNum());
296  m_eventMetaData->setEvent(sndhdr.GetEventNumber());
297  m_eventMetaData->setTime(mtime);
298 
299  if (error_flag) {
300  if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
302  B2WARNING("Raw2Ds: c_B2LinkPacketCRCError flag was set in EventMetaData.");
303  }
304  if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
306  B2WARNING("Raw2Ds: c_B2LinkEventCRCError flag was set in EventMetaData.");
307  }
308  }
309  }
310 }
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
@ c_B2LinkPacketCRCError
Belle2link CRC error is detected in the event.
Definition: EventMetaData.h:44
@ c_B2LinkEventCRCError
HSLB_COPPER CRC error is detected in the event.
Definition: EventMetaData.h:45
StoreArray< RawCDC > m_rawCDCs
Store Objects for HLT use.
std::unique_ptr< ZMQNoIdMessage > streamHistograms(bool compressed=true)
Stream all objects derived from TH1 into a message. Only the last subfolder is streamed by prefixing ...
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
std::unique_ptr< ZMQNoIdMessage > streamRaw()
Stream the data store into an event message and add SendHeader and SendTrailer around the message....
StoreArray< RawTOP > m_rawTOPs
Store Objects for HLT use.
StoreArray< RawKLM > m_rawKLMs
Store Objects for HLT use.
StoreArray< ROIid > m_rois
Additional Store Objects for ExpressReco use.
std::vector< char > m_outputBuffer
Temporary buffer for storing the compressed result.
StoreArray< RawSVD > m_rawSVDs
Store Objects for HLT use.
StoreArray< RawPXD > m_rawPXDs
Additional Store Objects for ExpressReco use.
StoreObjPtr< EventMetaData > m_eventMetaData
Store Objects for HLT use.
StoreObjPtr< TRGSummary > m_triggerSummary
Additional Store Objects for ExpressReco use.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
StoreArray< RawFTSW > m_rawFTSWs
Store Objects for HLT use.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
StreamHelper m_streamHelper
We use the framework stream helper.
StoreArray< RawECL > m_rawECLs
Store Objects for HLT use.
StoreArray< RawTRG > m_rawTRGs
Store Objects for HLT use.
zmq::message_t getROIMessageIfViable() const
If the ROI payload data storobject is filled, write out the roi message (otherwise an empty message)
StoreArray< RawARICH > m_rawARICHs
Store Objects for HLT use.
StoreObjPtr< SoftwareTriggerResult > m_softwareTriggerResult
Additional Store Objects for ExpressReco use.
StoreObjPtr< RandomGenerator > m_randomGenerator
Additional Store Objects for ExpressReco use.
StoreObjPtr< SoftwareTrigger::SoftwareTriggerVariables > m_softwareTriggerVariables
Additional Store Objects for ExpressReco use.
std::unique_ptr< ZMQNoIdMessage > stream(bool addPersistentDurability, bool streamTransientObjects)
Stream the data store into an event message. Add ROI as additional message (if valid).
StoreObjPtr< ROIpayload > m_roiPayload
Store Objects for HLT use.
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
The Raw COPPER class This class stores data received by COPPER via belle2linkt Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
Definition: RawCOPPER.cc:141
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
Definition: RawDataBlock.cc:35
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
int GetTTCtime(int n)
Get ctime of the trigger.
Definition: RawFTSW.h:86
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
set buffer ( delete_flag : m_buffer is freeed( = 0 )/ not freeed( = 1 ) in Destructer )
Definition: RawFTSW.cc:107
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
Definition: StreamHelper.cc:41
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
Definition: RawCOPPER.h:614
int GetDataType(int n)
get contents of header
Definition: RawCOPPER.h:404
int GetTTCtime(int n)
Get ctime.
Definition: RawCOPPER.h:620
unsigned int GetNodeID(int n)
get node-ID from data
Definition: RawCOPPER.h:397
Abstract base class for different kinds of events.