10 #include <daq/hbasf2/utils/HLTStreamHelper.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/pcore/MsgHandler.h>
13 #include <daq/dataobjects/SendHeader.h>
14 #include <daq/dataobjects/SendTrailer.h>
18 #include <TDirectory.h>
23 #include <TBufferJSON.h>
31 void streamHistogramImpl(TDirectory* curdir,
Belle2::MsgHandler& msg,
const std::string& dirName =
"")
33 TList* keylist = curdir->GetList();
35 TIter nextkey(keylist);
38 while ((key = (TKey*)nextkey())) {
39 TObject* obj = curdir->Get(key->GetName());
40 TClass* objectClass = obj->IsA();
41 std::string objectName = dirName;
42 if (not objectName.empty()) {
45 objectName += obj->GetName();
47 if (objectClass->InheritsFrom(TH1::Class())) {
48 auto* h1 =
dynamic_cast<TH1*
>(obj);
49 msg.
add(h1, objectName);
50 }
else if (objectClass->InheritsFrom(TDirectory::Class())) {
51 auto* tdir =
dynamic_cast<TDirectory*
>(obj);
54 streamHistogramImpl(tdir, msg, obj->GetName());
77 if (addExpressRecoObjects) {
84 m_rois.registerInDataStore(
"ROIs");
97 int msgsize = (eventMessage->size() - 1) /
sizeof(
int) + 1;
98 int total_nwrds = msgsize + hdr.
GetHdrNwords() + trl.GetTrlNwords();
103 hdr.SetNumNodesinPacket(1);
111 zmq::message_t rawMessage(total_nwrds *
sizeof(
int));
112 int* buffer = rawMessage.data<
int>();
118 memcpy(buffer + hdr.
GetHdrNwords(), eventMessage->buffer(), eventMessage->size());
121 memcpy(buffer + hdr.
GetHdrNwords() + msgsize, trl.GetBuffer(),
122 trl.GetTrlNwords()*
sizeof(
int));
132 const auto eventMessage =
m_streamHelper.
stream(addPersistentDurability, streamTransientObjects);
141 zmq::message_t roiMessage;
144 const size_t length =
m_roiPayload->getPacketLengthByte();
145 const char* data =
reinterpret_cast<const char*
>(
m_roiPayload->getRootdata());
147 roiMessage = zmq::message_t(length);
148 char* tbuffer = roiMessage.data<
char>();
149 memcpy(tbuffer, data, length);
160 streamHistogramImpl(gDirectory, msgHandler);
162 auto evtMessage = std::unique_ptr<EvtMessage>(msgHandler.
encode_msg(Belle2::ERecordType::MSG_EVENT));
165 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
166 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
168 if (not compressed) {
170 std::move(additionalEventMessage));
178 B2DEBUG(10,
"Size before compression " << evtMessage->size());
180 size = LZ4_compress_default(evtMessage->buffer(), &
m_outputBuffer[0], evtMessage->size(), size);
181 B2ASSERT(
"Compression failed", size > 0);
182 B2DEBUG(10,
"Size after compression " << size);
187 std::move(message), std::move(additionalEventMessage));
192 if (message->isMessage(EMessageTypes::c_eventMessage)) {
194 }
else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
195 int* eventBuffer = message->getMessagePart<1>().data<int>();
200 if (npackedevts != 1) {
201 B2WARNING(
"Strange SendHeader : ");
203 for (
int i = 0; i < 10; i++) {
204 B2WARNING(std::hex << * (sndhdr.
GetBuffer() + i));
207 B2WARNING(
"Raw2DsModule::number of events in packet is not 1. This process gets stuck here. Please ABORT the system. (Please see discussion of daqcore channel in https://b2rc.kek.jp/ on 2017. Nov. 30. about why this is not FATAL message.");
210 int ncprs = sndhdr.GetNumNodesinPacket();
211 int nwords = sndhdr.GetTotalNwords() - SendHeader::SENDHDR_NWORDS - SendTrailer::SENDTRL_NWORDS;
214 int* bufbody = eventBuffer + SendHeader::SENDHDR_NWORDS;
218 tempdblk.
SetBuffer(bufbody, nwords,
false, npackedevts, ncprs);
220 unsigned int utime = 0;
221 unsigned int ctime = 0;
222 unsigned long long int mtime = 0;
224 int store_time_flag = 0;
225 unsigned int error_flag = 0;
228 for (
int cprid = 0; cprid < ncprs * npackedevts; cprid++) {
231 int* cprbuf =
new int[nwds_buf];
232 memcpy(cprbuf, tempdblk.
GetBuffer(cprid), nwds_buf * 4);
237 ftsw->
SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
242 mtime = 1000000000 * (
unsigned long long int)utime + (
unsigned long long int)(std::round(ctime / 0.127216));
245 }
else if (store_time_flag == 0) {
249 tempcpr_time.
SetBuffer(cprbuf, nwds_buf,
false, 1, 1);
250 utime = (
unsigned int)(tempcpr_time.
GetTTUtime(0));
251 ctime = (
unsigned int)(tempcpr_time.
GetTTCtime(0));
252 mtime = 1000000000 * (
unsigned long long int)utime + (
unsigned long long int)(std::round(ctime / 0.127216));
257 tempcpr.
SetBuffer(cprbuf, nwds_buf,
false, 1, 1);
259 error_flag |= (
unsigned int)(tempcpr.
GetDataType(0));
262 if ((subsysid & DETECTOR_MASK) == CDC_ID) {
263 (
m_rawCDCs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
264 }
else if ((subsysid & DETECTOR_MASK) == SVD_ID) {
265 (
m_rawSVDs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
266 }
else if ((subsysid & DETECTOR_MASK) == BECL_ID) {
267 (
m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
268 }
else if ((subsysid & DETECTOR_MASK) == EECL_ID) {
269 (
m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
270 }
else if ((subsysid & DETECTOR_MASK) == TOP_ID) {
271 (
m_rawTOPs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
272 }
else if ((subsysid & DETECTOR_MASK) == ARICH_ID) {
273 (
m_rawARICHs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
274 }
else if ((subsysid & DETECTOR_MASK) == BKLM_ID) {
275 (
m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
276 }
else if ((subsysid & DETECTOR_MASK) == EKLM_ID) {
277 (
m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
278 }
else if (((subsysid & DETECTOR_MASK) & 0xF0000000) == TRGDATA_ID) {
279 (
m_rawTRGs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
282 B2WARNING(
"Unknown COPPER ID : ");
283 for (
int i = 0; i < 12; i++) {
284 B2WARNING(std::hex << cprbuf[i]);
286 B2FATAL(
"Unknown COPPER ID is found. CPRID = " << std::hex << subsysid <<
" Please check. Exiting...");
290 if (store_time_flag != 1) {
291 B2FATAL(
"No time information could be extracted from Data. That should not happen. Exiting...");
302 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
304 B2WARNING(
"Raw2Ds: c_B2LinkPacketCRCError flag was set in EventMetaData.");
306 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
308 B2WARNING(
"Raw2Ds: c_B2LinkEventCRCError flag was set in EventMetaData.");