import argparse
import glob
import os

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import uproot

import utils
def merge_root_to_parquet(root_dir, parquet_dir, mask_value, uniqueIdentifier, tree_name="tflat_variables"):
    """
    Merges tflat sampled root files into one parquet file.
    """
    files = sorted(glob.glob(os.path.join(root_dir, f"{uniqueIdentifier}_training_data*.root")))
    print("Merging root files into parquet")
    writer = None
    for i in range(len(files)):
        print(f"\r{i+1}/{len(files)}", end="", flush=True)
        with uproot.open(files[i]) as root_file:
            df = root_file[tree_name].arrays(library="pd")

        # Replace the minimum value of qrCombined with 0; afterwards the column
        # must contain exactly two distinct values.
        m = df["qrCombined"].min()
        df["qrCombined"] = df["qrCombined"].where(df["qrCombined"] != m, 0)
        assert len(df["qrCombined"].unique()) == 2

        # Fill NaNs with the configured mask value and drop bookkeeping
        # columns of the form __name__.
        df = df.fillna(mask_value)
        for column in df.columns:
            if (column[0:2] == '__') & (column[-2:] == '__'):
                df = df.drop(column, axis=1)

        table = pa.Table.from_pandas(df)
        if writer is None:
            # Create the writer once; every input file is appended to the same
            # merged parquet file.
            writer = pq.ParquetWriter(
                os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet'),
                table.schema, compression='snappy')
        writer.write_table(table)
    if writer is not None:
        writer.close()
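
# A minimal sketch (not part of the original script) showing how the merged
# parquet file written above could be inspected to check that the __*__
# bookkeeping columns were dropped and that qrCombined is binary. The helper
# name and the use of pyarrow.parquet here are assumptions, not project API.
def _inspect_merged_parquet(parquet_dir, uniqueIdentifier):
    pf = pq.ParquetFile(
        os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet'))
    print("columns:", pf.schema_arrow.names)   # __*__ columns should be absent
    print("row groups:", pf.num_row_groups)    # number of row groups in the merged file
    qr = pf.read(columns=["qrCombined"]).to_pandas()["qrCombined"]
    print("qrCombined values:", sorted(qr.unique()))  # expected: exactly two distinct values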
def create_dataset(pf, parquet_path, index, chunk_size, n_rowgroups, rowgroup_edges):
    """
    Picks rows from a parquet file according to the given index array.
    The created parquet file is segmented into row groups with maximum size given by chunk_size.
    """
    n_chunks = (len(index)-1)//chunk_size + 1
    writer = None
    for chunk in range(n_chunks):
        print(f"\r{chunk+1}/{n_chunks}", end="", flush=True)
        chunk_df = pd.DataFrame()
        # Indices belonging to this output chunk (the last chunk may be shorter).
        if (chunk+1)*chunk_size > len(index):
            index_chunks = index[chunk*chunk_size:]
        else:
            index_chunks = index[chunk*chunk_size:(chunk+1)*chunk_size]

        # Walk through the source row groups and collect the requested rows.
        lower_edge = 0
        for rowgroup in range(n_rowgroups):
            upper_edge = rowgroup_edges[rowgroup]
            rows_to_fetch = index_chunks[(index_chunks >= lower_edge) & (index_chunks < upper_edge)]
            if len(rows_to_fetch) > 0:
                table = pf.read_row_group(rowgroup)
                df = table.to_pandas()
                df = df.iloc[(rows_to_fetch-lower_edge)]
                chunk_df = pd.concat([chunk_df, df])
            lower_edge = upper_edge

        table = pa.Table.from_pandas(chunk_df)
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema, compression='NONE')
        writer.write_table(table)
    if writer is not None:
        writer.close()
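
# A minimal sketch (not part of the original script) of how a file produced by
# create_dataset could be consumed chunk by chunk: each output row group holds
# at most chunk_size shuffled rows (assuming chunk_size stays below pyarrow's
# default row-group size), so a chunk can be loaded without reading the whole
# file. The helper name is hypothetical.
def _iter_chunks(parquet_path):
    pf = pq.ParquetFile(parquet_path)
    for rowgroup in range(pf.num_row_groups):
        # Load one shuffled chunk at a time as a pandas DataFrame.
        yield pf.read_row_group(rowgroup).to_pandas()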
def shuffle_and_chunk_parquet(parquet_dir, val_split, chunk_size, uniqueIdentifier):
    """
    Splits a single parquet file into a training and a validation parquet file.
    The data contained in the resulting files is shuffled and segmented into chunks.
    """
    pf = pq.ParquetFile(os.path.join(parquet_dir, f'{uniqueIdentifier}_samples_merged.parquet'))

    # Cumulative row counts of the source row groups; needed to translate
    # global row indices into per-row-group offsets.
    n_rows = 0
    rowgroup_edges = []
    n_rowgroups = pf.num_row_groups
    for i in range(n_rowgroups):
        n = pf.metadata.row_group(i).num_rows
        n_rows += n
        rowgroup_edges.append(n_rows)

    # Shuffle the global row indices and split them into training and validation.
    index = np.arange(n_rows)
    np.random.shuffle(index)
    n_training_samples = int(n_rows*val_split)
    index_training = index[:n_training_samples]
    index_validation = index[n_training_samples:]

    print('\nCreating training dataset')
    create_dataset(
        pf,
        os.path.join(parquet_dir, f'{uniqueIdentifier}_training_samples.parquet'),
        index_training,
        chunk_size,
        n_rowgroups,
        rowgroup_edges)
    print('\nCreating validation dataset')
    create_dataset(
        pf,
        os.path.join(parquet_dir, f'{uniqueIdentifier}_validation_samples.parquet'),
        index_validation,
        chunk_size,
        n_rowgroups,
        rowgroup_edges)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train TFlat')
    parser.add_argument(
        '--root_dir',
        metavar='root_dir',
        dest='root_dir',
        help='Path to directory where sampled root files are stored')
    parser.add_argument(
        '--parquet_dir',
        metavar='parquet_dir',
        dest='parquet_dir',
        help='Path to directory where parquet files are saved to')
    parser.add_argument(
        '--uniqueIdentifier',
        metavar='uniqueIdentifier',
        dest='uniqueIdentifier',
        default="TFlaT_MC16rd_light_2601_hyperion",
        help='Name of both the config .yaml to be used and the produced weightfile')
    args = parser.parse_args()
    root_dir = args.root_dir
    parquet_dir = args.parquet_dir
    uniqueIdentifier = args.uniqueIdentifier
    os.makedirs(parquet_dir, exist_ok=True)

    config = utils.load_config(uniqueIdentifier)
    val_split = config['train_valid_fraction']
    chunk_size = config['chunk_size']
    mask_value = config['parameters']['mask_value']

    merge_root_to_parquet(
        root_dir=root_dir,
        parquet_dir=parquet_dir,
        mask_value=mask_value,
        uniqueIdentifier=uniqueIdentifier)
    shuffle_and_chunk_parquet(
        parquet_dir=parquet_dir,
        val_split=val_split,
        chunk_size=chunk_size,
        uniqueIdentifier=uniqueIdentifier)