310 def testFullRun(self):
311 """test function"""
312 distributor_monitoring_socket = self.create_socket(self.distributor_monitoring_port)
313 final_collector_monitoring_socket = self.create_socket(self.final_collector_monitoring_port)
314
315 input_socket = self.create_socket(self.distributor_input_port, socket_type=zmq.STREAM, bind=True)
316 input_identity, _ = self.recv(input_socket)
317
318 output_socket = self.create_socket(self.final_collector_output_port, socket_type=zmq.STREAM, bind=True)
319 output_identity, _ = self.recv(output_socket)
320
321
322 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
323 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
324 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
325 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
326 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
327
328 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
329
330 self.assertHasOutputFile("initialize_called", timeout=1)
331 self.assertHasOutputFile("dying_initialize_called", timeout=1)
332
333
334 for _ in range(100):
335 input_socket.send_multipart([input_identity, self.event_data])
336
337 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
338 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
339 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
340 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 40)
341 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
342 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 100)
343
344 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 2)
345 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 100)
346 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 100)
347
348 self.assertHasOutputFile("beginrun_called", timeout=1)
349 self.assertHasOutputFile("dying_beginrun_called", timeout=1)
350
351 buffer = b""
352 while output_socket.poll(0):
353 _, msg = self.recv(output_socket)
354 buffer += msg
355 self.assertNothingMore(output_socket)
356
357
358 self.assertEqual(len(buffer), 122638 * 100)
359
360
361 Path("dying_exit_request").touch()
362
363
364 for _ in range(100):
365 input_socket.send_multipart([input_identity, self.event_data])
366
367 self.assertHasOutputFile("dying_exit_called", timeout=1)
368 self.assertIsDown("dying_worker", timeout=10)
369
370 buffer = b""
371 while output_socket.poll(0):
372 _, msg = self.recv(output_socket)
373 buffer += msg
374 self.assertNothingMore(output_socket)
375
376
377
378 self.assertEqual(len(buffer), 122638 * 79)
379
380
381 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
382
383
384 self.send(distributor_monitoring_socket, "l")
385
386 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
387 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
388 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
389 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
390
391 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
392 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 200)
393
394 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
395 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
396 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 1)
397 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 1)
398
399
400 self.assertHasOutputFile("endrun_called", timeout=1)
401
402
403 self.send(distributor_monitoring_socket, "n")
404 self.send(final_collector_monitoring_socket, "n")
405
406 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
407 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
408 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
409 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
410 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
411
412 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
413 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 179)
414 self.assertMonitoring(final_collector_monitoring_socket, "input.received_stop_messages", 0)
415 self.assertMonitoring(final_collector_monitoring_socket, "input.all_stop_messages", 0)
416
417 for _ in range(100):
418 input_socket.send_multipart([input_identity, self.event_data])
419
420 self.assertMonitoring(distributor_monitoring_socket, "input.socket_state", "connected")
421 self.assertMonitoring(distributor_monitoring_socket, "input.socket_connects", 1)
422 self.assertMonitoring(distributor_monitoring_socket, "input.socket_disconnects", 0)
423 self.assertMonitoring(distributor_monitoring_socket, "output.ready_queue_size", 20)
424 self.assertMonitoring(distributor_monitoring_socket, "output.registered_workers", 2)
425 self.assertMonitoring(distributor_monitoring_socket, "output.sent_events", 300)
426
427 self.assertMonitoring(final_collector_monitoring_socket, "input.registered_workers", 1)
428 self.assertMonitoring(final_collector_monitoring_socket, "input.received_events", 279)
429 self.assertMonitoring(final_collector_monitoring_socket, "output.sent_events", 279)
430
431 self.assertNotHasOutputFile("endrun_called", timeout=1)
432 self.assertHasOutputFile("beginrun_called", timeout=1)
433
434 buffer = b""
435 while output_socket.poll(0):
436 _, msg = self.recv(output_socket)
437 buffer += msg
438 self.assertNothingMore(output_socket)
439
440
441 self.assertEqual(len(buffer), 122638 * 100)
442
443
444 self.send(distributor_monitoring_socket, "x")
445
446 self.assertIsDown("final_collector")
447 self.assertIsDown("distributor")
448 self.assertIsDown("worker")
449
450