8#include <daq/hbasf2/utils/HLTStreamHelper.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/MsgHandler.h>
11#include <daq/dataobjects/SendHeader.h>
12#include <daq/dataobjects/SendTrailer.h>
16#include <TDirectory.h>
21#include <TBufferJSON.h>
// Recursive helper: walks the entries of curdir->GetList(), adds every object whose
// class inherits from TH1 to msg, and descends into every object whose class
// inherits from TDirectory.
//   curdir  - directory whose list is traversed
//   msg     - handler receiving each histogram through add(object, name)
//   dirName - prefix the streamed object name starts from (defaults to an empty string)
29 void streamHistogramImpl(TDirectory* curdir,
Belle2::MsgHandler& msg,
const std::string& dirName =
"")
31 TList* keylist = curdir->GetList();
33 TIter nextkey(keylist);
36 while ((key = (TKey*)nextkey())) {
// Look the object up by the key's name in the directory being traversed.
// NOTE(review): obj is dereferenced right below without a null check - confirm Get() cannot return nullptr here.
37 TObject* obj = curdir->Get(key->GetName());
38 TClass* objectClass = obj->IsA();
// The streamed name starts from the directory prefix and ends with the object's own name.
39 std::string objectName = dirName;
40 if (not objectName.empty()) {
43 objectName += obj->GetName();
// Histograms are handed to the message handler under the composed name.
45 if (objectClass->InheritsFrom(TH1::Class())) {
46 auto* h1 =
dynamic_cast<TH1*
>(obj);
47 msg.
add(h1, objectName);
48 }
else if (objectClass->InheritsFrom(TDirectory::Class())) {
49 auto* tdir =
dynamic_cast<TDirectory*
>(obj);
// Only the sub-directory's own name is passed on as the prefix, not the accumulated objectName.
52 streamHistogramImpl(tdir, msg, obj->GetName());
75 if (addExpressRecoObjects) {
82 m_rois.registerInDataStore(
"ROIs");
95 int msgsize = (eventMessage->size() - 1) /
sizeof(
int) + 1;
96 int total_nwrds = msgsize + hdr.
GetHdrNwords() + trl.GetTrlNwords();
101 hdr.SetNumNodesinPacket(1);
109 zmq::message_t rawMessage(total_nwrds *
sizeof(
int));
110 int* buffer = rawMessage.data<
int>();
116 memcpy(buffer + hdr.
GetHdrNwords(), eventMessage->buffer(), eventMessage->size());
119 memcpy(buffer + hdr.
GetHdrNwords() + msgsize, trl.GetBuffer(),
120 trl.GetTrlNwords()*
sizeof(
int));
130 const auto eventMessage =
m_streamHelper.
stream(addPersistentDurability, streamTransientObjects);
139 zmq::message_t roiMessage;
142 const size_t length =
m_roiPayload->getPacketLengthByte();
143 const char* data =
reinterpret_cast<const char*
>(
m_roiPayload->getRootdata());
145 roiMessage = zmq::message_t(length);
146 char* tbuffer = roiMessage.data<
char>();
147 memcpy(tbuffer, data, length);
158 streamHistogramImpl(gDirectory, msgHandler);
160 auto evtMessage = std::unique_ptr<EvtMessage>(msgHandler.
encode_msg(Belle2::ERecordType::MSG_EVENT));
163 auto eventInformationString = TBufferJSON::ToJSON(&eventMetaData);
164 zmq::message_t additionalEventMessage(eventInformationString.Data(), eventInformationString.Length());
166 if (not compressed) {
168 std::move(additionalEventMessage));
176 B2DEBUG(10,
"Size before compression " << evtMessage->size());
178 size = LZ4_compress_default(evtMessage->buffer(), &
m_outputBuffer[0], evtMessage->size(), size);
179 B2ASSERT(
"Compression failed", size > 0);
180 B2DEBUG(10,
"Size after compression " << size);
185 std::move(message), std::move(additionalEventMessage));
190 if (message->isMessage(EMessageTypes::c_eventMessage)) {
192 }
else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
193 int* eventBuffer = message->getMessagePart<1>().data<int>();
198 if (npackedevts != 1) {
199 B2WARNING(
"Strange SendHeader : ");
201 for (
int i = 0; i < 10; i++) {
202 B2WARNING(std::hex << * (sndhdr.
GetBuffer() + i));
205 B2WARNING(
"Raw2DsModule::number of events in packet is not 1. This process gets stuck here. Please ABORT the system. (Please see discussion of daqcore channel in https://b2rc.kek.jp/ on 2017. Nov. 30. about why this is not FATAL message.");
208 int ncprs = sndhdr.GetNumNodesinPacket();
209 int nwords = sndhdr.GetTotalNwords() - SendHeader::SENDHDR_NWORDS - SendTrailer::SENDTRL_NWORDS;
212 int* bufbody = eventBuffer + SendHeader::SENDHDR_NWORDS;
216 tempdblk.
SetBuffer(bufbody, nwords,
false, npackedevts, ncprs);
218 unsigned long long int mtime = 0;
220 int store_time_flag = 0;
221 unsigned int error_flag = 0;
224 for (
int cprid = 0; cprid < ncprs * npackedevts; cprid++) {
227 int* cprbuf =
new int[nwds_buf];
228 memcpy(cprbuf, tempdblk.
GetBuffer(cprid), nwds_buf * 4);
233 ftsw->
SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
236 unsigned int utime = (
unsigned int)(ftsw->
GetTTUtime(0));
237 unsigned int ctime = (
unsigned int)(ftsw->
GetTTCtime(0));
238 mtime = 1000000000 * (
unsigned long long int)utime + (
unsigned long long int)(std::round(ctime / 0.127216));
241 }
else if (store_time_flag == 0) {
245 tempcpr_time.
SetBuffer(cprbuf, nwds_buf,
false, 1, 1);
246 unsigned int utime = (
unsigned int)(tempcpr_time.
GetTTUtime(0));
247 unsigned int ctime = (
unsigned int)(tempcpr_time.
GetTTCtime(0));
248 mtime = 1000000000 * (
unsigned long long int)utime + (
unsigned long long int)(std::round(ctime / 0.127216));
253 tempcpr.
SetBuffer(cprbuf, nwds_buf,
false, 1, 1);
255 error_flag |= (
unsigned int)(tempcpr.
GetDataType(0));
258 if ((subsysid & DETECTOR_MASK) == CDC_ID) {
259 (
m_rawCDCs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
260 }
else if ((subsysid & DETECTOR_MASK) == SVD_ID) {
261 (
m_rawSVDs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
262 }
else if ((subsysid & DETECTOR_MASK) == BECL_ID) {
263 (
m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
264 }
else if ((subsysid & DETECTOR_MASK) == EECL_ID) {
265 (
m_rawECLs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
266 }
else if ((subsysid & DETECTOR_MASK) == TOP_ID) {
267 (
m_rawTOPs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
268 }
else if ((subsysid & DETECTOR_MASK) == ARICH_ID) {
269 (
m_rawARICHs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
270 }
else if ((subsysid & DETECTOR_MASK) == BKLM_ID) {
271 (
m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
272 }
else if ((subsysid & DETECTOR_MASK) == EKLM_ID) {
273 (
m_rawKLMs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
274 }
else if (((subsysid & DETECTOR_MASK) & 0xF0000000) == TRGDATA_ID) {
275 (
m_rawTRGs.appendNew())->SetBuffer(cprbuf, nwds_buf, 1, 1, 1);
278 B2WARNING(
"Unknown COPPER ID : ");
279 for (
int i = 0; i < 12; i++) {
280 B2WARNING(std::hex << cprbuf[i]);
282 B2FATAL(
"Unknown COPPER ID is found. CPRID = " << std::hex << subsysid <<
" Please check. Exiting...");
286 if (store_time_flag != 1) {
287 B2FATAL(
"No time information could be extracted from Data. That should not happen. Exiting...");
298 if (error_flag & RawHeader_latest::B2LINK_PACKET_CRC_ERROR) {
300 B2WARNING(
"Raw2Ds: c_B2LinkPacketCRCError flag was set in EventMetaData.");
302 if (error_flag & RawHeader_latest::B2LINK_EVENT_CRC_ERROR) {
304 B2WARNING(
"Raw2Ds: c_B2LinkEventCRCError flag was set in EventMetaData.");
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
StoreArray< RawCDC > m_rawCDCs
Store Objects for HLT use.
std::unique_ptr< ZMQNoIdMessage > streamHistograms(bool compressed=true)
Stream all objects derived from TH1 into a message. Only the last subfolder is streamed by prefixing ...
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
std::unique_ptr< ZMQNoIdMessage > streamRaw()
Stream the data store into an event message and add SendHeader and SendTrailer around the message....
StoreArray< RawTOP > m_rawTOPs
Store Objects for HLT use.
StoreArray< RawKLM > m_rawKLMs
Store Objects for HLT use.
StoreArray< ROIid > m_rois
Additional Store Objects for ExpressReco use.
std::vector< char > m_outputBuffer
Temporary buffer for storing the compressed result.
StoreArray< RawSVD > m_rawSVDs
Store Objects for HLT use.
StoreArray< RawPXD > m_rawPXDs
Additional Store Objects for ExpressReco use.
StoreObjPtr< EventMetaData > m_eventMetaData
Store Objects for HLT use.
StoreObjPtr< TRGSummary > m_triggerSummary
Additional Store Objects for ExpressReco use.
unsigned int m_maximalCompressedSize
Maximal size of the compression buffer.
StoreArray< RawFTSW > m_rawFTSWs
Store Objects for HLT use.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
StreamHelper m_streamHelper
We use the framework stream helper.
StoreArray< RawECL > m_rawECLs
Store Objects for HLT use.
StoreArray< RawTRG > m_rawTRGs
Store Objects for HLT use.
zmq::message_t getROIMessageIfViable() const
If the ROI payload data store object is filled, write out the ROI message (otherwise an empty message).
StoreArray< RawARICH > m_rawARICHs
Store Objects for HLT use.
StoreObjPtr< SoftwareTriggerResult > m_softwareTriggerResult
Additional Store Objects for ExpressReco use.
StoreObjPtr< RandomGenerator > m_randomGenerator
Additional Store Objects for ExpressReco use.
StoreObjPtr< SoftwareTrigger::SoftwareTriggerVariables > m_softwareTriggerVariables
Additional Store Objects for ExpressReco use.
std::unique_ptr< ZMQNoIdMessage > stream(bool addPersistentDurability, bool streamTransientObjects)
Stream the data store into an event message. Add ROI as additional message (if valid).
StoreObjPtr< ROIpayload > m_roiPayload
Store Objects for HLT use.
A class to encode/decode an EvtMessage.
virtual void add(const TObject *, const std::string &name)
Add an object to be streamed.
virtual EvtMessage * encode_msg(ERecordType rectype)
Stream object list into an EvtMessage.
The Raw COPPER class. This class stores data received by COPPER via belle2link. Data from all detector...
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
The RawDataBlock class. Base class for raw data handling.
virtual int * GetBuffer(int n)
get nth buffer pointer
virtual int CheckFTSWID(int n)
get FTSW ID to check whether this data block is FTSW data or not
virtual void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes)
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
virtual int GetBlockNwords(int n)
get size of a data block
unsigned int GetTTUtime(int n)
get unixtime of the trigger
int GetTTCtime(int n)
Get ctime of the trigger.
void SetBuffer(int *bufin, int nwords, int delete_flag, int num_events, int num_nodes) OVERRIDE_CPP17
Set buffer (delete_flag: m_buffer is freed (= 0) / not freed (= 1) in the destructor).
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
unsigned int GetTTUtime(int n)
Check if COPPER Magic words are correct.
int GetDataType(int n)
get contents of header
int GetTTCtime(int n)
Get ctime.
unsigned int GetNodeID(int n)
get node-ID from data
Abstract base class for different kinds of events.