import pyarrow.parquet as pq
def merge_root_to_parquet(root_dir, parquet_dir, mask_value, uniqueIdentifier, tree_name="tflat_variables"):
    """
    Merges tflat sampled root files into one parquet file
    """
    files = sorted(glob.glob(os.path.join(root_dir, f"{uniqueIdentifier}_training_data*.root")))
    n_rows = 0
    writer = None
    print("Merging root files into parquet")
    for i in range(len(files)):
        print(f"\r{i+1}/{len(files)}", end="", flush=True)
        # Read the requested tree of the current root file into a pandas DataFrame.
        with uproot.open(files[i]) as root_file:
            df = root_file[tree_name].arrays(library="pd")
        # Remap the label so that qrCombined is strictly binary: -1 becomes 0.
        df["qrCombined"] = df["qrCombined"].where(df["qrCombined"] != -1, 0)
        assert set(df["qrCombined"].unique()).issubset([0, 1])
        # Replace missing values with the configured mask value.
        df = df.fillna(mask_value)
        # Drop bookkeeping columns of the form __name__.
        for column in df.columns:
            if column.startswith('__') and column.endswith('__'):
                df = df.drop(column, axis=1)
        table = pa.Table.from_pandas(df)
        # Create the writer once, using the schema of the first file, and append
        # every file to the same merged parquet file.
        if writer is None:
            writer = pq.ParquetWriter(
                os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet'),
                table.schema, compression='snappy')
        writer.write_table(table)
        n_rows += len(df)
    writer.close()
    print(f'\nNumber of events: {n_rows}')
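
# The helper below is not part of the original script; it is a hedged sanity-check
# sketch for the merge step, using only the file name written above. It re-reads the
# merged parquet file and confirms the -1 -> 0 remapping of qrCombined and that the
# fillna step left no NaNs behind.
def check_merged_file(parquet_dir, uniqueIdentifier):
    merged = pq.read_table(
        os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet')
    ).to_pandas()
    # Labels must be strictly binary after the remapping in merge_root_to_parquet.
    assert set(merged['qrCombined'].unique()).issubset({0, 1})
    # Missing values were replaced by mask_value, so no NaNs should remain.
    assert not merged.isna().any().any()
    return len(merged)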
def create_dataset(pf, parquet_path, index, chunk_size, n_rowgroups, rowgroup_edges):
    """
    Picks rows from parquet file according to the given index array.
    Created parquet file is segmented into rowgroups with maximum size given by chunk_size.
    """
    n_chunks = (len(index)-1)//chunk_size + 1
    writer = None
    for chunk in range(n_chunks):
        print(f"\r{chunk+1}/{n_chunks}", end="", flush=True)
        chunk_df = pd.DataFrame()
        # Take the next slice of shuffled global row indices; the last chunk may be shorter.
        if (chunk+1)*chunk_size > len(index):
            index_chunks = index[chunk*chunk_size:]
        else:
            index_chunks = index[chunk*chunk_size:(chunk+1)*chunk_size]
        # Gather the requested rows row group by row group, translating global row
        # indices into positions local to each row group.
        lower_edge = 0
        for rowgroup in range(n_rowgroups):
            upper_edge = rowgroup_edges[rowgroup]
            rows_to_fetch = index_chunks[(index_chunks >= lower_edge) & (index_chunks < upper_edge)]
            if len(rows_to_fetch) > 0:
                table = pf.read_row_group(rowgroup)
                df = table.to_pandas()
                df = df.iloc[(rows_to_fetch-lower_edge)]
                chunk_df = pd.concat([chunk_df, df])
            lower_edge = upper_edge
        table = pa.Table.from_pandas(chunk_df)
        # Create the writer once and append each chunk as its own row group.
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema, compression='NONE')
        writer.write_table(table)
    writer.close()
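
# Illustrative helper (an assumption, not part of the original source): the
# lower_edge/upper_edge arithmetic in create_dataset maps a global row index to a
# (row group, local row) pair via the cumulative edges built in
# shuffle_and_chunk_parquet below. With edges [100, 250, 400], global row 120 lies
# in row group 1 at local position 120 - 100 = 20.
def global_to_local(row, rowgroup_edges):
    """Mirror of the row-group index translation used in create_dataset."""
    lower_edge = 0
    for rowgroup, upper_edge in enumerate(rowgroup_edges):
        if lower_edge <= row < upper_edge:
            return rowgroup, row - lower_edge
        lower_edge = upper_edge
    raise IndexError(f"row {row} is outside all row groups")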
def shuffle_and_chunk_parquet(parquet_dir, val_split, chunk_size, uniqueIdentifier):
    """
    Splits single parquet file into a training and validation parquet file.
    The data contained in the resulting files is shuffled and segmented into chunks.
    """
    pf = pq.ParquetFile(os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet'))
    # Collect the cumulative upper edges of the row groups in the merged file.
    n_rows = 0
    rowgroup_edges = []
    n_rowgroups = pf.num_row_groups
    for i in range(n_rowgroups):
        n = pf.metadata.row_group(i).num_rows
        n_rows += n
        rowgroup_edges.append(n_rows)
    # Shuffle all global row indices and split them into training and validation parts.
    index = np.arange(n_rows)
    np.random.shuffle(index)
    n_training_samples = int(n_rows*val_split)
    index_training = index[:n_training_samples]
    index_validation = index[n_training_samples:]
    print('Creating training dataset')
    create_dataset(
        pf=pf,
        parquet_path=os.path.join(
            parquet_dir,
            f'{uniqueIdentifier}_training_samples.parquet'),
        index=index_training,
        chunk_size=chunk_size,
        n_rowgroups=n_rowgroups,
        rowgroup_edges=rowgroup_edges)
    print('\nCreating validation dataset')
    create_dataset(
        pf=pf,
        parquet_path=os.path.join(
            parquet_dir,
            f'{uniqueIdentifier}_validation_samples.parquet'),
        index=index_validation,
        chunk_size=chunk_size,
        n_rowgroups=n_rowgroups,
        rowgroup_edges=rowgroup_edges)
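
# Hedged consumption sketch (an assumption about downstream use, not from the source):
# since create_dataset appends each chunk as its own row group, a training loop can
# stream the resulting file one row group at a time instead of loading it all at once.
def iter_training_batches(parquet_dir, uniqueIdentifier):
    """Yield one pandas DataFrame per row group of the training file written above."""
    train_pf = pq.ParquetFile(
        os.path.join(parquet_dir, f'{uniqueIdentifier}_training_samples.parquet'))
    for rg in range(train_pf.num_row_groups):
        yield train_pf.read_row_group(rg).to_pandas()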
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train TFlat')
    parser.add_argument(
        '--root_dir',
        metavar='root_dir',
        dest='root_dir',
        help='Path to directory where sampled root files are stored')
    parser.add_argument(
        '--parquet_dir',
        metavar='parquet_dir',
        dest='parquet_dir',
        help='Path to directory where parquet files are saved to')
    parser.add_argument(
        '--uniqueIdentifier',
        metavar='uniqueIdentifier',
        dest='uniqueIdentifier',
        default="TFlaT_MC16rd_light_2601_hyperion",
        help='Name of both the config .yaml to be used and the produced weightfile')
    args = parser.parse_args()
    root_dir = args.root_dir
    parquet_dir = args.parquet_dir
    uniqueIdentifier = args.uniqueIdentifier
    os.makedirs(parquet_dir, exist_ok=True)
    # Read the split fraction, chunk size and mask value from the config yaml.
    config = utils.load_config(uniqueIdentifier)
    val_split = config['train_valid_fraction']
    chunk_size = config['chunk_size']
    mask_value = config['parameters']['mask_value']
    merge_root_to_parquet(
        root_dir=root_dir,
        parquet_dir=parquet_dir,
        mask_value=mask_value,
        uniqueIdentifier=uniqueIdentifier)
    shuffle_and_chunk_parquet(
        parquet_dir=parquet_dir,
        val_split=val_split,
        chunk_size=chunk_size,
        uniqueIdentifier=uniqueIdentifier)
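
# Example invocation (illustrative; the script name and flag spellings follow the
# reconstructed argparse setup above and are not confirmed by the source):
#   python merge_root_to_parquet.py \
#       --root_dir /path/to/sampled_root_files \
#       --parquet_dir /path/to/parquet_output \
#       --uniqueIdentifier TFlaT_MC16rd_light_2601_hyperion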