Belle II Software  release-08-01-10
DQMHistAnalysisInputPVSrv.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 //+
9 // File : DQMHistAnalysisInputPVSrv.cc
10 // Description : DQM input module, convert epics PVs to histograms for analysis
11 //-
12 
13 
14 #include <framework/core/ModuleParam.templateDetails.h>
15 #include <dqm/analysis/modules/DQMHistAnalysisInputPVSrv.h>
16 #include <TSystem.h>
17 #include <TDirectory.h>
18 #include <TH1F.h>
19 #include <TH2F.h>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //-----------------------------------------------------------------
25 // Register the Module
26 //-----------------------------------------------------------------
27 REG_MODULE(DQMHistAnalysisInputPVSrv);
28 
29 //-----------------------------------------------------------------
30 // Implementation
31 //-----------------------------------------------------------------
32 
33 #ifdef _BELLE2_EPICS
34 static void printChidInfo(chid ichid, const char* message)
35 {
36  B2DEBUG(20, message);
37  B2DEBUG(20, "pv: " << ca_name(ichid) << " type(" << ca_field_type(ichid) << ") nelements(" << ca_element_count(
38  ichid) << ") host(" << ca_host_name(ichid)
39  << ") read(" << ca_read_access(ichid) << ") write(" << ca_write_access(ichid) << ") state(" << ca_state(ichid) << ")");
40 }
41 
42 static void exceptionCallback(struct exception_handler_args args)
43 {
44  chid ichid = args.chid;
45  long stat = args.stat; /* Channel access status code*/
46  const char* channel;
47  const char* noname = "unknown";
48 
49  channel = (ichid ? ca_name(ichid) : noname);
50 
51 
52  if (ichid) printChidInfo(ichid, "exceptionCallback");
53  printf("exceptionCallback stat %s channel %s\n", ca_message(stat), channel);
54 }
55 
56 static void connectionCallback(struct connection_handler_args args)
57 {
58  chid ichid = args.chid;
59 
60  printChidInfo(ichid, "connectionCallback");
61 }
62 
63 static void accessRightsCallback(struct access_rights_handler_args args)
64 {
65  chid ichid = args.chid;
66 
67  printChidInfo(ichid, "accessRightsCallback");
68 }
69 static void eventCallback(struct event_handler_args eha)
70 {
71  chid ichid = eha.chid;
72  MYNODE* n = (MYNODE*)eha.usr;
73 
74  if (eha.status != ECA_NORMAL) {
75  printChidInfo(ichid, "eventCallback");
76  } else {
77 // char *pdata = (char *)eha.dbr;
78 // printf("Event Callback: %s = %s (%d,%d)\n",ca_name(eha.chid),pdata,(int)eha.type,(int)eha.count);
79 // printf("Event Callback: %s (%ld,%s,%ld)\n",ca_name(n->mychid),ca_field_type(n->mychid),dbr_type_to_text(ca_field_type(n->mychid)),ca_element_count(n->mychid));
80  n->changed = true;
81  }
82 }
83 
84 #endif
85 
86 DQMHistAnalysisInputPVSrvModule::DQMHistAnalysisInputPVSrvModule()
88 {
89  //Parameter definition
90  addParam("RefreshInterval", m_interval, "Refresh interval of histograms in ms", 2000);
91  addParam("HistoList", m_histlist, "pvname, histname, histtitle, (bins,min,max[,bins,min,max])");
92  addParam("Callback", m_callback, "Using EPICS callback for changes", true);
93  addParam("Server", m_server, "Start http server on port 8082", false);
94  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: Constructor done.");
95 }
96 
97 
99 {
100 #ifdef _BELLE2_EPICS
101  if (ca_current_context()) ca_context_destroy();
102 #endif
103 }
104 
106 {
107  m_eventMetaDataPtr.registerInDataStore();
108  //if (m_server) m_serv = new THttpServer("http:8082");
109 
110 #ifdef _BELLE2_EPICS
111  if (!ca_current_context()) SEVCHK(ca_context_create(ca_disable_preemptive_callback), "ca_context_create");
112  SEVCHK(ca_add_exception_event(exceptionCallback, NULL), "ca_add_exception_event");
113  for (auto& it : m_histlist) {
114  if (it.size() != 4 && it.size() != 5) {
115  B2WARNING("Histolist with wrong nr of parameters " << it.size());
116  continue;
117  }
118  auto n = (MYNODE*) callocMustSucceed(1, sizeof(MYNODE), "caMonitor");
119  pmynode.push_back(n);
120 
121  {
122  TDirectory* oldDir = gDirectory;
123  TDirectory* d = oldDir;
124  TString myl = it.at(1).c_str();
125  TString tok;
126  Ssiz_t from = 0;
127  while (myl.Tokenize(tok, from, "/")) {
128  TString dummy;
129  Ssiz_t f;
130  f = from;
131  if (myl.Tokenize(dummy, f, "/")) { // check if its the last one
132  TDirectory* e;
133  e = d->GetDirectory(tok);
134  if (e) {
135  B2DEBUG(20, "Cd Dir " << tok);
136  d = e;
137  } else {
138  B2DEBUG(20, "Create Dir " << tok);
139  d = d->mkdir(tok);
140  }
141  d->cd();
142  } else {
143  break;
144  }
145  }
146 
147  B2DEBUG(20, "Create Histo " << tok);
148 
149  Int_t x;
150  Double_t xmin, xmax;
151  strncpy(n->name, it.at(0).c_str(), MAX_PV_NAME_LEN - 1);
152  istringstream is(it.at(3));
153  is >> x;
154  is >> xmin;
155  is >> xmax;
156  if (it.size() == 4) {
157  n->histo = (TH1*)new TH1F(tok, it.at(2).c_str(), x, xmin, xmax);
158  n->binx = x;
159  n->biny = 0;
160  n->binmax = x;
161  } else {
162  Int_t y;
163  Double_t ymin, ymax;
164  istringstream iss(it.at(4));
165  iss >> y;
166  iss >> ymin;
167  iss >> ymax;
168  n->histo = (TH1*)new TH2F(tok, it.at(2).c_str(), x, xmin, xmax, y, ymin, ymax);
169  n->binx = x;
170  n->biny = y;
171  n->binmax = x * y;
172  }
173 
174  // cd back to root directory
175  oldDir->cd();
176  }
177 
178  }
179 
180  for (auto n : pmynode) {
181  SEVCHK(ca_create_channel(n->name, connectionCallback, n, 20, &n->mychid), "ca_create_channel");
182  SEVCHK(ca_replace_access_rights_event(n->mychid, accessRightsCallback), "ca_replace_access_rights_event");
183  if (m_callback) {
184  SEVCHK(ca_create_subscription(DBR_STRING, 1, n->mychid, DBE_VALUE, eventCallback, n, &n->myevid), "ca_create_subscription");
185  }
186  }
187 
188 #endif
189  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: initialized.");
190 }
191 
192 
194 {
195  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: beginRun called.");
196 }
197 
199 {
200  m_count++;
201  m_eventMetaDataPtr.create();
202  m_eventMetaDataPtr->setExperiment(m_expno);
203  m_eventMetaDataPtr->setRun(m_runno);
204  m_eventMetaDataPtr->setEvent(m_count);
205 
206  TTimer t(m_interval, kFALSE);// in ms
207 
208 #ifdef _BELLE2_EPICS
209  SEVCHK(ca_pend_event(0.0001), "ca_pend_event");
210 
211  for (auto n : pmynode) {
212  if (m_callback && !n->changed) continue;
213  n->changed = false;
214  if (ca_field_type(n->mychid) != DBF_LONG && ca_field_type(n->mychid) != DBF_FLOAT) continue;
215 
216  // FIXME: dbr_size_n is a preprocessor macro, it would be nice replacing it with something better
217 #pragma GCC diagnostic push
218 #pragma GCC diagnostic ignored "-Wtype-limits"
219  auto bufferorg = new char[dbr_size_n(ca_field_type(n->mychid), ca_element_count(n->mychid))];
220 #pragma GCC diagnostic pop
221 
222  void* buffer = (void*) bufferorg;
223  int status;
224 
225  status = ca_array_get(ca_field_type(n->mychid), ca_element_count(n->mychid), n->mychid, buffer);
226  SEVCHK(status, "ca_array_get()");
227  status = ca_pend_io(15.0);
228  if (status != ECA_NORMAL) {
229  B2WARNING("EPICS ca_array_get " << ca_name(n->mychid) << " didn't return a value.");
230  } else {
231  if (n->histo) {
232  // this should always be the case
233  unsigned int bins;
234  bins = ca_element_count(n->mychid) < n->binmax ? ca_element_count(n->mychid) : n->binmax;
235  TH1* histo = n->histo;
236  for (unsigned int j = ca_element_count(n->mychid); j < n->binmax; j++) histo->SetBinContent(j + 1, 0); // zero out undefined bins
237  switch (ca_field_type(n->mychid)) {
238  case DBF_CHAR: {
239  dbr_char_t* b = (dbr_char_t*)buffer;
240  for (unsigned int j = 0; j < bins; j++) {
241  histo->SetBinContent(j + 1, b[j]);
242  }
243  }; break;
244 // case DBF_INT:
245  case DBF_SHORT: { // same as INT
246  dbr_short_t* b = (dbr_short_t*)buffer;
247  for (unsigned int j = 0; j < bins; j++) {
248  histo->SetBinContent(j + 1, b[j]);
249  }
250  }; break;
251  case DBF_LONG: {
252  dbr_long_t* b = (dbr_long_t*)buffer;
253  for (unsigned int j = 0; j < bins; j++) {
254  histo->SetBinContent(j + 1, b[j]);
255  }
256  }; break;
257  case DBF_FLOAT: {
258  dbr_float_t* b = (dbr_float_t*)buffer;
259  for (unsigned int j = 0; j < bins; j++) {
260  histo->SetBinContent(j + 1, b[j]);
261  }
262  }; break;
263  case DBF_DOUBLE: {
264  dbr_double_t* b = (dbr_double_t*)buffer;
265  for (unsigned int j = 0; j < bins; j++) {
266  histo->SetBinContent(j + 1, b[j]);
267  }
268  }; break;
269  default:
270  // type not supported
271  break;
272  }
273  }
274  }
275  delete[] bufferorg;
276  }
277 #endif
278  do { // call at least once!
279  //if (m_serv) m_serv->ProcessRequests();
280  gSystem->Sleep(10); // 10 ms sleep
281  } while (!t.CheckTimer(gSystem->Now()));
282 
283 }
284 
286 {
287  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: endRun called");
288 }
289 
290 
292 {
293  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: terminate called");
294 }
295 
296 
297 
void initialize() override final
Definition of the histograms.
void terminate() override final
This method is called at the end of the event processing.
void event() override final
This method is called for each event.
void endRun() override final
This method is called if the current run ends.
bool m_server
Whether to start http server on port 8082.
std::vector< std::vector< std::string > > m_histlist
Parameter list for histograms.
void beginRun() override final
Called when entering a new run.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
The metadata for each event.
bool m_callback
Whether to use EPICS callback for changes.
The base class for the histogram analysis module.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.