// Belle II Software development
// HLTStreamHelper.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <daq/hbasf2/utils/HLTStreamHelper.h>
#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
#include <framework/pcore/MsgHandler.h>
#include <daq/dataobjects/SendHeader.h>
#include <daq/dataobjects/SendTrailer.h>

#include <TH1F.h>
#include <TFile.h>
#include <TDirectory.h>
#include <TKey.h>
#include <TClass.h>
#include <TSystem.h>
#include <TROOT.h>
#include <TBufferJSON.h>

#include <lz4.h>
#include <zmq.hpp>

#include <cmath>
#include <cstring>
#include <memory>
#include <string>

#include <unistd.h>

using namespace Belle2;

28namespace {
29 void streamHistogramImpl(TDirectory* curdir, Belle2::MsgHandler& msg, const std::string& dirName = "")
30 {
31 TList* keylist = curdir->GetList();
32
33 TIter nextkey(keylist);
34 TKey* key = nullptr;
35
36 while ((key = (TKey*)nextkey())) {
37 TObject* obj = curdir->Get(key->GetName());
38 TClass* objectClass = obj->IsA();
39 std::string objectName = dirName;
40 if (not objectName.empty()) {
41 objectName += "/";
42 }
43 objectName += obj->GetName();
44
45 if (objectClass->InheritsFrom(TH1::Class())) {
46 auto* h1 = dynamic_cast<TH1*>(obj);
47 msg.add(h1, objectName);
48 } else if (objectClass->InheritsFrom(TDirectory::Class())) {
49 auto* tdir = dynamic_cast<TDirectory*>(obj);
50 // FIXME: Currently the dqm server does not understand multi-layer directory structures
51 // therefore I break this down to only show the last directory
52 streamHistogramImpl(tdir, msg, obj->GetName());
53 }
54 }
55 }
56}
57
59{
61}
62
63void HLTStreamHelper::registerStoreObjects(bool addExpressRecoObjects)
64{
65 m_eventMetaData.registerInDataStore();
66 m_rawSVDs.registerInDataStore();
67 m_rawCDCs.registerInDataStore();
68 m_rawTOPs.registerInDataStore();
69 m_rawARICHs.registerInDataStore();
70 m_rawECLs.registerInDataStore();
71 m_rawKLMs.registerInDataStore();
72 m_rawTRGs.registerInDataStore();
73 m_rawFTSWs.registerInDataStore();
74
75 if (addExpressRecoObjects) {
77 m_softwareTriggerResult.registerInDataStore();
78 m_softwareTriggerVariables.registerInDataStore();
79 m_triggerSummary.registerInDataStore();
80 m_rawPXDs.registerInDataStore();
81 m_roiPayload.registerInDataStore();
82 m_rois.registerInDataStore("ROIs");
83 }
84}
85
86std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::streamRaw()
87{
88 const auto eventMessage = m_streamHelper.stream(false, false);
89
90 // Fill Header and Trailer
91 SendHeader hdr;
92 SendTrailer trl;
93
94 // Number of total words
95 int msgsize = (eventMessage->size() - 1) / sizeof(int) + 1;
96 int total_nwrds = msgsize + hdr.GetHdrNwords() + trl.GetTrlNwords();
97
98 // Fill header and trailer
99 hdr.SetNwords(total_nwrds);
101 hdr.SetNumNodesinPacket(1);
102 hdr.SetEventNumber(m_eventMetaData->getEvent());
103 // hdr.SetExpRunWord(evtmeta->getRun());
104 hdr.SetSubRunNum(m_eventMetaData->getSubrun()); // modified on Apr. 20, 2016 by SY
105 hdr.SetRunNum(m_eventMetaData->getRun());
106 hdr.SetExpNum(m_eventMetaData->getExperiment());
107 hdr.SetNodeID(300);
108
109 zmq::message_t rawMessage(total_nwrds * sizeof(int));
110 int* buffer = rawMessage.data<int>();
111
112 // Fill header
113 memcpy(buffer, hdr.GetBuffer(), hdr.GetHdrNwords()*sizeof(int));
114
115 // Fill EvtMessage
116 memcpy(buffer + hdr.GetHdrNwords(), eventMessage->buffer(), eventMessage->size());
117
118 // Fill trailer
119 memcpy(buffer + hdr.GetHdrNwords() + msgsize, trl.GetBuffer(),
120 trl.GetTrlNwords()*sizeof(int));
121
122 zmq::message_t roiMessage = getROIMessageIfViable();
123 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_rawDataMessage, std::move(rawMessage), std::move(roiMessage));
124
125 return zmqMessage;
126}
127
128std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::stream(bool addPersistentDurability, bool streamTransientObjects)
129{
130 const auto eventMessage = m_streamHelper.stream(addPersistentDurability, streamTransientObjects);
131
132 zmq::message_t roiMessage = getROIMessageIfViable();
133 auto zmqMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, eventMessage, std::move(roiMessage));
134 return zmqMessage;
135}
136
138{
139 zmq::message_t roiMessage;
140
141 if (m_roiPayload.isValid()) {
142 const size_t length = m_roiPayload->getPacketLengthByte();
143 const char* data = reinterpret_cast<const char*>(m_roiPayload->getRootdata());
144
145 roiMessage = zmq::message_t(length);
146 char* tbuffer = roiMessage.data<char>();
147 memcpy(tbuffer, data, length);
148 }
149
150 return roiMessage;
151}
152
153std::unique_ptr<ZMQNoIdMessage> HLTStreamHelper::streamHistograms(bool compressed)
154{
155 B2ASSERT("Event Meta Data not set!", m_eventMetaData.isValid());
156
157 Belle2::MsgHandler msgHandler;
158 streamHistogramImpl(gDirectory, msgHandler);
159
160 auto evtMessage = std::unique_ptr<EvtMessage>(msgHandler.encode_msg(Belle2::ERecordType::MSG_EVENT));
161
162 EventMetaData& eventMetaData = *m_eventMetaData;
163 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
164 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
165
166 if (not compressed) {
167 return Belle2::ZMQMessageFactory::createMessage(Belle2::EMessageTypes::c_rawDataMessage, evtMessage,
168 std::move(additionalEventMessage));
169 }
170
171 // TODO: do I want to use compression everywhere? That is probably not worth it! -> test this!
172 if (m_outputBuffer.empty()) {
174 }
175
176 B2DEBUG(10, "Size before compression " << evtMessage->size());
177 int size = m_maximalCompressedSize;
178 size = LZ4_compress_default(evtMessage->buffer(), &m_outputBuffer[0], evtMessage->size(), size);
179 B2ASSERT("Compression failed", size > 0);
180 B2DEBUG(10, "Size after compression " << size);
181
182 zmq::message_t message(&m_outputBuffer[0], size);
183
184 return Belle2::ZMQMessageFactory::createMessage(Belle2::EMessageTypes::c_compressedDataMessage,
185 std::move(message), std::move(additionalEventMessage));
186}
187
188void HLTStreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
189{
190 if (message->isMessage(EMessageTypes::c_eventMessage)) {
191 m_streamHelper.read(std::move(message));
192 } else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
193 int* eventBuffer = message->getMessagePart<1>().data<int>();
194
195 SendHeader sndhdr;
196 sndhdr.SetBuffer(eventBuffer);
197 int npackedevts = sndhdr.GetNumEventsinPacket();
198 if (npackedevts != 1) {
199 B2WARNING("Strange SendHeader : ");
200 // for (int i = 0; i < sndhdr.SENDHDR_NWORDS; i++) {
201 for (int i = 0; i < 10; i++) {
202 B2WARNING(std::hex << * (sndhdr.GetBuffer() + i));
203 }
204
205 B2WARNING("Raw2DsModule::number of events in packet is not 1. This process gets stuck here. Please ABORT the system. (Please see discussion of daqcore channel in https://b2rc.kek.jp/ on 2017. Nov. 30. about why this is not FATAL message.");
206 sleep(86400);
207 }
208 int ncprs = sndhdr.GetNumNodesinPacket();
209 int nwords = sndhdr.GetTotalNwords() - SendHeader::SENDHDR_NWORDS - SendTrailer::SENDTRL_NWORDS;
210
211 // Get buffer header
212 int* bufbody = eventBuffer + SendHeader::SENDHDR_NWORDS;
213
214 // Unpack buffer
215 RawDataBlock tempdblk;
216 tempdblk.SetBuffer(bufbody, nwords, false, npackedevts, ncprs);
217
218 unsigned long long int mtime = 0;
219
220 int store_time_flag = 0;
221 unsigned int error_flag = 0;
222
223 // Store data contents in Corresponding RawXXXX
224 for (int cprid = 0; cprid < ncprs * npackedevts; cprid++) {
225 // Pick up one COPPER and copy data in a temporary buffer
226 int nwds_buf = tempdblk.GetBlockNwords(cprid);
227 int* cprbuf = new int[nwds_buf];
228 memcpy(cprbuf, tempdblk.GetBuffer(cprid), nwds_buf * 4);
229
230 // Check FTSW
231 if (tempdblk.CheckFTSWID(cprid)) {
232 RawFTSW* ftsw = m_rawFTSWs.appendNew();
233 ftsw->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
234
235 // Tentative for DESY TB 2017
236 unsigned int utime = (unsigned int)(ftsw->GetTTUtime(0));
237 unsigned int ctime = (unsigned int)(ftsw->GetTTCtime(0));
238 mtime = 1000000000 * (unsigned long long int)utime + (unsigned long long int)(std::round(ctime / 0.127216));
239 store_time_flag = 1;
240 continue;
241 } else if (store_time_flag == 0) {
242 // Tentative until RawFTSW data stream is established. 2018.5.28
243 // Not store RawCOPPER here. 2018.11.23
244 RawCOPPER tempcpr_time;
245 tempcpr_time.SetBuffer(cprbuf, nwds_buf, false, 1, 1);
246 unsigned int utime = (unsigned int)(tempcpr_time.GetTTUtime(0));
247 unsigned int ctime = (unsigned int)(tempcpr_time.GetTTCtime(0));
248 mtime = 1000000000 * (unsigned long long int)utime + (unsigned long long int)(std::round(ctime / 0.127216));
249 store_time_flag = 1;
250 }
251
252 RawCOPPER tempcpr;
253 tempcpr.SetBuffer(cprbuf, nwds_buf, false, 1, 1);
254 int subsysid = tempcpr.GetNodeID(0);
255 error_flag |= (unsigned int)(tempcpr.GetDataType(0));
256
257 // Switch to each detector and register RawXXX
258 if ((subsysid & DETECTOR_MASK) == CDC_ID) {
259 (m_rawCDCs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
260 } else if ((subsysid & DETECTOR_MASK) == SVD_ID) {
261 (m_rawSVDs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
262 } else if ((subsysid & DETECTOR_MASK) == BECL_ID) {
263 (m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
264 } else if ((subsysid & DETECTOR_MASK) == EECL_ID) {
265 (m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
266 } else if ((subsysid & DETECTOR_MASK) == TOP_ID) {
267 (m_rawTOPs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
268 } else if ((subsysid & DETECTOR_MASK) == ARICH_ID) {
269 (m_rawARICHs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
270 } else if ((subsysid & DETECTOR_MASK) == BKLM_ID) {
271 (m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
272 } else if ((subsysid & DETECTOR_MASK) == EKLM_ID) {
273 (m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
274 } else if (((subsysid & DETECTOR_MASK) & 0xF0000000) == TRGDATA_ID) {
275 (m_rawTRGs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
276 } else {
277 // Do not store Unknown RawCOPPER object. 2018.11.25
278 B2WARNING("Unknown COPPER ID : ");
279 for (int i = 0; i < 12; i++) {
280 B2WARNING(std::hex << cprbuf[i]);
281 }
282 B2FATAL("Unknown COPPER ID is found. CPRID = " << std::hex << subsysid << " Please check. Exiting...");
283 }
284 }
285
286 if (store_time_flag != 1) {
287 B2FATAL("No time information could be extracted from Data. That should not happen. Exiting...");
288 }
289
290 m_eventMetaData.create();
291 m_eventMetaData->setExperiment(sndhdr.GetExpNum());
292 m_eventMetaData->setRun(sndhdr.GetRunNum());
293 m_eventMetaData->setSubrun(sndhdr.GetSubRunNum());
294 m_eventMetaData->setEvent(sndhdr.GetEventNumber());
295 m_eventMetaData->setTime(mtime);
296
297 if (error_flag) {
298 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
300 B2WARNING("Raw2Ds: c_B2LinkPacketCRCError flag was set in EventMetaData.");
301 }
302 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
304 B2WARNING("Raw2Ds: c_B2LinkEventCRCError flag was set in EventMetaData.");
305 }
306 }
307 }
308}
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
@ c_B2LinkPacketCRCError
Belle2link CRC error is detected in the event.
Definition: EventMetaData.h:44
@ c_B2LinkEventCRCError
HSLB_COPPER CRC error is detected in the event.
Definition: EventMetaData.h:45
StoreArray< RawCDC > m_rawCDCs
Store Objects for HLT use.
std::unique_ptr< ZMQNoIdMessage > streamHistograms(bool compressed=true)
Stream all objects derived from TH1 into a message. Only the last subfolder is streamed by prefixing ...
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
std::unique_ptr< ZMQNoIdMessage > streamRaw()
Stream the data store into an event message and add SendHeader and SendTrailer around the message....
StoreArray< RawTOP > m_rawTOPs
Store Objects for HLT use.
StoreArray< RawKLM > m_rawKLMs
Store Objects for HLT use.
StoreArray< ROIid > m_rois
Additional Store Objects for ExpressReco use.
std::vector< char > m_outputBuffer
Temporary buffer for storing the compressed result.
StoreArray< RawSVD > m_rawSVDs
Store Objects for HLT use.
StoreArray< RawPXD > m_rawPXDs
Additional Store Objects for ExpressReco use.
StoreObjPtr< EventMetaData > m_eventMetaData
Store Objects for HLT use.
StoreObjPtr< TRGSummary > m_triggerSummary
Additional Store Objects for ExpressReco use.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
StoreArray< RawFTSW > m_rawFTSWs
Store Objects for HLT use.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
StreamHelper m_streamHelper
We use the framework stream helper.
StoreArray< RawECL > m_rawECLs
Store Objects for HLT use.
StoreArray< RawTRG > m_rawTRGs
Store Objects for HLT use.
zmq::message_t getROIMessageIfViable() const
If the ROI payload store object is filled, write out the ROI message (otherwise an empty message).
StoreArray< RawARICH > m_rawARICHs
Store Objects for HLT use.
StoreObjPtr< SoftwareTriggerResult > m_softwareTriggerResult
Additional Store Objects for ExpressReco use.
StoreObjPtr< RandomGenerator > m_randomGenerator
Additional Store Objects for ExpressReco use.
StoreObjPtr< SoftwareTrigger::SoftwareTriggerVariables > m_softwareTriggerVariables
Additional Store Objects for ExpressReco use.
std::unique_ptr< ZMQNoIdMessage > stream(bool addPersistentDurability, bool streamTransientObjects)
Stream the data store into an event message. Add ROI as additional message (if valid).
StoreObjPtr< ROIpayload > m_roiPayload
Store Objects for HLT use.
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
Definition: MsgHandler.cc:46
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
Definition: MsgHandler.cc:67
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
Definition: RawCOPPER.h:52
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
Definition: RawCOPPER.cc:141
The RawDataBlock class Base class for rawdata handling.
Definition: RawDataBlock.h:27
virtual int * GetBuffer(int n)
get nth buffer pointer
Definition: RawDataBlock.h:53
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
Definition: RawDataBlock.h:101
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
Definition: RawDataBlock.cc:35
virtual int GetBlockNwords(int n)
get size of a data block
Definition: RawDataBlock.h:94
The Raw FTSW class.
Definition: RawFTSW.h:30
unsigned int GetTTUtime(int n)
get unixtime of the trigger
Definition: RawFTSW.h:79
int GetTTCtime(int n)
Get ctime of the trigger.
Definition: RawFTSW.h:86
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
Definition: RawFTSW.cc:107
void SetBuffer(int *hdr)
set buffer
Definition: SendHeader.cc:37
void SetNumEventsinPacket(int num_events)
set contents of Header
Definition: SendHeader.cc:61
int GetHdrNwords()
get contents of Header
Definition: SendHeader.cc:124
void SetNwords(int total_data_nwords)
initialize Header
Definition: SendHeader.cc:51
int GetNumEventsinPacket()
get contents of Header
Definition: SendHeader.cc:125
int * GetBuffer(void)
Get Header contents.
Definition: SendHeader.cc:32
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
Definition: RawCOPPER.h:614
int GetDataType(int n)
get contents of header
Definition: RawCOPPER.h:404
int GetTTCtime(int n)
Get ctime.
Definition: RawCOPPER.h:620
unsigned int GetNodeID(int n)
get node-ID from data
Definition: RawCOPPER.h:397
Abstract base class for different kinds of events.