Belle II Software development
b2hlt_unitinfo.py
1#!/usr/bin/env python3
2
3
10
11import zmq
12from argparse import ArgumentParser
13from zmq_daq.utils import normalize_addresses, get_monitor_table
14
15import os
16from time import sleep
17from collections import defaultdict
18
19
def _parse_worker_column(col):
    """Split a per-worker monitoring column name into ``(hostname, label)``.

    Per-worker columns have the form ``<key>[<hostname>_<rest>]``.  Returns
    ``None`` for columns that do not have this form and for the message
    counters ("total_number_messages", "hello_messages"), which are not shown
    in the per-worker overview.
    """
    if "[" not in col or not col.endswith("]"):
        return None

    # partition() instead of unpacking split(): an unexpected additional "[" or a
    # missing "_" in the identifier must not abort the whole overview.
    key, _, process_identifier = col[:-1].partition("[")
    hostname = process_identifier.partition("_")[0]

    # Drop the direction markers so input and output keys share one label.
    key = key.replace("_from", "").replace("_to", "")

    if key in ("total_number_messages", "hello_messages"):
        return None

    key = key.replace("received_", "").replace("sent_", "")
    return hostname, key.replace("_", " ")


def get_overview(df):
    """Condense the monitoring values into two small tables.

    Parameters:
        df: mapping of dot-separated keys to values, or None.  The keys are
            split on "." and the code below looks up "input"/"output" on the
            second component, so they are presumably of the form
            ``<address>.<category>.<metric>`` -- confirm against
            ``get_monitor_table``.

    Returns:
        A tuple ``(df_to_show, worker_information)`` of pandas DataFrames:
        ``df_to_show`` has one row per summary quantity and one column per
        first key component; ``worker_information`` has one row per
        "from <hostname>" / "to <hostname>" entry built from the per-worker
        columns.  ``(None, None)`` is returned when ``df`` is None or empty.
    """
    # An empty mapping cannot be unstacked (there is no MultiIndex), so it is
    # treated like "no data at all" instead of raising.
    if df is None or len(df) == 0:
        return None, None

    # Imported lazily, as in the original, so the "no data" path needs no pandas.
    import pandas as pd

    # Rows: first key component, columns: (category, metric).
    tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0).T

    # Built transposed (one column per quantity) and flipped at the end.
    df_to_show = pd.DataFrame(columns=tmp.index).T

    if "input" in tmp and "dead_workers" in tmp["input"]:
        df_to_show["dead workers"] = tmp["input"]["dead_workers"]

    if "output" in tmp and "ready_queue_size" in tmp["output"]:
        df_to_show["ready queue size"] = tmp["output"]["ready_queue_size"]

    if "input" in tmp and "output" in tmp:
        input_df = tmp["input"]
        output_df = tmp["output"]

        # average between input and output
        if "data_size" in input_df and "data_size" in output_df:
            df_to_show["data size"] = (input_df["data_size"].fillna(0) + output_df["data_size"].fillna(0)) / 2

        if "event_rate" in input_df and "event_rate" in output_df:
            df_to_show["event rate"] = (input_df["event_rate"].fillna(0) + output_df["event_rate"].fillna(0)) / 2

        # this actually only exists for one, so we fill the other with empty data
        if "registered_workers" in input_df and "registered_workers" in output_df:
            df_to_show["registered workers"] = (
                input_df["registered_workers"].fillna(0) + output_df["registered_workers"].fillna(0))

        if "socket_state" in input_df and "socket_state" in output_df:
            df_to_show["raw socket state"] = input_df["socket_state"].fillna("") + output_df["socket_state"].fillna("")

    df_to_show = df_to_show.T.fillna("")

    # "from <hostname>" / "to <hostname>" -> label -> sum over all processes of that host
    worker_information = defaultdict(lambda: defaultdict(int))

    for category, prefix in (("input", "from "), ("output", "to ")):
        if category not in tmp:
            continue

        category_df = tmp[category]
        for col in category_df.columns:
            parsed = _parse_worker_column(col)
            if parsed is None:
                continue
            hostname, label = parsed

            # A column without any value would make .iloc[0] raise; skip it.
            values = category_df[col].dropna()
            if values.empty:
                continue

            host_information = worker_information[prefix + hostname]
            host_information[label] += values.iloc[0]

            # Count the contributing processes so the data size can be averaged below.
            if label == "data size":
                host_information["hosts"] += 1

    worker_information = pd.DataFrame(worker_information).T

    for column in ("hosts", "ready messages", "events"):
        if column in worker_information:
            worker_information[column] = pd.to_numeric(
                worker_information[column], errors="coerce", downcast="integer")
    if "data size" in worker_information and "hosts" in worker_information:
        worker_information["data size"] = worker_information["data size"] / worker_information["hosts"]

    worker_information = worker_information.fillna("")
    # Sort by hostname first and by direction ("from"/"to") second.
    worker_information = worker_information.loc[sorted(worker_information.index, key=lambda x: x.split(" ")[::-1])]

    return df_to_show, worker_information
103
104
if __name__ == '__main__':
    # Command line entry point: query the given monitoring addresses and print
    # the condensed overview once, or repeatedly when --watch is given.
    parser = ArgumentParser(
        description="Shortcut to b2hlt_monitor.py to only show the most relevant information for the full HLT unit")
    parser.add_argument(
        "addresses",
        nargs='*',
        help="Monitor the given addresses. " +
        "Valid input formats are 'tcp://<host>:<port>', '<host>:<port>' or just '<port>' in which case localhost is assumed.",
        default=["tcp://hltin:7000", "tcp://hltout:7000"])
    parser.add_argument("--watch", action="store_true", help="Enter watch mode, where the script is called every 1 second.")

    args = parser.parse_args()

    addresses = normalize_addresses(args.addresses)

    # Create and connect all needed sockets
    ctx = zmq.Context()
    ctx.setsockopt(zmq.LINGER, 0)
    sockets = {address: ctx.socket(zmq.DEALER) for address in addresses}

    try:
        for address, socket in sockets.items():
            socket.connect(address)

        while True:
            df = get_monitor_table(sockets, show_detail=True)
            df_to_show, worker_information = get_overview(df)

            # Only the watch mode redraws the screen; the one-shot output is appended.
            if args.watch:
                os.system("clear")
            print(df_to_show)
            print("")
            print(worker_information)

            # When no additional things are requested, just show the table once and exit.
            # Leaving the loop replaces the former exit() call, which is only
            # available when the site module is loaded.
            if not args.watch:
                break

            sleep(1.0)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the ZMQ resources on every exit path (normal end, Ctrl-C, error).
        for socket in sockets.values():
            socket.close(linger=0)
        ctx.term()