Belle II Software  release-06-01-15
ROISenderModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <tracking/modules/pxdDataReduction/ROISenderModule.h>
10 #include <sys/stat.h>
11 #include <chrono>
12 
13 using namespace std;
14 using namespace Belle2;
15 
16 //-----------------------------------------------------------------
17 // Register the Module
18 //-----------------------------------------------------------------
19 
20 REG_MODULE(ROISender)
21 
22 //-----------------------------------------------------------------
23 // Implementation
24 //-----------------------------------------------------------------
25 
27  Module(), m_messageQueueNameCstring(nullptr)
28 {
29  //Set module properties
30  setDescription("Send the ROI payload to the external ring buffer");
31 
32  addParam("MessageQueueName", m_messageQueueName, "name of the output message queue", std::string("/roi"));
33  addParam("ROIpayloadName", m_ROIpayloadName, "name of the payload of ROIs", std::string(""));
34  addParam("MessageQueueDepth", m_messageQueueDepth, "depth of the output message queue", 10);
35  addParam("MessageSize", m_messageQueueMsgSize, "max size of themessage", 8192);
36 
37  B2INFO("ROI Sender created");
38 }
39 
40 
41 void
42 ROISenderModule::initialize()
43 {
44  // Required input
45  m_eventMetaData.isRequired();
46  m_roiPayload.isRequired(m_ROIpayloadName);
47 
48  m_messageQueueNameCstring = m_messageQueueName.c_str();
49 
50  bool slashFree = (nullptr == strchr(m_messageQueueNameCstring + 1 , '/'));
51 
52  if (! slashFree || m_messageQueueNameCstring[0] != '/')
53  B2FATAL(__FILE__ << ":" << __LINE__ <<
54  m_messageQueueName << " invalid. cfr: man mq_overview ");
55 
56  m_histo.resize(101, 0); // 0-99, 100 is used as overflow bin
57 
58  // unlinkMessageQueue("on initialize");
59  openMessageQueue("on initialize");
60 
61 }
62 
63 
64 void
65 ROISenderModule::event()
66 {
67 
68  int length = m_roiPayload->getPacketLengthByte();
69  const char* data = (const char*) m_roiPayload->getRootdata();
70 
71  mqd_t ret;
72 
73  if (length <= m_messageQueueMsgSize) {
74  ret = mq_send(m_messageQueue, data, length, 0 /* priority */);
75 
76  if (ret == (mqd_t) - 1) {
77  B2FATAL(std::string(__FILE__) << ":" << __LINE__ <<
78  ": error: " <<
79  strerror(errno) <<
80  " on mq_send");
81  }
82  } else {
83  B2FATAL(std::string(__FILE__) << ":" << __LINE__ <<
84  "ROI payload too long." << endl <<
85  "Payload length = " << length << endl <<
86  "Message max length = " << m_messageQueueMsgSize << endl <<
87  "We stop here, as this will result in event mismatch on EB! Please increase mqueue message length on HLT and/or check size limit in ROIPayload Assembler"
88  << endl);
89  }
90 
91  // Calculate the time difference between now and the trigger time
92  // This tells you how much delay we have summed up (it is NOT the processing time!)
94  unsigned long long int meta_time = m_eventMetaData->getTime();
95 
96  using namespace std::chrono;
97  nanoseconds ns = duration_cast< nanoseconds >(system_clock::now().time_since_epoch());
98  Float_t deltaT = (std::chrono::duration_cast<seconds> (ns - (nanoseconds)meta_time)).count();
99  if (deltaT < 0) {
100  m_histo[0]++;// just in case the clocks are slightly out of sync
101  } else if (deltaT < 100) {
102  m_histo[int(deltaT)]++;
103  } else {
104  m_histo[100]++;// overflow bin
105  }
106  if (deltaT > 60) {
107  B2ERROR("Event took too long on HLT, PXD data for Event might be lost!" << LogVar("deltaT in s", deltaT));
108  } else if (deltaT > 30) {
109  B2WARNING("Event took too long on HLT, PXD data for Event might be lost!" << LogVar("deltaT in s", deltaT));
110  }
111 }
112 
113 
114 
115 
116 void
117 ROISenderModule::terminate()
118 {
119  closeMessageQueue("on terminate");
120  // unlinkMessageQueue("on terminate");
121  string str = "HLT Delay time distribution: ( ";
122  for (auto& a : m_histo) str += to_string(a) + ";";
123  str += " )";
124  B2RESULT(str);
125 }
126 
127 void
128 ROISenderModule::openMessageQueue(const char* log_string)
129 {
130  struct mq_attr attr;
131  attr.mq_flags = 0;
132  attr.mq_maxmsg = m_messageQueueDepth;
133  attr.mq_msgsize = m_messageQueueMsgSize; // bytes
134 
135  int oflags = O_WRONLY ; //| O_CREAT | O_EXCL;
136  mode_t mode = S_IRUSR | S_IWUSR | S_IROTH | S_IRGRP ;
137 
138 
139  mqd_t ret = mq_open(m_messageQueueNameCstring, oflags, mode, &attr);
140 
141 
142  if (ret == (mqd_t) - 1)
143  B2FATAL(std::string(__FILE__) << ":" <<
144  __LINE__ << ": error: " <<
145  strerror(errno) <<
146  " on mq_open " << log_string);
147 
148  m_messageQueue = ret;
149 
150 }
151 
152 void
153 ROISenderModule::closeMessageQueue(const char* log_string)
154 {
155  mqd_t ret;
156 
157  ret = mq_close(m_messageQueue);
158  if (ret == (mqd_t) - 1)
159  B2WARNING(std::string(__FILE__) << ":" <<
160  __LINE__ << ": error: " <<
161  strerror(errno) <<
162  " on mq_close " << log_string);
163 
164 }
165 
166 
168 void
169 ROISenderModule::unlinkMessageQueue(const char* log_string)
170 {
171  mqd_t ret;
172  ret = mq_unlink(m_messageQueueNameCstring);
173  if (ret == (mqd_t) - 1)
174  B2WARNING(std::string(__FILE__) << ":" <<
175  __LINE__ << ": error: " <<
176  strerror(errno) <<
177  " on mq_unlink " << log_string);
178 
179 }
180 
Base class for Modules.
Definition: Module.h:72
The ROI to ONSEN Module.
Class to store variables with their name which were sent to the logging service.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.