44{
47 char mbstr[100];
48 time_t now;
49 char* buffer = new char[c_maxBufSize];
50
51 int loop_counter = 0;
52 bool updated = false;
54 while (m_force_exit == 0) {
55 fflush(stdout);
56 int exam_stat = m_man->examine();
57 if (exam_stat == 0) {
58 } else if (exam_stat == 1) {
59
60 vector<int>& recvsock = m_man->connected_socket_list();
61 for (vector<int>::iterator it = recvsock.begin();
62 it != recvsock.end(); ++it) {
63 int fd = *it;
64 if (m_man->connected(fd)) {
65 struct sockaddr_in isa;
66 socklen_t isize = sizeof(isa);
67 getpeername(fd, (struct sockaddr*)&isa, &isize);
68 char address[INET_ADDRSTRLEN];
69 strcpy(address, inet_ntoa(isa.sin_addr));
70 char* ptr = strrchr(address, '.');
71 int nr = -1;
72 if (ptr) {
73 nr = atoi(ptr + 1);
74 }
75 m_unit_last_conn_time[address] = now;
76
77 int is = sio.get(fd, buffer, c_maxBufSize);
78 if (is <= 0) {
79 now = time(0);
80 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
81 printf("[%s] HistoServer2: fd %d / %s disconnected\n", mbstr, fd, address);
82 m_man->remove(fd);
84 break;
85 }
87
88
90 vector<TObject*> objlist;
91 vector<string> strlist;
92 msghdl.decode_msg(hmsg, objlist, strlist);
93 int nobjs = (hmsg->
header())->reserved[1];
94
95 string subdir = "";
96 now = time(0);
97 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
98 m_unit_last_packet_time[address] = now;
99 printf("[%s] HistoServer2 : received nobjs = %d from %s\n", mbstr, nobjs, address);
100 if (nobjs > 0) m_unit_last_content_time[address] = now;
101 for (int i = 0; i < nobjs; i++) {
102
103
104 string objname = strlist.at(i);
105 if (objname == string("DQMRC:CLEAR")) {
106 m_hman->clear();
107 m_hman->merge();
108 updated = false;
109 m_last_merge_time = time(0);
110 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
111 printf("[%s] HistoServer2: CLEAR\n", mbstr);
112 continue;
113 }
114 if (objname == string("DQMRC:MERGE")) {
115 m_hman->merge();
116 updated = false;
117 m_last_merge_time = time(0);
118 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
119 printf("[%s] HistoServer2: MERGE\n", mbstr);
120 continue;
121 }
122 auto lpos = objname.find("DQMRC:SAVE:");
123 if (lpos != string::npos) {
124 auto filename = objname.substr(11);
125 m_hman->filedump(filename);
126 continue;
127 }
128 lpos = objname.find("SUBDIR:");
129 if (lpos != string::npos) {
130 subdir = objname.substr(7);
131 if (subdir == "EXIT") subdir = "";
132
133
134 } else {
135 m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
136 updated = true;
137 }
138 }
139 }
140 }
141 }
142 usleep(1000);
143 loop_counter++;
144 if (loop_counter % c_mergeIntervall == 0) {
145 if (updated) {
146 m_last_merge_time = time(0);
147 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
148 printf("[%s] HistoServer2: merging histograms\n", mbstr);
149 m_hman->merge();
150 updated = false;
151 }
153 }
154 }
155 return 0;
156}
Class to manage a streamed object.
EvtHeader * header()
Get pointer to EvtHeader.
std::map< std::string, std::pair< int, bool > > m_units_connected
connection IP, state and last update time
void write_state(void)
Write connection state to a file.
A class to encode/decode an EvtMessage.