43{
46 char mbstr[100];
47 time_t now;
48 char* buffer = new char[c_maxBufSize];
49
50 int loop_counter = 0;
51 bool updated = false;
53 while (m_force_exit == 0) {
54 fflush(stdout);
55 int exam_stat = m_man->examine();
56 if (exam_stat == 0) {
57 } else if (exam_stat == 1) {
58
59 vector<int>& recvsock = m_man->connected_socket_list();
60 for (vector<int>::iterator it = recvsock.begin();
61 it != recvsock.end(); ++it) {
62 int fd = *it;
63 if (m_man->connected(fd)) {
64 struct sockaddr_in isa;
65 socklen_t isize = sizeof(isa);
66 getpeername(fd, (struct sockaddr*)&isa, &isize);
67 char address[INET_ADDRSTRLEN];
68 strcpy(address, inet_ntoa(isa.sin_addr));
69 char* ptr = strrchr(address, '.');
70 int nr = -1;
71 if (ptr) {
72 nr = atoi(ptr + 1);
73 }
74 m_unit_last_conn_time[address] = now;
75
76 int is = sio.get(fd, buffer, c_maxBufSize);
77 if (is <= 0) {
78 now = time(0);
79 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
80 printf("[%s] HistoServer2: fd %d / %s disconnected\n", mbstr, fd, address);
81 m_man->remove(fd);
83 break;
84 }
86
87
89 vector<TObject*> objlist;
90 vector<string> strlist;
91 msghdl.decode_msg(hmsg, objlist, strlist);
92 int nobjs = (hmsg->
header())->reserved[1];
93
94 string subdir = "";
95 now = time(0);
96 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&now));
97 m_unit_last_packet_time[address] = now;
98 printf("[%s] HistoServer2 : received nobjs = %d from %s\n", mbstr, nobjs, address);
99 if (nobjs > 0) m_unit_last_content_time[address] = now;
100 for (int i = 0; i < nobjs; i++) {
101
102
103 string objname = strlist.at(i);
104 if (objname == string("DQMRC:CLEAR")) {
105 m_hman->clear();
106 m_hman->merge();
107 updated = false;
108 m_last_merge_time = time(0);
109 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
110 printf("[%s] HistoServer2: CLEAR\n", mbstr);
111 continue;
112 }
113 if (objname == string("DQMRC:MERGE")) {
114 m_hman->merge();
115 updated = false;
116 m_last_merge_time = time(0);
117 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
118 printf("[%s] HistoServer2: MERGE\n", mbstr);
119 continue;
120 }
121 auto lpos = objname.find("DQMRC:SAVE:");
122 if (lpos != string::npos) {
123 auto filename = objname.substr(11);
124 m_hman->filedump(filename);
125 continue;
126 }
127 lpos = objname.find("SUBDIR:");
128 if (lpos != string::npos) {
129 subdir = objname.substr(7);
130 if (subdir == "EXIT") subdir = "";
131
132
133 } else {
134 m_hman->update(subdir, strlist.at(i), fd, (TH1*)objlist.at(i));
135 updated = true;
136 }
137 }
138 }
139 }
140 }
141 usleep(1000);
142 loop_counter++;
143 if (loop_counter % c_mergeIntervall == 0) {
144 if (updated) {
145 m_last_merge_time = time(0);
146 strftime(mbstr, sizeof(mbstr), "%F %T", localtime(&m_last_merge_time));
147 printf("[%s] HistoServer2: merging histograms\n", mbstr);
148 m_hman->merge();
149 updated = false;
150 }
152 }
153 }
154 return 0;
155}
Class to manage a streamed object.
EvtHeader * header()
Get pointer to EvtHeader.
std::map< std::string, std::pair< int, bool > > m_units_connected
connection IP, state and last update time
void write_state(void)
Write connection state to a file.
A class to encode/decode an EvtMessage.