12 from argparse
import ArgumentParser
13 from zmq_daq.utils import normalize_addresses, get_monitor_table
16 from time
import sleep
17 from collections
import defaultdict
25 tmp = pd.Series({tuple(key.split(
".")): value
for key, value
in df.items()}).unstack(0).T
27 df_to_show = pd.DataFrame(columns=tmp.index).T
29 if "input" in tmp
and "dead_workers" in tmp[
"input"]:
30 df_to_show[
"dead workers"] = tmp[
"input"][
"dead_workers"]
32 if "output" in tmp
and "ready_queue_size" in tmp[
"output"]:
33 df_to_show[
"ready queue size"] = tmp[
"output"][
"ready_queue_size"]
35 if "input" in tmp
and "output" in tmp:
36 input_df = tmp[
"input"]
37 output_df = tmp[
"output"]
40 if "data_size" in input_df
and "data_size" in output_df:
41 df_to_show[
"data size"] = (input_df[
"data_size"].fillna(0) + output_df[
"data_size"].fillna(0)) / 2
43 if "event_rate" in input_df
and "event_rate" in output_df:
44 df_to_show[
"event rate"] = (input_df[
"event_rate"].fillna(0) + output_df[
"event_rate"].fillna(0)) / 2
47 if "registered_workers" in input_df
and "registered_workers" in output_df:
48 df_to_show[
"registered workers"] = input_df[
"registered_workers"].fillna(0) + output_df[
"registered_workers"].fillna(0)
50 if "socket_state" in input_df
and "socket_state" in output_df:
51 df_to_show[
"raw socket state"] = input_df[
"socket_state"].fillna(
"") + output_df[
"socket_state"].fillna(
"")
53 df_to_show = df_to_show.T.fillna(
"")
55 worker_information = defaultdict(
lambda: defaultdict(int))
def add_information(col, grouped_df, prefix):
    """Fold one per-process monitoring column into ``worker_information``.

    Only columns named ``<key>[<hostname>_<pid>]`` are considered. The key is
    normalised (``_from``/``_to`` and ``received_``/``sent_`` are removed,
    underscores become spaces) and the first non-missing value of the column
    is added to ``worker_information[prefix + hostname][key]``. Every
    ``"data size"`` contribution also increments that row's ``"hosts"``
    counter.

    Args:
        col: Name of the column to look up in ``grouped_df``.
        grouped_df: Table that is indexed as ``grouped_df[col]``.
        prefix: Text prepended to the hostname to build the row label
            (the callers pass ``"from "`` and ``"to "``).

    Returns:
        None. ``worker_information`` from the enclosing scope is updated
        in place.
    """
    # Columns without a trailing "[...]" part carry no per-process data.
    if "[" not in col or not col.endswith("]"):
        return

    # Drop the closing "]" and separate the key from the process identifier.
    key, _, process_identifier = col[:-1].partition("[")
    # Everything before the first "_" is the hostname; partition() keeps the
    # whole identifier when there is no "_" instead of raising ValueError.
    hostname = process_identifier.partition("_")[0]

    key = key.replace("_from", "").replace("_to", "")
    # These two counters are intentionally left out of the overview.
    if key in ("total_number_messages", "hello_messages"):
        return

    key = key.replace("received_", "").replace("sent_", "").replace("_", " ")

    values = grouped_df[col].dropna()
    # A column holding only missing values has nothing to contribute;
    # indexing it with iloc[0] would raise IndexError.
    if values.empty:
        return

    host_entry = worker_information[prefix + hostname]
    host_entry[key] += values.iloc[0]

    # Count the contributions so the summed data size can be averaged later.
    if key == "data size":
        host_entry["hosts"] += 1
78 grouped_input = tmp[
"input"]
79 for col
in grouped_input.columns:
80 add_information(col, grouped_input,
"from ")
83 grouped_output = tmp[
"output"]
84 for col
in grouped_output.columns:
85 add_information(col, grouped_output,
"to ")
87 worker_information = pd.DataFrame(worker_information).T
89 if "hosts" in worker_information:
90 worker_information[
"hosts"] = pd.to_numeric(worker_information[
"hosts"], errors=
"coerce", downcast=
"integer")
91 if "ready messages" in worker_information:
92 worker_information[
"ready messages"] = pd.to_numeric(
93 worker_information[
"ready messages"], errors=
"coerce", downcast=
"integer")
94 if "events" in worker_information:
95 worker_information[
"events"] = pd.to_numeric(worker_information[
"events"], errors=
"coerce", downcast=
"integer")
96 if "data size" in worker_information
and "hosts" in worker_information:
97 worker_information[
"data size"] = worker_information[
"data size"] / worker_information[
"hosts"]
99 worker_information = worker_information.fillna(
"")
100 worker_information = worker_information.loc[sorted(worker_information.index, key=
lambda x: x.split(
" ")[::-1])]
102 return df_to_show, worker_information
105 if __name__ ==
'__main__':
106 parser = ArgumentParser(
107 description=
"Shortcut to b2hlt_monitor.py to only show the most relevant information for the full HLT unit")
111 help=
"Monitor the given addresses. " +
112 "Valid input formats are 'tcp://<host>:<port>', '<host>:<port>' or just '<port>' in which case localhost is assumed.",
113 default=[
"tcp://hltin:7000",
"tcp://hltout:7000"])
114 parser.add_argument(
"--watch", action=
"store_true", help=
"Enter watch mode, where the script is called every 1 second.")
116 args = parser.parse_args()
118 addresses = normalize_addresses(args.addresses)
122 ctx.setsockopt(zmq.LINGER, 0)
123 sockets = {address: ctx.socket(zmq.DEALER)
for address
in addresses}
125 for address, socket
in sockets.items():
126 socket.connect(address)
131 df = get_monitor_table(sockets, show_detail=
True)
132 df_to_show, worker_information = get_overview(df)
135 print(worker_information)
140 df = get_monitor_table(sockets, show_detail=
True)
141 df_to_show, worker_information = get_overview(df)
146 print(worker_information)
149 except KeyboardInterrupt: