Belle II Software development
HistoServer.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/dqm/HistoServer.h>
9
10#include <framework/pcore/MsgHandler.h>
11#include <ctime>
12#include <unistd.h>
13
14using namespace Belle2;
15using namespace std;
16
17// Constructor / Destructor
18HistoServer::HistoServer(int port, const string& filename)
19{
20 m_port = port;
21 m_force_exit = 0;
22 m_filename = filename;
23}
24
25HistoServer::~HistoServer()
26{
27 delete m_sock;
28}
29
30// Initialize socket
31
32int HistoServer:: init()
33{
34 m_sock = new EvtSocketRecv(m_port, false);
35 m_man = new EvtSocketManager(m_sock);
36 // m_mapfile = TMapFile::Create(m_filename.c_str(), "RECREATE", MAPFILESIZE);
37 m_memfile = new DqmMemFile(m_filename, "write", DqmMemFile::c_memFileSize);
38 m_hman = new HistoManager(m_memfile);
39
40 // Semaphore to ensure exclusive access to shm
41 // m_mapfile->CreateSemaphore();
42 // m_mapfile->ReleaseSemaphore();
43 return 0;
44
45}
46// Server function to collect histograms
47
48int HistoServer::server()
49{
50 SocketIO sio;
51 MsgHandler msghdl(0);
52 char mbstr[100];
53 time_t now;
54 char* buffer = new char[c_maxBufSize];
55 // vector<int> recvsock;
56 int loop_counter = 0;
57 bool updated = false;
58 while (m_force_exit == 0) {
59 fflush(stdout);
60 int exam_stat = m_man->examine();
61 if (exam_stat == 0) {
62 } else if (exam_stat == 1) { //
63 // printf ( "Histo data ready on socket\n" );
64 vector<int>& recvsock = m_man->connected_socket_list();
65 for (vector<int>::iterator it = recvsock.begin();
66 it != recvsock.end(); ++it) {
67 int fd = *it;
68 if (m_man->connected(fd)) {
69 int is = sio.get(fd, buffer, c_maxBufSize);
70 if (is <= 0) {
71 now = time(0);
72 strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
73 printf("[%s] HistoServer: fd %d disconnected\n", mbstr, fd);
74 m_man->remove(fd);
75 break;
76 }
77 updated = true;
78 // printf ( "EvtMessage received : size = %d from fd=%d\n", is, fd );
79
80 EvtMessage* hmsg = new EvtMessage(buffer);
81 vector<TObject*> objlist;
82 vector<string> strlist;
83 msghdl.decode_msg(hmsg, objlist, strlist);
84 int nobjs = (hmsg->header())->reserved[1];
85 // string subdir = "ROOT";
86 string subdir = "";
87 now = time(0);
88 strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
89 printf("[%s] HistoServer : received nobjs = %d\n", mbstr, nobjs);
90 for (int i = 0; i < nobjs; i++) {
91 // printf ( "Object : %s received, class = %s\n", (strlist.at(i)).c_str(),
92 // (objlist.at(i))->ClassName() );
93 string objname = strlist.at(i);
94 if (objname == string("DQMRC:CLEAR")) {
95 m_hman->clear();
96 m_hman->merge();
97 now = time(0);
98 strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
99 printf("[%s] HistoServer: CLEAR\n", mbstr);
100 updated = false;
101 continue;
102 }
103 if (objname == string("DQMRC:MERGE")) {
104 m_hman->merge();
105 now = time(0);
106 strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
107 printf("[%s] HistoServer: MERGE\n", mbstr);
108 updated = false;
109 continue;
110 }
111 auto lpos = objname.find("DQMRC:SAVE:");
112 if (lpos != string::npos) {
113 auto filename = objname.substr(11);
114 m_hman->filedump(filename);
115 continue;
116 }
117 lpos = objname.find("SUBDIR:");
118 if (lpos != string::npos) {
119 subdir = objname.substr(7);
120 if (subdir == "EXIT") subdir = "";
121 // printf("HistoServer : subdirectory set to %s (%s)\n", subdir.c_str(), objname.c_str());
122 } else {
123 m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
124 }
125 }
126 }
127 }
128 }
129 usleep(1000);
130 loop_counter++;
131 if (loop_counter % c_mergeIntervall == 0 && updated) {
132 now = time(0);
133 strftime(mbstr, sizeof(mbstr), "%c", localtime(&now));
134 printf("[%s] HistoServer: merging histograms\n", mbstr);
135 // m_mapfile->AcquireSemaphore();
136 m_hman->merge();
137 // m_mapfile->ReleaseSemaphore();
138 updated = false;
139 }
140 }
141 return 0;
142}
143
144
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.
STL namespace.