14 from argparse
import ArgumentParser, FileType
15 from zmq_daq.utils import normalize_addresses, get_monitor_table
18 from time
import sleep
19 from collections
import defaultdict
def get_overview(df):
    """
    Condense the detailed monitoring table into two small overview tables.

    Every key of ``df`` is split at each "."; the first component becomes the
    row label of an intermediate table and the remaining components become
    its (multi-level) column label. The code then looks for the top-level
    column groups "input" and "output".
    NOTE(review): presumably the keys look like
    ``<address>.<input|output>.<quantity>`` as delivered by
    ``get_monitor_table`` -- confirm against that helper.

    Parameters:
        df: mapping-like object providing ``items()`` whose keys are
            dot-separated strings.

    Returns:
        A tuple ``(df_to_show, worker_information)`` of two DataFrames:

        * ``df_to_show`` has one row per summary quantity ("dead workers",
          "ready queue size", "data size", "event rate",
          "registered workers", "raw socket state") and one column per
          first key component. Only quantities present in the input appear.
        * ``worker_information`` has one row per "from <host>" (input side)
          or "to <host>" (output side), built from the columns named
          ``<quantity>[<host>_<pid>]``. Values of all processes on the same
          host are summed; "data size" is afterwards divided by the number
          of contributing processes ("hosts" column).

        Missing values are shown as empty strings in both tables.
    """
    tmp = pd.Series({tuple(key.split(".")): value for key, value in df.items()}).unstack(0).T

    df_to_show = pd.DataFrame(columns=tmp.index).T

    if "input" in tmp and "dead_workers" in tmp["input"]:
        df_to_show["dead workers"] = tmp["input"]["dead_workers"]

    if "output" in tmp and "ready_queue_size" in tmp["output"]:
        df_to_show["ready queue size"] = tmp["output"]["ready_queue_size"]

    if "input" in tmp and "output" in tmp:
        input_df = tmp["input"]
        output_df = tmp["output"]

        # Data size and event rate are averaged over input and output side
        if "data_size" in input_df and "data_size" in output_df:
            df_to_show["data size"] = (input_df["data_size"].fillna(0) + output_df["data_size"].fillna(0)) / 2

        if "event_rate" in input_df and "event_rate" in output_df:
            df_to_show["event rate"] = (input_df["event_rate"].fillna(0) + output_df["event_rate"].fillna(0)) / 2

        # Registered workers are summed, the socket states are concatenated
        if "registered_workers" in input_df and "registered_workers" in output_df:
            df_to_show["registered workers"] = (input_df["registered_workers"].fillna(0) +
                                                output_df["registered_workers"].fillna(0))

        if "socket_state" in input_df and "socket_state" in output_df:
            df_to_show["raw socket state"] = input_df["socket_state"].fillna("") + output_df["socket_state"].fillna("")

    df_to_show = df_to_show.T.fillna("")

    # row label ("from <host>" / "to <host>") -> quantity -> accumulated value
    worker_information = defaultdict(lambda: defaultdict(int))

    def add_information(col, grouped_df, prefix):
        """Accumulate the column ``col`` (named ``<quantity>[<host>_<pid>]``) into worker_information."""
        # Columns without a trailing "[...]" carry no per-process information
        if "[" not in col or not col.endswith("]"):
            return

        key, process_identifier = col[:-1].split("[", 1)
        # partition never raises, even if the identifier contains no "_"
        hostname, _, _pid = process_identifier.partition("_")

        key = key.replace("_from", "").replace("_to", "")

        # These counters are not part of the overview
        if key in ["total_number_messages", "hello_messages"]:
            return

        key = key.replace("received_", "").replace("sent_", "")
        key = key.replace("_", " ")

        # Take the first value which is actually filled; skip columns without any value
        values = grouped_df[col].dropna()
        if values.empty:
            return

        worker_information[prefix + hostname][key] += values.iloc[0]

        # Count the contributing processes to be able to average the data size later
        if key == "data size":
            worker_information[prefix + hostname]["hosts"] += 1

    # Guard against a missing category in the same way as for the summary table above
    for category, prefix in (("input", "from "), ("output", "to ")):
        if category not in tmp:
            continue

        grouped = tmp[category]
        for col in grouped.columns:
            add_information(col, grouped, prefix)

    worker_information = pd.DataFrame(worker_information).T

    # Counters are shown as integers where possible
    for column in ("hosts", "ready messages", "events"):
        if column in worker_information:
            worker_information[column] = pd.to_numeric(worker_information[column],
                                                       errors="coerce", downcast="integer")

    if "data size" in worker_information and "hosts" in worker_information:
        worker_information["data size"] = worker_information["data size"] / worker_information["hosts"]

    worker_information = worker_information.fillna("")
    # Sort by host name first and by direction ("from"/"to") second
    worker_information = worker_information.loc[sorted(worker_information.index, key=lambda x: x.split(" ")[::-1])]

    return df_to_show, worker_information
# Script entry point: parse the command line, connect one socket per monitored
# address and print the condensed tables produced by get_overview.
107 if __name__ ==
'__main__':
108 parser = ArgumentParser(
109 description=
"Shortcut to b2hlt_monitor.py to only show the most relevant information for the full HLT unit")
# NOTE(review): the opening of the following call is not visible in this
# excerpt; presumably it is the parser.add_argument(...) declaring the
# `addresses` option which is read from `args` further down -- confirm.
113 help=
"Monitor the given addresses. " +
114 "Valid input formats are 'tcp://<host>:<port>', '<host>:<port>' or just '<port>' in which case localhost is assumed.",
115 default=[
"tcp://hltin:7000",
"tcp://hltout:7000"])
# Boolean switch (store_true) requesting the watch mode
116 parser.add_argument(
"--watch", action=
"store_true", help=
"Enter watch mode, where the script is called every 1 second.")
118 args = parser.parse_args()
# normalize_addresses is imported from zmq_daq.utils
120 addresses = normalize_addresses(args.addresses)
# NOTE(review): the creation of `ctx` is not visible in this excerpt.
# LINGER is set to 0 on it, presumably so that no pending messages delay the exit -- confirm.
124 ctx.setsockopt(zmq.LINGER, 0)
# One DEALER socket per address, stored under the address as key
125 sockets = {address: ctx.socket(zmq.DEALER)
for address
in addresses}
127 for address, socket
in sockets.items():
128 socket.connect(address)
# Query the monitoring table in detail mode, condense it and print the worker table.
# NOTE(review): the control flow around the two query/print sequences below is
# not visible here; presumably one is the single-shot case and the other the
# repeated --watch case -- confirm.
133 df = get_monitor_table(sockets, show_detail=
True)
134 df_to_show, worker_information = get_overview(df)
137 print(worker_information)
142 df = get_monitor_table(sockets, show_detail=
True)
143 df_to_show, worker_information = get_overview(df)
148 print(worker_information)
# NOTE(review): the matching `try:` and the body of this handler are outside of this excerpt.
151 except KeyboardInterrupt: