Belle II Software  release-08-01-10
HistoServer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/dqm/HistoServer.h>
9 
10 #include <framework/pcore/MsgHandler.h>
11 #include <ctime>
12 
13 using namespace Belle2;
14 using namespace std;
15 
16 // Constructor / Destructor
17 HistoServer::HistoServer(int port, const string& filename)
18 {
19  m_port = port;
20  m_force_exit = 0;
21  m_filename = filename;
22 }
23 
24 HistoServer::~HistoServer()
25 {
26  delete m_sock;
27 }
28 
29 // Initialize socket
30 
31 int HistoServer:: init()
32 {
33  m_sock = new EvtSocketRecv(m_port, false);
34  m_man = new EvtSocketManager(m_sock);
35  // m_mapfile = TMapFile::Create(m_filename.c_str(), "RECREATE", MAPFILESIZE);
36  m_memfile = new DqmMemFile(m_filename, "write", MEMFILESIZE);
37  m_hman = new HistoManager(m_memfile);
38 
39  // Semaphore to ensure exclusive access to shm
40  // m_mapfile->CreateSemaphore();
41  // m_mapfile->ReleaseSemaphore();
42  return 0;
43 
44 }
45 // Server function to collect histograms
46 
47 int HistoServer::server()
48 {
49  SocketIO sio;
50  MsgHandler msghdl(0);
51  char mbstr[100];
52  time_t now;
53  char* buffer = new char[MAXBUFSIZE];
54  // vector<int> recvsock;
55  int loop_counter = 0;
56  bool updated = false;
57  while (m_force_exit == 0) {
58  fflush(stdout);
59  int exam_stat = m_man->examine();
60  if (exam_stat == 0) {
61  } else if (exam_stat == 1) { //
62  // printf ( "Histo data ready on socket\n" );
63  vector<int>& recvsock = m_man->connected_socket_list();
64  for (vector<int>::iterator it = recvsock.begin();
65  it != recvsock.end(); ++it) {
66  int fd = *it;
67  if (m_man->connected(fd)) {
68  int is = sio.get(fd, buffer, MAXBUFSIZE);
69  if (is <= 0) {
70  now = time(0);
71  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
72  printf("[%s] HistoServer: fd %d disconnected\n", mbstr, fd);
73  m_man->remove(fd);
74  break;
75  }
76  updated = true;
77  // printf ( "EvtMessage received : size = %d from fd=%d\n", is, fd );
78 
79  EvtMessage* hmsg = new EvtMessage(buffer);
80  vector<TObject*> objlist;
81  vector<string> strlist;
82  msghdl.decode_msg(hmsg, objlist, strlist);
83  int nobjs = (hmsg->header())->reserved[1];
84  // string subdir = "ROOT";
85  string subdir = "";
86  now = time(0);
87  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
88  printf("[%s] HistoServer : received nobjs = %d\n", mbstr, nobjs);
89  for (int i = 0; i < nobjs; i++) {
90  // printf ( "Object : %s received, class = %s\n", (strlist.at(i)).c_str(),
91  // (objlist.at(i))->ClassName() );
92  string objname = strlist.at(i);
93  if (objname == string("DQMRC:CLEAR")) {
94  m_hman->clear();
95  m_hman->merge();
96  now = time(0);
97  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
98  printf("[%s] HistoServer: CLEAR\n", mbstr);
99  updated = false;
100  continue;
101  }
102  if (objname == string("DQMRC:MERGE")) {
103  m_hman->merge();
104  now = time(0);
105  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
106  printf("[%s] HistoServer: MERGE\n", mbstr);
107  updated = false;
108  continue;
109  }
110  auto lpos = objname.find("DQMRC:SAVE:");
111  if (lpos != string::npos) {
112  auto filename = objname.substr(11);
113  m_hman->filedump(filename);
114  continue;
115  }
116  lpos = objname.find("SUBDIR:");
117  if (lpos != string::npos) {
118  subdir = objname.substr(7);
119  if (subdir == "EXIT") subdir = "";
120  // printf("HistoServer : subdirectory set to %s (%s)\n", subdir.c_str(), objname.c_str());
121  } else {
122  m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
123  }
124  }
125  }
126  }
127  }
128  usleep(1000);
129  loop_counter++;
130  if (loop_counter % MERGE_INTERVAL == 0 && updated) {
131  now = time(0);
132  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
133  printf("[%s] HistoServer: merging histograms\n", mbstr);
134  // m_mapfile->AcquireSemaphore();
135  m_hman->merge();
136  // m_mapfile->ReleaseSemaphore();
137  updated = false;
138  }
139  }
140  return 0;
141 }
142 
143 
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.