def testEventPropagation(self):
    """Check monitoring counters and output-file handling across two rounds of events.

    Five input sockets are opened (identities "1" and "2" on the first input
    port, "3" to "5" on the second) together with one monitoring socket each
    for the first, second and final-collector monitoring ports.  The test
    then walks through these phases, asserting the monitored values and the
    state of ``outputFile.root`` after each one:

    1. every input socket sends "h" and gets a "c" reply,
    2. ten rounds of "v" messages from all input sockets,
    3. every input socket sends "l",
    4. every monitoring socket is sent "n",
    5. five rounds of "v" messages from the first two input sockets only,
    6. every input socket sends "x", after which "first", "second" and
       "final_collector" are expected to be down.

    NOTE(review): the meaning of the single-letter message types is not
    visible in this method -- presumably hello / event / stop / new run /
    terminate with "c" as the confirmation; verify against the message
    definitions.
    """
    output_file = "outputFile.root"

    def exchange(sockets, message_type):
        """Send one payload-free message per socket and require a "c" reply."""
        for sock in sockets:
            self.send(sock, message_type)
            self.assertIsMsgType(sock, "c")

    def push_events(sockets, rounds):
        """Send ``rounds`` "v" messages per socket, each answered by "c"."""
        for _ in range(rounds):
            for sock in sockets:
                self.send(sock, "v", self.histogram_data, self.event_data)
                self.assertIsMsgType(sock, "c")

    # No output file may exist before anything has been sent.
    self.assertNotHasOutputFile(output_file)

    # Monitoring sockets, always handled in the order first, second, final.
    monitor_first = self.create_socket(self.first_monitoring_port)
    monitor_second = self.create_socket(self.second_monitoring_port)
    monitor_final = self.create_socket(self.final_collector_monitoring_port)
    monitors = (monitor_first, monitor_second, monitor_final)

    # Two sockets on the first input port, three on the second.
    workers = [
        self.create_socket(self.first_input_port, identity=identity)
        for identity in ("1", "2")
    ]
    workers += [
        self.create_socket(self.second_input_port, identity=identity)
        for identity in ("3", "4", "5")
    ]

    # Phase 1: announce every input socket.
    exchange(workers, "h")

    # Initial state, checked key by key across the three monitoring sockets.
    initial_state = (
        ("input.registered_workers", (2, 3, 2)),
        ("input.received_stop_messages", (0, 0, 0)),
        ("input.all_stop_messages", (False, False, False)),
    )
    for key, expected_values in initial_state:
        for monitor, expected in zip(monitors, expected_values):
            self.assertMonitoring(monitor, key, expected)

    # Phase 2: ten events from each of the five input sockets
    # (2 * 10 on the first port, 3 * 10 on the second).
    push_events(workers, 10)

    for monitor, event_count in zip(monitors, (20, 30, 2)):
        self.assertMonitoring(monitor, "input.received_events", event_count)

    self.check_histogram_repeated(output_file, 5)

    # Phase 3: every input socket sends "l".
    exchange(workers, "l")

    # Checked socket by socket: stop counter first, then the all-stopped flag.
    for monitor, stop_count in zip(monitors, (2, 3, 2)):
        self.assertMonitoring(monitor, "input.received_stop_messages", stop_count)
        self.assertMonitoring(monitor, "input.all_stop_messages", True)

    # unlink=False keeps the file in place for the histogram check that follows.
    self.assertHasOutputFile(output_file, unlink=False)
    self.check_histogram_repeated(output_file, 5)

    # Phase 4: "n" is sent through the monitoring sockets; no reply is awaited.
    for monitor in monitors:
        self.send(monitor, "n")

    # Afterwards the output file must be gone and the stop state reset.
    self.assertNotHasOutputFile(output_file, timeout=1)

    for monitor in monitors:
        self.assertMonitoring(monitor, "input.received_stop_messages", 0)
        self.assertMonitoring(monitor, "input.all_stop_messages", False)

    # Phase 5: only the two sockets on the first input port send events.
    push_events(workers[:2], 5)

    self.check_histogram_repeated(output_file, 2)

    # Phase 6: every input socket sends "x".
    exchange(workers, "x")

    self.assertHasOutputFile(output_file, unlink=False)
    self.check_histogram_repeated(output_file, 2)

    for process_name in ("first", "second", "final_collector"):
        self.assertIsDown(process_name)
202