Belle II Software  release-06-00-14
DQMHistAnalysisInputPVSrv.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 //+
9 // File : DQMHistAnalysisInputPVSrv.cc
10 // Description : DQM input module, convert epics PVs to histograms for analysis
11 //-
12 
13 
14 #include <framework/core/ModuleParam.templateDetails.h>
15 #include <dqm/analysis/modules/DQMHistAnalysisInputPVSrv.h>
16 #include <TSystem.h>
17 #include <TDirectory.h>
18 #include <TH1F.h>
19 #include <TH2F.h>
20 
21 using namespace std;
22 using namespace Belle2;
23 
24 //-----------------------------------------------------------------
25 // Register the Module
26 //-----------------------------------------------------------------
27 REG_MODULE(DQMHistAnalysisInputPVSrv)
28 
29 //-----------------------------------------------------------------
30 // Implementation
31 //-----------------------------------------------------------------
32 
33 #ifdef _BELLE2_EPICS
34 static void printChidInfo(chid ichid, const char* message)
35 {
36  B2DEBUG(20, message);
37  B2DEBUG(20, "pv: " << ca_name(ichid) << " type(" << ca_field_type(ichid) << ") nelements(" << ca_element_count(
38  ichid) << ") host(" << ca_host_name(ichid)
39  << ") read(" << ca_read_access(ichid) << ") write(" << ca_write_access(ichid) << ") state(" << ca_state(ichid) << ")");
40 }
41 
42 static void exceptionCallback(struct exception_handler_args args)
43 {
44  chid ichid = args.chid;
45  long stat = args.stat; /* Channel access status code*/
46  const char* channel;
47  const char* noname = "unknown";
48 
49  channel = (ichid ? ca_name(ichid) : noname);
50 
51 
52  if (ichid) printChidInfo(ichid, "exceptionCallback");
53  printf("exceptionCallback stat %s channel %s\n", ca_message(stat), channel);
54 }
55 
56 static void connectionCallback(struct connection_handler_args args)
57 {
58  chid ichid = args.chid;
59 
60  printChidInfo(ichid, "connectionCallback");
61 }
62 
63 static void accessRightsCallback(struct access_rights_handler_args args)
64 {
65  chid ichid = args.chid;
66 
67  printChidInfo(ichid, "accessRightsCallback");
68 }
69 static void eventCallback(struct event_handler_args eha)
70 {
71  chid ichid = eha.chid;
72  MYNODE* n = (MYNODE*)eha.usr;
73 
74  if (eha.status != ECA_NORMAL) {
75  printChidInfo(ichid, "eventCallback");
76  } else {
77 // char *pdata = (char *)eha.dbr;
78 // printf("Event Callback: %s = %s (%d,%d)\n",ca_name(eha.chid),pdata,(int)eha.type,(int)eha.count);
79 // printf("Event Callback: %s (%ld,%s,%ld)\n",ca_name(n->mychid),ca_field_type(n->mychid),dbr_type_to_text(ca_field_type(n->mychid)),ca_element_count(n->mychid));
80  n->changed = true;
81  }
82 }
83 
84 #endif
85 
86 DQMHistAnalysisInputPVSrvModule::DQMHistAnalysisInputPVSrvModule()
88 {
89  //Parameter definition
90  addParam("RefreshInterval", m_interval, "Refresh interval of histograms in ms", 2000);
91  addParam("HistoList", m_histlist, "pvname, histname, histtitle, (bins,min,max[,bins,min,max])");
92  addParam("Callback", m_callback, "Using EPICS callback for changes", true);
93  addParam("Server", m_server, "Start http server on port 8082", false);
94  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: Constructor done.");
95 }
96 
97 
98 DQMHistAnalysisInputPVSrvModule::~DQMHistAnalysisInputPVSrvModule()
99 {
100 #ifdef _BELLE2_EPICS
101  if (ca_current_context()) ca_context_destroy();
102 #endif
103 }
104 
106 {
107  m_eventMetaDataPtr.registerInDataStore();
108  //if (m_server) m_serv = new THttpServer("http:8082");
109 
110 #ifdef _BELLE2_EPICS
111  if (!ca_current_context()) SEVCHK(ca_context_create(ca_disable_preemptive_callback), "ca_context_create");
112  SEVCHK(ca_add_exception_event(exceptionCallback, NULL), "ca_add_exception_event");
113  for (auto& it : m_histlist) {
114  if (it.size() != 4 && it.size() != 5) {
115  B2WARNING("Histolist with wrong nr of parameters " << it.size());
116  continue;
117  }
118  auto n = (MYNODE*) callocMustSucceed(1, sizeof(MYNODE), "caMonitor");
119  pmynode.push_back(n);
120 
121  {
122  TDirectory* oldDir = gDirectory;
123  TDirectory* d = oldDir;
124  TString myl = it.at(1).c_str();
125  TString tok;
126  Ssiz_t from = 0;
127  while (myl.Tokenize(tok, from, "/")) {
128  TString dummy;
129  Ssiz_t f;
130  f = from;
131  if (myl.Tokenize(dummy, f, "/")) { // check if its the last one
132  TDirectory* e;
133  e = d->GetDirectory(tok);
134  if (e) {
135  B2DEBUG(20, "Cd Dir " << tok);
136  d = e;
137  } else {
138  B2DEBUG(20, "Create Dir " << tok);
139  d = d->mkdir(tok);
140  }
141  d->cd();
142  } else {
143  break;
144  }
145  }
146 
147  B2DEBUG(20, "Create Histo " << tok);
148 
149  Int_t x;
150  Double_t xmin, xmax;
151  strncpy(n->name, it.at(0).c_str(), MAX_PV_NAME_LEN - 1);
152  istringstream is(it.at(3));
153  is >> x;
154  is >> xmin;
155  is >> xmax;
156  if (it.size() == 4) {
157  n->histo = (TH1*)new TH1F(tok, it.at(2).c_str(), x, xmin, xmax);
158  n->binx = x;
159  n->biny = 0;
160  n->binmax = x;
161  } else {
162  Int_t y;
163  Double_t ymin, ymax;
164  istringstream iss(it.at(4));
165  iss >> y;
166  iss >> ymin;
167  iss >> ymax;
168  n->histo = (TH1*)new TH2F(tok, it.at(2).c_str(), x, xmin, xmax, y, ymin, ymax);
169  n->binx = x;
170  n->biny = y;
171  n->binmax = x * y;
172  }
173 
174  // cd back to root directory
175  oldDir->cd();
176  }
177 
178  }
179 
180  for (auto n : pmynode) {
181  SEVCHK(ca_create_channel(n->name, connectionCallback, n, 20, &n->mychid), "ca_create_channel");
182  SEVCHK(ca_replace_access_rights_event(n->mychid, accessRightsCallback), "ca_replace_access_rights_event");
183  if (m_callback) {
184  SEVCHK(ca_create_subscription(DBR_STRING, 1, n->mychid, DBE_VALUE, eventCallback, n, &n->myevid), "ca_create_subscription");
185  }
186  }
187 
188 #endif
189  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: initialized.");
190 }
191 
192 
194 {
195  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: beginRun called.");
196 }
197 
199 {
200  m_count++;
201  m_eventMetaDataPtr.create();
202  m_eventMetaDataPtr->setExperiment(m_expno);
203  m_eventMetaDataPtr->setRun(m_runno);
204  m_eventMetaDataPtr->setEvent(m_count);
205 
206  TTimer t(m_interval, kFALSE);// in ms
207 
208 #ifdef _BELLE2_EPICS
209  SEVCHK(ca_pend_event(0.0001), "ca_pend_event");
210 
211  for (auto n : pmynode) {
212  if (m_callback && !n->changed) continue;
213  n->changed = false;
214 
215  auto bufferorg = new char[dbr_size_n(ca_field_type(n->mychid), ca_element_count(n->mychid))];
216  void* buffer = (void*) bufferorg;
217  int status;
218 
219  if (ca_field_type(n->mychid) != DBF_LONG && ca_field_type(n->mychid) != DBF_FLOAT) continue;
220  status = ca_array_get(ca_field_type(n->mychid), ca_element_count(n->mychid), n->mychid, buffer);
221  SEVCHK(status, "ca_array_get()");
222  status = ca_pend_io(15.0);
223  if (status != ECA_NORMAL) {
224  B2WARNING("EPICS ca_array_get " << ca_name(n->mychid) << " didn't return a value.");
225  } else {
226  if (!n->histo) {
227  // this should NEVER happen
228  continue;
229 // B2DEBUG(20, "Create Histo " << tok);
230 // n->histo=new TH1F(ca_name(n->mychid),ca_name(n->mychid),ca_element_count(n->mychid),0,ca_element_count(n->mychid));
231  }
232  unsigned int bins;
233  bins = ca_element_count(n->mychid) < n->binmax ? ca_element_count(n->mychid) : n->binmax;
234  TH1* histo = n->histo;
235  for (unsigned int j = ca_element_count(n->mychid); j < n->binmax; j++) histo->SetBinContent(j + 1, 0); // zero out undefined bins
236  switch (ca_field_type(n->mychid)) {
237  case DBF_CHAR: {
238  dbr_char_t* b = (dbr_char_t*)buffer;
239  for (unsigned int j = 0; j < bins; j++) {
240  histo->SetBinContent(j + 1, b[j]);
241  }
242  }; break;
243 // case DBF_INT:
244  case DBF_SHORT: { // same as INT
245  dbr_short_t* b = (dbr_short_t*)buffer;
246  for (unsigned int j = 0; j < bins; j++) {
247  histo->SetBinContent(j + 1, b[j]);
248  }
249  }; break;
250  case DBF_LONG: {
251  dbr_long_t* b = (dbr_long_t*)buffer;
252  for (unsigned int j = 0; j < bins; j++) {
253  histo->SetBinContent(j + 1, b[j]);
254  }
255  }; break;
256  case DBF_FLOAT: {
257  dbr_float_t* b = (dbr_float_t*)buffer;
258  for (unsigned int j = 0; j < bins; j++) {
259  histo->SetBinContent(j + 1, b[j]);
260  }
261  }; break;
262  case DBF_DOUBLE: {
263  dbr_double_t* b = (dbr_double_t*)buffer;
264  for (unsigned int j = 0; j < bins; j++) {
265  histo->SetBinContent(j + 1, b[j]);
266  }
267  }; break;
268  default:
269  // type not supported
270  break;
271  }
272  }
273  delete[] bufferorg;
274  }
275 #endif
276  do { // call at least once!
277  //if (m_serv) m_serv->ProcessRequests();
278  gSystem->Sleep(10); // 10 ms sleep
279  } while (!t.CheckTimer(gSystem->Now()));
280 
281 }
282 
284 {
285  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: endRun called");
286 }
287 
288 
290 {
291  B2DEBUG(20, "DQMHistAnalysisInputPVSrv: terminate called");
292 }
293 
294 
295 
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
virtual void endRun() override
This method is called if the current run ends.
virtual void terminate() override
This method is called at the end of the event processing.
virtual void beginRun() override
Module functions to be called from event process.
bool m_server
Whether to start http server on port 8082.
std::vector< std::vector< std::string > > m_histlist
Parameter list for histograms.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
The metadata for each event.
bool m_callback
Whether to use EPICS callback for changes.
The base class for the histogram analysis module.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.