Belle II Software development
HistoServer2.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/dqm/HistoServer2.h>
9
10#include <framework/pcore/MsgHandler.h>
11#include <ctime>
12#include <unistd.h>
13#include <arpa/inet.h>
14
15using namespace Belle2;
16using namespace std;
17
18// Constructor / Destructor
19HistoServer2::HistoServer2(int port, const string& filename)
20{
21 m_port = port;
22 m_force_exit = 0;
23 m_filename = filename;
24}
25
26HistoServer2::~HistoServer2()
27{
28 delete m_sock;
29}
30
31// Initialize socket
32
33int HistoServer2::init()
34{
35 m_sock = new EvtSocketRecv(m_port, false);
36 m_man = new EvtSocketManager(m_sock);
37 m_hman = new HistoManager2(m_filename);
38 return 0;
39
40}
41// Server function to collect histograms
42
43int HistoServer2::server()
44{
45 SocketIO sio;
46 MsgHandler msghdl(0);
47 char mbstr[100];
48 time_t now;
49 char* buffer = new char[c_maxBufSize];
50 // vector<int> recvsock;
51 int loop_counter = 0;
52 bool updated = false;
54 while (m_force_exit == 0) {
55 fflush(stdout);
56 int exam_stat = m_man->examine();
57 if (exam_stat == 0) {
58 } else if (exam_stat == 1) { //
59 // printf ( "Histo data ready on socket\n" );
60 vector<int>& recvsock = m_man->connected_socket_list();
61 for (vector<int>::iterator it = recvsock.begin();
62 it != recvsock.end(); ++it) {
63 int fd = *it;
64 if (m_man->connected(fd)) {
65 struct sockaddr_in isa;
66 socklen_t isize = sizeof(isa);
67 getpeername(fd, (struct sockaddr*)&isa, &isize);
68 char address[INET_ADDRSTRLEN];
69 strcpy(address, inet_ntoa(isa.sin_addr));
70 char* ptr = strrchr(address, '.');
71 int nr = -1;
72 if (ptr) {
73 nr = atoi(ptr + 1);
74 }
75 m_unit_last_conn_time[address] = now;
76
77 int is = sio.get(fd, buffer, c_maxBufSize);
78 if (is <= 0) {
79 now = time(0);
80 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
81 printf("[%s] HistoServer2: fd %d / %s disconnected\n", mbstr, fd, address);
82 m_man->remove(fd);
83 m_units_connected[address] = std::pair(nr, false);
84 break;
85 }
86 m_units_connected[address] = std::pair(nr, true);
87 // printf ( "EvtMessage received : size = %d from fd=%d\n", is, fd );
88
89 EvtMessage* hmsg = new EvtMessage(buffer);
90 vector<TObject*> objlist;
91 vector<string> strlist;
92 msghdl.decode_msg(hmsg, objlist, strlist);
93 int nobjs = (hmsg->header())->reserved[1];
94 // string subdir = "ROOT";
95 string subdir = "";
96 now = time(0);
97 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
98 m_unit_last_packet_time[address] = now;
99 printf("[%s] HistoServer2 : received nobjs = %d from %s\n", mbstr, nobjs, address);
100 if (nobjs > 0) m_unit_last_content_time[address] = now;
101 for (int i = 0; i < nobjs; i++) {
102 // printf ( "Object : %s received, class = %s\n", (strlist.at(i)).c_str(),
103 // (objlist.at(i))->ClassName() );
104 string objname = strlist.at(i);
105 if (objname == string("DQMRC:CLEAR")) {
106 m_hman->clear();
107 m_hman->merge();
108 updated = false; // have merged, thus reset updated
109 m_last_merge_time = time(0);
110 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
111 printf("[%s] HistoServer2: CLEAR\n", mbstr);
112 continue;
113 }
114 if (objname == string("DQMRC:MERGE")) {
115 m_hman->merge();
116 updated = false; // have merged, thus reset updated
117 m_last_merge_time = time(0);
118 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
119 printf("[%s] HistoServer2: MERGE\n", mbstr);
120 continue;
121 }
122 auto lpos = objname.find("DQMRC:SAVE:");
123 if (lpos != string::npos) {
124 auto filename = objname.substr(11);
125 m_hman->filedump(filename);
126 continue;
127 }
128 lpos = objname.find("SUBDIR:");
129 if (lpos != string::npos) {
130 subdir = objname.substr(7);
131 if (subdir == "EXIT") subdir = "";
132 // printf("HistoServer2 : subdirectory set to %s (%s)\n", subdir.c_str(), objname.c_str());
133 // no update to histograms ...
134 } else {
135 m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
136 updated = true; // histograms have been updated
137 }
138 }
139 }
140 }
141 }
142 usleep(1000);
143 loop_counter++;
144 if (loop_counter % c_mergeIntervall == 0) {
145 if (updated) {
146 m_last_merge_time = time(0);
147 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
148 printf("[%s] HistoServer2: merging histograms\n", mbstr);
149 m_hman->merge();
150 updated = false; // have merged, thus reset updated
151 }
152 write_state();
153 }
154 }
155 return 0;
156}
157
159{
160 char mbstr[100];
161 char mbstr2[100];
162 char mbstr3[100];
163 std::string name = "/tmp/dqm_hserver_state_" + m_filename;
164 FILE* fh = fopen(name.c_str(), "wt+");
165 if (fh) {
166 time_t now = time(0);
167 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
168 strftime(mbstr2, sizeof(mbstr2), "%F %T", localtime(&m_last_merge_time));
169 fprintf(fh, "%s,%s,%s\n", m_filename.c_str(), mbstr, mbstr2);
170
171 for (auto& it : m_units_connected) {
172 int nr = it.second.first;
173 int con = it.second.second;
174 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_unit_last_conn_time[it.first]));
175 strftime(mbstr2, sizeof(mbstr2), "%F %T", localtime(&m_unit_last_packet_time[it.first]));
176 strftime(mbstr3, sizeof(mbstr3), "%F %T", localtime(&m_unit_last_content_time[it.first]));
177 if (it.first == "127.0.0.1") {
178 fprintf(fh, "RUNCONTROL,");
179 } else {
180 if (nr >= 0 and nr < 20) {
181 fprintf(fh, "HLT%d,", nr);
182 } else if (nr > 100 and nr < 110) {
183 fprintf(fh, "ERECO%d,", nr - 100);
184 } else {
185 fprintf(fh, "UNKNOWN,");
186 }
187 }
188 fprintf(fh, "%s,%d,%s,%s,%s\n", it.first.c_str(), con, mbstr, mbstr2, mbstr3);
189 }
190 fclose(fh);
191 }
192}
Class to manage streamed object.
Definition: EvtMessage.h:59
EvtHeader * header()
Get pointer to EvtHeader.
Definition: EvtMessage.cc:161
std::map< std::string, std::pair< int, bool > > m_units_connected
connection IP, state and last update time
Definition: HistoServer2.h:52
void write_state(void)
Write connection state to a file.
A class to encode/decode an EvtMessage.
Definition: MsgHandler.h:103
Abstract base class for different kinds of events.
STL namespace.