Belle II Software  release-08-01-10
DqmHistoManagerModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/dqm/modules/DqmHistoManagerModule.h>
10 
11 #include <framework/core/Environment.h>
12 #include <framework/pcore/ProcHandler.h>
13 #include <framework/pcore/RbTuple.h>
14 
15 #include "TKey.h"
16 #include "TText.h"
17 
18 using namespace Belle2;
19 using namespace std;
20 
21 //-----------------------------------------------------------------
22 // Register the Module
23 //-----------------------------------------------------------------
24 REG_MODULE(DqmHistoManager)
25 
26 //-----------------------------------------------------------------
27 // Implementation
28 //-----------------------------------------------------------------
29 
30 // Implementations
31 DqmHistoManagerModule::DqmHistoManagerModule() : Module(), m_initmain(false), m_initialized(false)
32 {
33  // Module description
34  setDescription("Module to manage histograms/Ntuples/TTrees");
35  setPropertyFlags(Module::c_HistogramManager);
36 
37  // Parameters
38  addParam("histoFileName", m_histfile, "Name of histogram output file.", string("histofile.root"));
39  addParam("workDirName", m_workdir, "Name of working directory", string("."));
40  addParam("HostName", m_hostname, "Name of host to send histograms", string("localhost"));
41  addParam("Port", m_port, "Socket port number to connect", DQM_SOCKET);
42  addParam("DumpInterval", m_interval, "Interval to dump histos", 1000);
43  addParam("WriteInterval", m_dumpinterval, "Interval to write file", 10000);
44 }
45 
46 DqmHistoManagerModule::~DqmHistoManagerModule()
47 {
48  // if (m_initmain) {
49  // if (ProcHandler::EvtProcID() == -1) { // should be called from main proc.
50  // cout << "DqmHistoManager:: destructor called from pid=" << ProcHandler::EvtProcID() << endl;
51  // if (Environment::Instance().getNumberProcesses() > 0 && ProcHandler::EvtProcID() == -1) {
52  /*
53  if (Environment::Instance().getNumberProcesses() > 0) {
54  cout << "DqmHistoManager:: adding histogram files" << endl;
55  RbTupleManager::Instance().hadd();
56  cout << "DqmHistoManager:: adding histogram files done" << endl;
57  }
58  */
59  // }
60 }
61 
63 {
64  RbTupleManager::Instance().init(Environment::Instance().getNumberProcesses(), m_histfile.c_str(), m_workdir.c_str());
65 
66  m_initmain = true;
67  // cout << "DqmHistoManager::initialization done" << endl;
68 
69  // Connect to Histogram Server
70  // m_sock = new EvtSocketSend(m_hostname, m_port);
71  // printf("EvtSocketSend : fd = %d\n", (m_sock->sock())->sock());
72 
73  // Message Handler
74  m_msg = new MsgHandler(0); // Compression level = 0
75 
76  // Clear event counter
77  m_nevent = 0;
78 }
79 
81 {
82  if (!m_initialized) {
83  // cout << "DqmHistoManager:: first pass in beginRun() : proc="
84  // << ProcHandler::EvtProcID() << endl;
86  // Connect to Histogram Server
87  m_sock = new EvtSocketSend(m_hostname, m_port);
88  printf("EvtSocketSend (Proc %d) : fd = %d\n", ProcHandler::EvtProcID(),
89  (m_sock->sock())->sock());
90 
91  m_pstep = (ProcHandler::numEventProcesses() != 0) ? m_interval / ProcHandler::numEventProcesses() : m_interval;
92  m_dstep = (ProcHandler::numEventProcesses() != 0) ? m_dumpinterval / ProcHandler::numEventProcesses() : m_dumpinterval;
93 
94  m_ptime = time(NULL);
95  m_dtime = m_ptime;
96  if (ProcHandler:: EvtProcID() < 100) {
97  m_ptime -= m_pstep * ProcHandler::EvtProcID();
98  m_dtime += m_dstep * ProcHandler::EvtProcID();
99  }
100  m_initialized = true;
101  }
102 }
103 
105 {
106  if (!m_initialized) {
107  // cout << "DqmHistoManager:: first pass in endRun(): proc="
108  // << ProcHandler::EvtProcID() << endl;
110  m_initialized = true;
111  }
112 }
113 
115 {
116  if (!m_initialized) {
117  // cout << "DqmHistoManager:: first pass in event() : proc="
118  // << ProcHandler::EvtProcID() << endl;
120  m_initialized = true;
121  }
122 
123  time_t ctime = time(NULL);
124 
125  // Transfer hitograms over network
126  if ((ctime - m_ptime) > m_interval) {
127  // printf ( "DqmHistoManager: event = %d\n", m_nevent );
128  // printf ( "DqmHistoManger: dumping histos.....\n" );
129  m_msg->clear();
130  m_nobjs = 0;
131  // Stream histograms with directory structure
132  StreamHistograms(gDirectory, m_msg);
133 
134  EvtMessage* msg = m_msg->encode_msg(MSG_EVENT);
135  // printf ( "Message Size = %d\n", msg->size() );
136 
137  printf("DqmHistoManger(proc:%d): dumping histos.....%d histos\n",
138  ProcHandler::EvtProcID(), m_nobjs);
139  fflush(stdout);
140 
141  (msg->header())->reserved[0] = 0;
142  (msg->header())->reserved[1] = m_nobjs;
143  (msg->header())->reserved[2] = 0;
144  if (m_nobjs > 0) {
145  m_sock->send(msg);
146  }
147  delete(msg);
148  m_ptime = ctime;
149  }
150  // Dump histograms to file
151  if ((ctime - m_dtime) > m_dumpinterval) {
152  // Dump histograms to file -> Turned Off
153  // RbTupleManager::Instance().dump();
154  m_dtime = ctime;
155  }
156 
157  m_nevent++;
158 
159 }
160 
162 {
163  // if ( ProcHandler::EvtProcID() >= 10000 ) {
164  // return;
165  // }
166  if (m_initialized) {
167  // cout << "DqmHistoManager::terminating event process : PID=" << ProcHandler::EvtProcID() << endl;
168  m_msg->clear();
169 
170  m_nobjs = 0;
171  StreamHistograms(gDirectory, m_msg);
172 
173  printf("terminate : m_nobjs = %d\n", m_nobjs);
174  EvtMessage* msg = m_msg->encode_msg(MSG_EVENT);
175  (msg->header())->reserved[0] = 0;
176  (msg->header())->reserved[1] = m_nobjs;
177  (msg->header())->reserved[2] = 0;
178 
179  //m_sock->send(msg);
180 
181  delete(msg);
182 
183  // Dump hitograms to file -> Turned Off (
184  // RbTupleManager::Instance().dump();
185 
186  // RbTupleManager::Instance().terminate(); <- not used
187  // delete m_sock;
188  // delete m_msg;
189  }
190 }
191 
192 int DqmHistoManagerModule::StreamHistograms(TDirectory* curdir, MsgHandler* msg)
193 {
194  TList* keylist = curdir->GetList();
195  // keylist->ls();
196 
197  TIter nextkey(keylist);
198  TKey* key = 0;
199  int nkeys = 0;
200  int nobjs = 0;
201  while ((key = (TKey*)nextkey())) {
202  nkeys++;
203  TObject* obj = curdir->FindObject(key->GetName());
204  if (obj->IsA()->InheritsFrom("TH1")) {
205  TH1* h1 = (TH1*) obj;
206  // printf ( "Key = %s, entry = %f\n", key->GetName(), h1->GetEntries() );
207  // if (h1->GetEntries() > 0) { // Do not send empty histograms
208  m_msg->add(h1, h1->GetName());
209  nobjs++;
210  m_nobjs++;
211  // }
212  } else if (obj->IsA()->InheritsFrom(TDirectory::Class())) {
213  // printf ( "New directory found %s, Go into subdir\n", obj->GetName() );
214  TDirectory* tdir = (TDirectory*) obj;
215  // m_msg->add(tdir, tdir->GetName());
216  TText subdir(0, 0, tdir->GetName());
217  m_msg->add(&subdir, "SUBDIR:" + string(obj->GetName())) ;
218  nobjs++;
219  m_nobjs++;
220  tdir->cd();
221  StreamHistograms(tdir , msg);
222  TText command(0, 0, "COMMAND:EXIT");
223  m_msg->add(&command, "SUBDIR:EXIT");
224  nobjs++;
225  m_nobjs++;
226  curdir->cd();
227  }
228  }
229  return 0;
230 }
231 
232 
233 
234 
Class definition of DqmHistoManager module.
void initialize() override
module functions
void event() override
This method is the core of the module.
void endRun() override
This method is called if the current run ends.
void terminate() override
This method is called at the end of the event processing.
void beginRun() override
Called when entering a new run.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
Base class for Modules.
Definition: Module.h:72
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
static int EvtProcID()
Return ID of the current process.
Definition: ProcHandler.cc:248
static int numEventProcesses()
Return number of worker processes (configured value, not current)
Definition: ProcHandler.cc:234
static RbTupleManager & Instance()
Access to singleton.
Definition: RbTuple.cc:40
void init(int nprocess, const char *filename, const char *workdir=".")
Global initialization.
Definition: RbTuple.cc:49
int begin(int pid)
Function called by HistoManager module for the first event.
Definition: RbTuple.cc:89
REG_MODULE(arichBtest)
Register the Module.
Abstract base class for different kinds of events.