194{
195 int* temp_buf = NULL;
196 int flag = 0;
197
198 vector <int> each_buf_nwords;
199 each_buf_nwords.clear();
200 vector <int> each_buf_nodes;
201 each_buf_nodes.clear();
202 vector <int> each_buf_events;
203 each_buf_events.clear();
204
205 *total_buf_nwords = 0;
206 *num_nodes_in_sendblock = 0;
207 *num_events_in_sendblock = 0;
208
209
210
211
212 int send_hdr_buf[ SendHeader::SENDHDR_NWORDS ];
213
214
215 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
216
217 recvFD(m_socket_recv[ i ], (
char*)send_hdr_buf,
sizeof(
int)*SendHeader::SENDHDR_NWORDS, flag);
218
221
223 int temp_num_nodes = send_hdr.GetNumNodesinPacket();
224
225 if (i == 0) {
226 *num_events_in_sendblock = temp_num_events;
227 } else if (*num_events_in_sendblock != temp_num_events) {
228#ifndef NO_DATA_CHECK
229 char err_buf[500];
230 sprintf(err_buf,
231 "[FATAL] CORRUPTED DATA: Different # of events or nodes in SendBlocks( # of eve : %d(socket 0) %d(socket %d), # of nodes: %d(socket 0) %d(socket %d). Exiting...\n",
232 *num_events_in_sendblock, temp_num_events, i, *num_nodes_in_sendblock, temp_num_nodes, i);
233 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
234 sleep(1234567);
235 exit(1);
236#endif
237 }
238
239 *num_nodes_in_sendblock += temp_num_nodes;
240
241 int rawblk_nwords = send_hdr.GetTotalNwords()
242 - SendHeader::SENDHDR_NWORDS
243 - SendTrailer::SENDTRL_NWORDS;
244 *total_buf_nwords += rawblk_nwords;
245
246
247
248
249 if (rawblk_nwords > (int)(2.5e6) || rawblk_nwords <= 0) {
250 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
251 char err_buf[500];
252 sprintf(err_buf, "CORRUPTED DATA: Too large event : Header %d %d %d %d :block size %d words\n", i, temp_num_events, temp_num_nodes,
253 send_hdr.GetTotalNwords(), rawblk_nwords);
254 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
255 sleep(123456);
256 exit(1);
257
258 }
259
260 each_buf_nwords.push_back(rawblk_nwords);
261 each_buf_events.push_back(temp_num_events);
262 each_buf_nodes.push_back(temp_num_nodes);
263
264 }
265
266
267 temp_buf =
getNewBuffer(*total_buf_nwords, delete_flag);
268
269
270
271 int total_recvd_byte = 0;
272 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
273
274 try {
275 total_recvd_byte +=
recvFD(m_socket_recv[ i ], (
char*)temp_buf + total_recvd_byte,
276 each_buf_nwords[ i ] * sizeof(int), flag);
277 } catch (const string& err_str) {
278 if (*delete_flag) {
279
280 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
281 delete temp_buf;
282 }
283 throw (std::move(err_str));
284 }
285
286
287
288 int temp_length = 0;
289 for (int j = 0; j < each_buf_nodes[ i ] * each_buf_events[ i ]; j++) {
290 int this_length = *((int*)((char*)temp_buf + total_recvd_byte - each_buf_nwords[ i ] * sizeof(int) + temp_length));
291 temp_length += this_length * sizeof(int);
292 }
293 if (temp_length != (int)(each_buf_nwords[ i ] * sizeof(int))) {
294 printf("[DEBUG]*******SENDHDR*********** \n");
295 printData(send_hdr_buf, SendHeader::SENDHDR_NWORDS);
296 printf("[DEBUG]*******BODY***********\n ");
297 printData(temp_buf, (
int)(total_recvd_byte /
sizeof(
int)));
298 char err_buf[500];
299 sprintf(err_buf, "CORRUPTED DATA: Length written on SendHeader(%d) is invalid. Actual data size is %d. Exting...",
300 (int)(*total_buf_nwords * sizeof(int)), temp_length);
301 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
302 sleep(1234567);
303 exit(-1);
304 }
305
306 }
307
308 if ((int)(*total_buf_nwords * sizeof(int)) != total_recvd_byte) {
309 char err_buf[500];
310 sprintf(err_buf, "CORRUPTED DATA: Received data size (%d byte) is not same as expected one (%d) from Sendheader. Exting...",
311 total_recvd_byte, (int)(*total_buf_nwords * sizeof(int)));
312 print_err.PrintError(err_buf, __FILE__, __PRETTY_FUNCTION__, __LINE__);
313 sleep(1234567);
314 exit(-1);
315 }
316
317
318 int send_trl_buf[(unsigned int)(SendTrailer::SENDTRL_NWORDS) ];
319 for (int i = 0; i < (int)(m_socket_recv.size()); i++) {
320 try {
321 recvFD(m_socket_recv[ i ], (
char*)send_trl_buf, SendTrailer::SENDTRL_NWORDS *
sizeof(
int), flag);
322 } catch (const string& err_str) {
323 if (*delete_flag) {
324
325 printf("[WARNING] Delete buffer before going to Run-pause state\n"); fflush(stdout);
326 delete temp_buf;
327 }
328 throw (std::move(err_str));
329 }
330 }
331
332 return temp_buf;
333}
int recvFD(int fd, char *buf, int data_size_byte, int flag)
receive data