101 def testEvents(self):
102 """test function"""
103 monitoring_socket = self.create_socket(self.monitoring_port)
104
105
106 input_socket = self.create_socket(self.input_port, socket_type=zmq.STREAM, bind=True)
107 identity, _ = self.recv(input_socket)
108
109
110 output_socket = self.create_socket(self.output_port)
111 second_output_socket = self.create_socket(self.output_port, identity="other_socket")
112
113 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
114 self.assertMonitoring(monitoring_socket, "output.registered_workers", 0)
115
116
117
118
119 self.send(output_socket, "r")
120
121
122 self.assertNothingMore(output_socket)
123 self.assertNothingMore(second_output_socket)
124
125
126 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 1)
127 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
128 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 1)
129 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
130 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
131 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
132
133
134 input_socket.send_multipart([identity, self.event_data])
135
136 self.assertIsMsgType(output_socket, "u")
137 self.assertNothingMore(second_output_socket)
138
139
140 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
141 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
142 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
143 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
144 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
145 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
146
147
148 input_socket.send_multipart([identity, self.event_data])
149
150
151 self.assertNothingMore(output_socket)
152 self.assertNothingMore(second_output_socket)
153
154
155 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
156 self.assertMonitoring(monitoring_socket, "output.registered_workers", 1)
157 self.assertMonitoring(monitoring_socket, "output.ready_messages[socket]", 0)
158 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
159 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
160 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
161
162
163 self.send(second_output_socket, "r")
164
165 self.assertNothingMore(output_socket)
166 self.assertIsMsgType(second_output_socket, "u")
167
168 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
169 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
170 self.assertMonitoring(monitoring_socket, "input.socket_state", "connected")
171 self.assertMonitoring(monitoring_socket, "input.socket_connects", 1)
172 self.assertMonitoring(monitoring_socket, "input.socket_disconnects", 0)
173
174
175 input_socket.send_multipart([identity, self.event_data])
176
177 self.send(second_output_socket, "r")
178 self.assertNothingMore(output_socket)
179 self.assertIsMsgType(second_output_socket, "u")
180
181
182 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 0)
183 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
184
185
186 self.send(output_socket, "r")
187 self.send(output_socket, "r")
188 self.send(second_output_socket, "r")
189 self.send(second_output_socket, "r")
190
191 self.assertMonitoring(monitoring_socket, "output.ready_queue_size", 4)
192 self.assertMonitoring(monitoring_socket, "output.registered_workers", 2)
193