Belle II Software  release-08-01-10
ROISenderModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <tracking/modules/pxdDataReduction/ROISenderModule.h>
10 #include <sys/stat.h>
11 #include <chrono>
12 
13 using namespace Belle2;
14 
15 //-----------------------------------------------------------------
16 // Register the Module
17 //-----------------------------------------------------------------
18 
19 REG_MODULE(ROISender);
20 
21 //-----------------------------------------------------------------
22 // Implementation
23 //-----------------------------------------------------------------
24 
26  Module(), m_messageQueueNameCstring(nullptr)
27 {
28  //Set module properties
29  setDescription("Send the ROI payload to the external ring buffer");
30 
31  addParam("MessageQueueName", m_messageQueueName, "name of the output message queue", std::string("/roi"));
32  addParam("ROIpayloadName", m_ROIpayloadName, "name of the payload of ROIs", std::string(""));
33  addParam("MessageQueueDepth", m_messageQueueDepth, "depth of the output message queue", 10);
34  addParam("MessageSize", m_messageQueueMsgSize, "max size of themessage", 8192);
35 
36  B2INFO("ROI Sender created");
37 }
38 
39 
40 void
42 {
43  // Required input
44  m_eventMetaData.isRequired();
45  m_roiPayload.isRequired(m_ROIpayloadName);
46 
48 
49  bool slashFree = (nullptr == strchr(m_messageQueueNameCstring + 1, '/'));
50 
51  if (! slashFree || m_messageQueueNameCstring[0] != '/')
52  B2FATAL(__FILE__ << ":" << __LINE__ <<
53  m_messageQueueName << " invalid. cfr: man mq_overview ");
54 
55  m_histo.resize(101, 0); // 0-99, 100 is used as overflow bin
56 
57  // unlinkMessageQueue("on initialize");
58  openMessageQueue("on initialize");
59 
60 }
61 
62 
63 void
65 {
66 
67  int length = m_roiPayload->getPacketLengthByte();
68  const char* data = (const char*) m_roiPayload->getRootdata();
69 
70  mqd_t ret;
71 
72  if (length <= m_messageQueueMsgSize) {
73  ret = mq_send(m_messageQueue, data, length, 0 /* priority */);
74 
75  if (ret == (mqd_t) - 1) {
76  B2FATAL(std::string(__FILE__) << ":" << __LINE__ <<
77  ": error: " <<
78  strerror(errno) <<
79  " on mq_send");
80  }
81  } else {
82  B2FATAL(std::string(__FILE__) << ":" << __LINE__ <<
83  "ROI payload too long." << std::endl <<
84  "Payload length = " << length << std::endl <<
85  "Message max length = " << m_messageQueueMsgSize << std::endl <<
86  "We stop here, as this will result in event mismatch on EB! Please increase mqueue message length on HLT and/or check size limit in ROIPayload Assembler"
87  << std::endl);
88  }
89 
90  // Calculate the time difference between now and the trigger time
91  // This tells you how much delay we have summed up (it is NOT the processing time!)
93  unsigned long long int meta_time = m_eventMetaData->getTime();
94 
95  using namespace std::chrono;
96  nanoseconds ns = duration_cast< nanoseconds >(system_clock::now().time_since_epoch());
97  Float_t deltaT = (std::chrono::duration_cast<seconds> (ns - (nanoseconds)meta_time)).count();
98  if (deltaT < 0) {
99  m_histo[0]++;// just in case the clocks are slightly out of sync
100  } else if (deltaT < 100) {
101  m_histo[int(deltaT)]++;
102  } else {
103  m_histo[100]++;// overflow bin
104  }
105  if (deltaT > 60) {
106  B2ERROR("Event took too long on HLT, PXD data for Event might be lost!" << LogVar("deltaT in s", deltaT));
107  } else if (deltaT > 30) {
108  B2WARNING("Event took too long on HLT, PXD data for Event might be lost!" << LogVar("deltaT in s", deltaT));
109  }
110 }
111 
112 
113 
114 
115 void
117 {
118  closeMessageQueue("on terminate");
119  // unlinkMessageQueue("on terminate");
120  std::string str = "HLT Delay time distribution: ( ";
121  for (auto& a : m_histo) str += std::to_string(a) + ";";
122  str += " )";
123  B2RESULT(str);
124 }
125 
126 void
127 ROISenderModule::openMessageQueue(const char* log_string)
128 {
129  struct mq_attr attr;
130  attr.mq_flags = 0;
131  attr.mq_maxmsg = m_messageQueueDepth;
132  attr.mq_msgsize = m_messageQueueMsgSize; // bytes
133 
134  int oflags = O_WRONLY ; //| O_CREAT | O_EXCL;
135  mode_t mode = S_IRUSR | S_IWUSR | S_IROTH | S_IRGRP ;
136 
137 
138  mqd_t ret = mq_open(m_messageQueueNameCstring, oflags, mode, &attr);
139 
140 
141  if (ret == (mqd_t) - 1)
142  B2FATAL(std::string(__FILE__) << ":" <<
143  __LINE__ << ": error: " <<
144  strerror(errno) <<
145  " on mq_open " << log_string);
146 
147  m_messageQueue = ret;
148 
149 }
150 
151 void
152 ROISenderModule::closeMessageQueue(const char* log_string)
153 {
154  mqd_t ret;
155 
156  ret = mq_close(m_messageQueue);
157  if (ret == (mqd_t) - 1)
158  B2WARNING(std::string(__FILE__) << ":" <<
159  __LINE__ << ": error: " <<
160  strerror(errno) <<
161  " on mq_close " << log_string);
162 
163 }
164 
165 
167 void
168 ROISenderModule::unlinkMessageQueue(const char* log_string)
169 {
170  mqd_t ret;
171  ret = mq_unlink(m_messageQueueNameCstring);
172  if (ret == (mqd_t) - 1)
173  B2WARNING(std::string(__FILE__) << ":" <<
174  __LINE__ << ": error: " <<
175  strerror(errno) <<
176  " on mq_unlink " << log_string);
177 
178 }
179 
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void openMessageQueue(const char *log_string)
open message queue
void initialize() override final
Initializes the Module.
void unlinkMessageQueue(const char *log_string)
unlink message queue
std::string m_messageQueueName
message queue name
std::string m_ROIpayloadName
ROI payload name.
const char * m_messageQueueNameCstring
message queue name c string
ROISenderModule()
Constructor of the module.
int m_messageQueueMsgSize
message queue message size
int m_messageQueueDepth
message queue depth
mqd_t m_messageQueue
message queue
void terminate() override final
Termination action.
StoreObjPtr< EventMetaData > m_eventMetaData
Input ptr for EventMetaData.
void event() override final
This method is the core of the module.
void closeMessageQueue(const char *log_string)
close message queue
std::vector< int > m_histo
poor mans histogramming in a vector
StoreObjPtr< ROIpayload > m_roiPayload
Input ptr for RoiPayload.
Class to store variables with their name which were sent to the logging service.
REG_MODULE(arichBtest)
Register the Module.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
Abstract base class for different kinds of events.