Belle II Software  release-08-01-10
StorageStreamHelper.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/utils/StorageStreamHelper.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/MsgHandler.h>
11 #include <daq/dataobjects/SendHeader.h>
12 #include <daq/dataobjects/SendTrailer.h>
13 
14 #include <TH1F.h>
15 #include <TFile.h>
16 #include <TDirectory.h>
17 #include <TKey.h>
18 #include <TClass.h>
19 #include <TSystem.h>
20 #include <TROOT.h>
21 #include <TBufferJSON.h>
22 
23 #include <lz4.h>
24 #include <zmq.hpp>
25 
26 #include <daq/storage/ONSENBinData.h>
27 
28 using namespace Belle2;
29 
31 {
32  //m_streamHelper.initialize(0, true);
33 
34  m_buf = new int [10000000];
35  m_data.setBuffer(m_buf);
36  m_data_hlt.setBuffer(NULL);
37  m_data_pxd.setBuffer(NULL);
38 }
39 
40 void StorageStreamHelper::registerStoreObjects(bool addExpressRecoObjects)
41 {
42  m_eventMetaData.registerInDataStore();
43  m_rawSVDs.registerInDataStore();
44  m_rawCDCs.registerInDataStore();
45  m_rawTOPs.registerInDataStore();
46  m_rawARICHs.registerInDataStore();
47  m_rawECLs.registerInDataStore();
48  m_rawKLMs.registerInDataStore();
49  m_rawTRGs.registerInDataStore();
50  m_rawFTSWs.registerInDataStore();
51 
52  //m_eventT0.registerInDataStore();
53  m_onlineEventT0.registerInDataStore();
54  //m_eventT0.isRequired();
55 
56  if (addExpressRecoObjects) {
57  m_randomGenerator.registerInDataStore(DataStore::c_DontWriteOut);
58  m_softwareTriggerResult.registerInDataStore();
59  m_softwareTriggerVariables.registerInDataStore();
60  m_triggerSummary.registerInDataStore();
61  m_rawPXDs.registerInDataStore();
62  m_roiPayload.registerInDataStore();
63  m_rois.registerInDataStore("ROIs");
64  }
65 
66  m_streamer = new DataStoreStreamer();
67 }
68 
69 void StorageStreamHelper::read(std::unique_ptr<ZMQNoIdMessage> message)
70 {
71  if (message->isMessage(EMessageTypes::c_eventMessage)) {
72  B2DEBUG(10, "EMessageTypes::c_eventMessage: begin");
73  m_streamHelper.read(std::move(message));
74  B2DEBUG(10, "EMessageTypes::c_eventMessage: end");
75  } else if (message->isMessage(EMessageTypes::c_rawDataMessage)) {
76  B2DEBUG(10, "EMessageTypes::c_rawDataMessage");
77  int* eventBuffer = message->getMessagePart<1>().data<int>();
78 
79  m_data.setBuffer(eventBuffer);
80 
81  int nboard = m_data.getNBoard();
82  /*
83  if (m_eb2 == 0 && nboard == 1) {
84  m_data_hlt.setBuffer(m_data.getBuffer());
85  } else if (m_eb2 > 0 || nboard > 1) {
86  m_data_hlt.setBuffer(m_data.getBody());
87  }
88  */
89 
90  m_data_hlt.setBuffer(m_data.getBody());
91 
92  EvtMessage* msg = new EvtMessage((char*)m_data_hlt.getBody());
93  if (msg->type() == MSG_TERMINATE) {
94  B2INFO("Got Termination message");
95  delete msg;
96  return;
97  }
98  m_streamer->restoreDataStore(msg);
99  delete msg;
100 
101  if (nboard > 1) {
102  unsigned int offset = m_data_hlt.getWordSize() + m_data.getHeaderWordSize();
103  for (int i = 0; i < nboard - 1; i++) {
104  m_data_pxd.setBuffer(m_data.getBuffer() + offset);
105  offset += m_data_pxd.getWordSize();
106  if (m_data_pxd.getBody()[0] != ONSENBinData::MAGIC) {
107  B2FATAL("Bad ONSEN magic for PXD = " << m_data_pxd.getTrailerMagic());
108  return;
109  } else if (m_data_pxd.getTrailerMagic() != BinData::TRAILER_MAGIC) {
110  B2FATAL("Bad tarailer magic for PXD = " << m_data_pxd.getTrailerMagic());
111  return;
112  }
113  if (m_data_pxd.getBuffer() != NULL) {
114  m_rawPXDs.appendNew((int*)m_data_pxd.getBody(), m_data_pxd.getBodyByteSize());
115  }
116  }
117  } else {
118  m_data_pxd.setBuffer(NULL);
119  }
120 
121  // m_eventMetaData.create();
122  m_eventMetaData->setExperiment(m_data_hlt.getExpNumber());
123  m_eventMetaData->setRun(m_data_hlt.getRunNumber());
124  m_eventMetaData->setSubrun(m_data_hlt.getSubNumber());
125  m_eventMetaData->setEvent(m_data_hlt.getEventNumber());
126  }
127 }
Stream/restore DataStore objects to/from EvtMessage.
int restoreDataStore(EvtMessage *msg)
Restore DataStore objects from EvtMessage.
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
Definition: DataStore.h:71
Class to manage streamed object.
Definition: EvtMessage.h:59
ERecordType type() const
Get record type.
Definition: EvtMessage.cc:114
StoreArray< RawCDC > m_rawCDCs
Store Objects for HLT use.
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
StoreArray< RawTOP > m_rawTOPs
Store Objects for HLT use.
StoreArray< RawKLM > m_rawKLMs
Store Objects for HLT use.
StoreArray< ROIid > m_rois
Additional Store Objects for ExpressReco use.
StoreArray< RawSVD > m_rawSVDs
Store Objects for HLT use.
BinData m_data
Decoding input.
StoreArray< RawPXD > m_rawPXDs
Additional Store Objects for ExpressReco use.
StoreObjPtr< EventMetaData > m_eventMetaData
Store Objects for HLT use.
StoreObjPtr< TRGSummary > m_triggerSummary
Additional Store Objects for ExpressReco use.
StoreArray< RawFTSW > m_rawFTSWs
Store Objects for HLT use.
StoreArray< OnlineEventT0 > m_onlineEventT0
StoreArray of OnlineEventT0.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
StreamHelper m_streamHelper
We use the framework stream helper.
StoreArray< RawECL > m_rawECLs
Store Objects for HLT use.
StoreArray< RawTRG > m_rawTRGs
Store Objects for HLT use.
StoreArray< RawARICH > m_rawARICHs
Store Objects for HLT use.
StoreObjPtr< SoftwareTriggerResult > m_softwareTriggerResult
Additional Store Objects for ExpressReco use.
StoreObjPtr< RandomGenerator > m_randomGenerator
Additional Store Objects for ExpressReco use.
StoreObjPtr< SoftwareTrigger::SoftwareTriggerVariables > m_softwareTriggerVariables
Additional Store Objects for ExpressReco use.
StoreObjPtr< ROIpayload > m_roiPayload
Store Objects for HLT use.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
Abstract base class for different kinds of events.