Belle II Software  release-06-00-14
HistoServer.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
#include <daq/dqm/HistoServer.h>

#include <framework/pcore/MsgHandler.h>
#include <ctime>
#include <memory>
12 
13 using namespace Belle2;
14 using namespace std;
15 
16 // Constructor / Destructor
17 HistoServer::HistoServer(int port, string filename)
18 {
19  m_port = port;
20  m_force_exit = 0;
21  m_filename = filename;
22 }
23 
24 HistoServer::~HistoServer()
25 {
26  delete m_sock;
27 }
28 
29 // Initialize socket
30 
31 int HistoServer:: init()
32 {
33  m_sock = new EvtSocketRecv(m_port, false);
34  m_man = new EvtSocketManager(m_sock);
35  // m_mapfile = TMapFile::Create(m_filename.c_str(), "RECREATE", MAPFILESIZE);
36  m_memfile = new DqmMemFile(m_filename, "write", MEMFILESIZE);
37  m_hman = new HistoManager(m_memfile);
38 
39  // Semaphore to ensure exclusive access to shm
40  // m_mapfile->CreateSemaphore();
41  // m_mapfile->ReleaseSemaphore();
42  return 0;
43 
44 }
45 // Server function to collect histograms
46 
47 int HistoServer::server()
48 {
49  SocketIO sio;
50  MsgHandler msghdl(0);
51  char mbstr[100];
52  time_t now;
53  char* buffer = new char[MAXBUFSIZE];
54  // vector<int> recvsock;
55  int loop_counter = 0;
56  bool updated = false;
57  while (m_force_exit == 0) {
58  fflush(stdout);
59  int exam_stat = m_man->examine();
60  if (exam_stat == 0) {
61  } else if (exam_stat == 1) { //
62  // printf ( "Histo data ready on socket\n" );
63  vector<int>& recvsock = m_man->connected_socket_list();
64  for (vector<int>::iterator it = recvsock.begin();
65  it != recvsock.end(); ++it) {
66  int fd = *it;
67  if (m_man->connected(fd)) {
68  int is = sio.get(fd, buffer, MAXBUFSIZE);
69  if (is <= 0) {
70  now = time(0);
71  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
72  printf("[%s] HistoServer: fd %d disconnected\n", mbstr, fd);
73  m_man->remove(fd);
74  break;
75  }
76  updated = true;
77  // printf ( "EvtMessage received : size = %d from fd=%d\n", is, fd );
78 
79  EvtMessage* hmsg = new EvtMessage(buffer);
80  vector<TObject*> objlist;
81  vector<string> strlist;
82  msghdl.decode_msg(hmsg, objlist, strlist);
83  int nobjs = (hmsg->header())->reserved[1];
84  int narys = (hmsg->header())->reserved[2];
85  // string subdir = "ROOT";
86  string subdir = "";
87  now = time(0);
88  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
89  printf("[%s] HistoServer : received nobjs = %d\n", mbstr, nobjs);
90  for (int i = 0; i < nobjs; i++) {
91  // printf ( "Object : %s received, class = %s\n", (strlist.at(i)).c_str(),
92  // (objlist.at(i))->ClassName() );
93  string objname = strlist.at(i);
94  if (objname == string("DQMRC:CLEAR")) {
95  m_hman->clear();
96  m_hman->merge();
97  now = time(0);
98  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
99  printf("[%s] HistoServer: CLEAR\n", mbstr);
100  updated = false;
101  continue;
102  }
103  if (objname == string("DQMRC:MERGE")) {
104  m_hman->merge();
105  now = time(0);
106  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
107  printf("[%s] HistoServer: MERGE\n", mbstr);
108  updated = false;
109  continue;
110  }
111  int lpos = objname.find("SUBDIR:");
112  if (lpos != string::npos) {
113  subdir = objname.substr(7);
114  if (subdir == "EXIT") subdir = "";
115  // printf("HistoServer : subdirectory set to %s (%s)\n", subdir.c_str(), objname.c_str());
116  } else {
117  m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
118  }
119  }
120  }
121  }
122  }
123  usleep(1000);
124  loop_counter++;
125  if (loop_counter % MERGE_INTERVAL == 0 && updated) {
126  now = time(0);
127  strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
128  printf("[%s] HistoServer: merging histograms\n", mbstr);
129  // m_mapfile->AcquireSemaphore();
130  m_hman->merge();
131  // m_mapfile->ReleaseSemaphore();
132  updated = false;
133  }
134  }
135  return 0;
136 }
137 
138 
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.