247 def testStopPropagation(self):
248 """test function"""
249 monitoring_socket = self.create_socket(self.monitoring_port)
250
251 input_socket = self.create_socket(self.input_port)
252 self.send(input_socket, "h")
253 self.assertIsMsgType(input_socket, "c")
254
255 second_input_socket = self.create_socket(self.input_port, identity="other_socket")
256 self.send(second_input_socket, "h")
257 self.assertIsMsgType(second_input_socket, "c")
258
259
260 self.assertMonitoring(monitoring_socket, "input.registered_workers", 2)
261 self.assertNotHasOutputFile("outputFile.root", timeout=1)
262
263
264 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
265 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
266
267
268 self.send(input_socket, "l")
269 self.assertIsMsgType(input_socket, "c")
270 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 1)
271 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
272 self.assertNotHasOutputFile("outputFile.root", timeout=1)
273
274
275 self.send(second_input_socket, "l")
276 self.assertIsMsgType(second_input_socket, "c")
277 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
278 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
279 self.assertNotHasOutputFile("outputFile.root", timeout=1)
280
281
282 self.send(monitoring_socket, "n")
283 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 0)
284 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", False)
285
286
287 self.send(input_socket, "v", self.histogram_data, self.event_data)
288 self.assertIsMsgType(input_socket, "c")
289
290 self.send(input_socket, "v", self.histogram_data, self.event_data)
291 self.assertIsMsgType(input_socket, "c")
292
293 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
294 self.assertIsMsgType(second_input_socket, "c")
295
296 self.send(input_socket, "v", self.histogram_data, self.event_data)
297 self.assertIsMsgType(input_socket, "c")
298
299 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
300 self.assertIsMsgType(second_input_socket, "c")
301
302 self.send(second_input_socket, "v", self.histogram_data, self.event_data)
303 self.assertIsMsgType(second_input_socket, "c")
304
305
306 self.assertMonitoring(monitoring_socket, "input.received_events", 6)
307 self.assertNotHasOutputFile("outputFile.root", timeout=1)
308
309
310 self.send(input_socket, "l")
311 self.assertIsMsgType(input_socket, "c")
312 self.send(second_input_socket, "l")
313 self.assertIsMsgType(second_input_socket, "c")
314
315
316 self.assertMonitoring(monitoring_socket, "input.received_stop_messages", 2)
317 self.assertMonitoring(monitoring_socket, "input.all_stop_messages", True)
318 self.assertHasOutputFile("outputFile.root", unlink=False)
319 self.assertTrue(check_histogram_output("outputFile.root", 2))
320
321
322 self.send(input_socket, "x")
323 self.assertIsMsgType(input_socket, "c")
324 self.send(second_input_socket, "x")
325 self.assertIsMsgType(second_input_socket, "c")
326
327
328 self.assertNotHasOutputFile("outputFile.root")
329
330 self.assertIsDown("histoserver")
331
332