Belle II Software development
StorageStreamHelper.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/utils/StorageStreamHelper.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/MsgHandler.h>
11#include <daq/dataobjects/SendHeader.h>
12#include <daq/dataobjects/SendTrailer.h>
13
14#include <TH1F.h>
15#include <TFile.h>
16#include <TDirectory.h>
17#include <TKey.h>
18#include <TClass.h>
19#include <TSystem.h>
20#include <TROOT.h>
21#include <TBufferJSON.h>
22
23#include <lz4.h>
24#include <zmq.hpp>
25
26#include <daq/storage/ONSENBinData.h>
27
28using namespace Belle2;
29
31{
32 //m_streamHelper.initialize(0, true);
33
34 m_buf = new int [10000000];
35 m_data.setBuffer(m_buf);
36 m_data_hlt.setBuffer(NULL);
37 m_data_pxd.setBuffer(NULL);
38}
39
40void StorageStreamHelper::registerStoreObjects(bool addExpressRecoObjects)
41{
42 m_eventMetaData.registerInDataStore();
43 m_rawSVDs.registerInDataStore();
44 m_rawCDCs.registerInDataStore();
45 m_rawTOPs.registerInDataStore();
46 m_rawARICHs.registerInDataStore();
47 m_rawECLs.registerInDataStore();
48 m_rawKLMs.registerInDataStore();
49 m_rawTRGs.registerInDataStore();
50 m_rawFTSWs.registerInDataStore();
51
52 //m_eventT0.registerInDataStore();
53 m_onlineEventT0.registerInDataStore();
54 //m_eventT0.isRequired();
55
56 if (addExpressRecoObjects) {
58 m_softwareTriggerResult.registerInDataStore();
59 m_softwareTriggerVariables.registerInDataStore();
60 m_triggerSummary.registerInDataStore();
61 m_rawPXDs.registerInDataStore();
62 m_roiPayload.registerInDataStore();
63 m_rois.registerInDataStore("ROIs");
64 }
65
66 m_streamer = new DataStoreStreamer();
67}
68
69void StorageStreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
70{
71 if (message->isMessage(EMessageTypes::c_eventMessage)) {
72 B2DEBUG(10, "EMessageTypes::c_eventMessage: begin");
73 m_streamHelper.read(std::move(message));
74 B2DEBUG(10, "EMessageTypes::c_eventMessage: end");
75 } else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
76 B2DEBUG(10, "EMessageTypes::c_rawDataMessage");
77 int* eventBuffer = message->getMessagePart<1>().data<int>();
78
79 m_data.setBuffer(eventBuffer);
80
81 int nboard = m_data.getNBoard();
82 /*
83 if (m_eb2 == 0 && nboard == 1) {
84 m_data_hlt.setBuffer(m_data.getBuffer());
85 } else if (m_eb2 > 0 || nboard > 1) {
86 m_data_hlt.setBuffer(m_data.getBody());
87 }
88 */
89
90 m_data_hlt.setBuffer(m_data.getBody());
91
92 EvtMessage* msg = new EvtMessage((char*)m_data_hlt.getBody());
93 if (msg->type() == MSG_TERMINATE) {
94 B2INFO("Got Termination message");
95 delete msg;
96 return;
97 }
98 m_streamer->restoreDataStore(msg);
99 delete msg;
100
101 if (nboard > 1) {
102 unsigned int offset = m_data_hlt.getWordSize() + m_data.getHeaderWordSize();
103 for (int i = 0; i < nboard - 1; i++) {
104 m_data_pxd.setBuffer(m_data.getBuffer() + offset);
105 offset += m_data_pxd.getWordSize();
106 if (m_data_pxd.getBody()[0] != ONSENBinData::MAGIC) {
107 B2FATAL("Bad ONSEN magic for PXD = " << m_data_pxd.getTrailerMagic());
108 return;
109 } else if (m_data_pxd.getTrailerMagic() != BinData::TRAILER_MAGIC) {
110 B2FATAL("Bad tarailer magic for PXD = " << m_data_pxd.getTrailerMagic());
111 return;
112 }
113 if (m_data_pxd.getBuffer() != NULL) {
114 m_rawPXDs.appendNew((int*)m_data_pxd.getBody(), m_data_pxd.getBodyByteSize());
115 }
116 }
117 } else {
118 m_data_pxd.setBuffer(NULL);
119 }
120
121 // m_eventMetaData.create();
122 m_eventMetaData->setExperiment(m_data_hlt.getExpNumber());
123 m_eventMetaData->setRun(m_data_hlt.getRunNumber());
124 m_eventMetaData->setSubrun(m_data_hlt.getSubNumber());
125 m_eventMetaData->setEvent(m_data_hlt.getEventNumber());
126 }
127}
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
StoreArray< RawCDC > m_rawCDCs
Store Objects for HLT use.
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
StoreArray< RawTOP > m_rawTOPs
Store Objects for HLT use.
StoreArray< RawKLM > m_rawKLMs
Store Objects for HLT use.
StoreArray< ROIid > m_rois
Additional Store Objects for ExpressReco use.
StoreArray< RawSVD > m_rawSVDs
Store Objects for HLT use.
BinData m_data
Decoding input.
StoreArray< RawPXD > m_rawPXDs
Additional Store Objects for ExpressReco use.
StoreObjPtr< EventMetaData > m_eventMetaData
Store Objects for HLT use.
StoreObjPtr< TRGSummary > m_triggerSummary
Additional Store Objects for ExpressReco use.
StoreArray< RawFTSW > m_rawFTSWs
Store Objects for HLT use.
StoreArray< OnlineEventT0 > m_onlineEventT0
StoreArray of OnlineEventT0.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
StreamHelper m_streamHelper
We use the framework stream helper.
StoreArray< RawECL > m_rawECLs
Store Objects for HLT use.
StoreArray< RawTRG > m_rawTRGs
Store Objects for HLT use.
StoreArray< RawARICH > m_rawARICHs
Store Objects for HLT use.
StoreObjPtr< SoftwareTriggerResult > m_softwareTriggerResult
Additional Store Objects for ExpressReco use.
StoreObjPtr< RandomGenerator > m_randomGenerator
Additional Store Objects for ExpressReco use.
StoreObjPtr< SoftwareTrigger::SoftwareTriggerVariables > m_softwareTriggerVariables
Additional Store Objects for ExpressReco use.
StoreObjPtr< ROIpayload > m_roiPayload
Store Objects for HLT use.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
Abstract base class for different kinds of events.