Workflow Logic Code in the Snakefile
In Snakemake, all workflow logic is centralized in the so-called snakefile, similar to a Makefile. All analysis code lives in separate scripts, which need only minimal adaptations to fit into the workflow logic. Snakemake supports multiple programming languages; we will stick to Python.
Processing steps in a Snakemake workflow are called rules. They are written in a simple Python-based language and typically consist of input: files, output: files and a directive to create the latter. Available directives are shell: commands, Python code in run:, or a script: path. Let us examine the snakefile for our Belle II workflow.
configfile: "config.yaml"
workdir: config["outputDirectory"]
localrules: OfflineAnalysis, MergeEventType, MergeBatch, BatchToTextFile, Skim, SetProxy  # only Reconstruction on bsub cluster
from extravariables import outputfile, runningOnMC
import subprocess  # used in rule Skim to query b2help-releases
import numpy as np
import hashlib
rule OfflineAnalysis:
    input:
        data_BB = "data/projectName_bmesons/reco.root",
        data_QQ = "data/projectName_qqcontinuum/reco.root"
    output:
        mbc_plot = "Mbc.jpg",
        deltaE_plot = "deltaE.jpg"
    script:
        "offlineanalysis.py"
def PathDictionary(wildcards):
    # match gbasf2 dataset paths to output file paths
    Paths = dict()
    # one dataset path per line; the line index becomes the skim number
    with open(f"{workflow.basedir}/../{wildcards.EventType}skims.dat", 'r') as skimfile:
        for i, skim in enumerate(skimfile.read().splitlines()):
            Paths.update({skim: f"Reconstruction/projectName_{wildcards.EventType}/skim_{i}/reco.root"})
    return Paths
rule MergeEventType:
    input:
        unpack(PathDictionary)
    output:
        "data/projectName_{EventType}/reco.root"
    shell:
        "hadd {output} {input}"  # for merging nTuples
rule MergeBatch:
    input:
        expand("Reconstruction/projectName_{EventType}/skim_{skim}/reco_batch{batch}.root",
               batch=np.arange(config["BatchesPerSkim"]), allow_missing=True)
    output:
        "Reconstruction/projectName_{EventType}/skim_{skim}/reco.root"
    shell:
        "hadd {output} {input}"  # for merging nTuples
rule ReconstructBatch:
    input:
        inputfileList = "Skim/projectName_{EventType}/skim_{skim}/batch{batch}.json"
    params:
        runningOnMC = runningOnMC
    log:
        "Reconstruction/projectName_{EventType}/skim_{skim}/batch{batch}_log.dat"
    output:
        "Reconstruction/projectName_{EventType}/skim_{skim}/reco_batch{batch}.root"
    script:
        "reconstruction.py"
rule BatchToTextFile:
    input:
        skim_dirs = "Skim/projectName_{EventType}/skim_{skim}/skimslist.dat"
    output:
        expand("Skim/projectName_{EventType}/skim_{skim}/batch{batch}.json",
               batch=np.arange(config["BatchesPerSkim"]), allow_missing=True)
    params:
        BatchesPerSkim = config["BatchesPerSkim"]
    script:
        "batchToTxt.py"
rule Skim:
    input:
        proxy_text_file = "proxy.dat"
    params:
        steeringfile = f"{workflow.basedir}/skim.py",
        sandbox_input_files = ["extravariables.py"],
        gbasf2_dataset = lambda wildcards: list(PathDictionary(wildcards).keys())[int(wildcards.skim)],
        release = str(subprocess.check_output(["b2help-releases"]).strip(), 'utf-8'),
        maxretries = int(config["gbasf2_max_retries"]),
        gbasf2_download_logs = bool(config["gbasf2_download_logs"]),
        gbasf2_min_proxy_lifetime = config["gbasf2_min_proxy_lifetime"],
        gbasf2_proxy_lifetime = config["gbasf2_proxy_lifetime"],
        gbasf2_output_file_name = outputfile,
        gbasf2_project_name_prefix = lambda wildcards: wildcards.EventType
    output:
        output_filelist = "Skim/projectName_{EventType}/skim_{skim}/skimslist.dat"
    wrapper:
        "file:/path/to/gbasf2_wrapper_for_snakemake"
rule SetProxy:
    params:
        setProxy = True
    output:
        proxy_text_file = temp("proxy.dat")
    wrapper:
        "file:/path/to/gbasf2_wrapper_for_snakemake"
In a snakefile, the topmost rule is the target rule, in our case OfflineAnalysis, which produces the histogram plots. Rules can have log: and params: directives for log file paths and additional parameters, respectively.
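Inside a script referenced by a script: directive, Snakemake provides an object named snakemake whose input, output, params and log attributes mirror the directives of the rule. The following is a minimal sketch of that access pattern for the ReconstructBatch rule. A SimpleNamespace stands in for the real object so that the snippet runs outside a workflow; the helper summarize_job, the chosen wildcard values and the value of runningOnMC are assumptions made for illustration only.

```python
from types import SimpleNamespace


def summarize_job(snakemake):
    """Collect the values a script would typically read from its rule."""
    return {
        "input_list": snakemake.input.inputfileList,   # named input
        "running_on_mc": snakemake.params.runningOnMC,  # named parameter
        "log_file": snakemake.log[0],                   # positional log file
        "output_file": snakemake.output[0],             # positional output
    }


# Stand-in for the object Snakemake injects into scripts.
# Assumption: wildcards EventType=bmesons, skim=0, batch=1 and runningOnMC=True.
fake_snakemake = SimpleNamespace(
    input=SimpleNamespace(
        inputfileList="Skim/projectName_bmesons/skim_0/batch1.json"
    ),
    params=SimpleNamespace(runningOnMC=True),
    log=["Reconstruction/projectName_bmesons/skim_0/batch1_log.dat"],
    output=["Reconstruction/projectName_bmesons/skim_0/reco_batch1.root"],
)

job = summarize_job(fake_snakemake)
print(job["log_file"])
```

In a real script such as reconstruction.py, the stand-in is not needed: the name snakemake is already defined when Snakemake runs the script.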
The output directory structure is specified in the file paths and created automatically. To avoid repetition, we can use wildcards, which Snakemake fills in automatically from the file names requested by other rules. For example, the rule with output "data/projectName_{EventType}/reco.root" is run once for each value of {EventType} (here twice, for its two values bmesons and qqcontinuum).
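To illustrate how a requested file name determines a wildcard value, the sketch below turns an output pattern into a regular expression and matches it against the two inputs of OfflineAnalysis. This is a simplified model of the idea, not Snakemake's actual implementation; the function match_wildcards is hypothetical and assumes that each wildcard name occurs only once in the pattern.

```python
import re


def match_wildcards(pattern, path):
    """Return the wildcard values if `path` fits `pattern`, else None."""
    # re.split with a capture group alternates literal text (even indices)
    # and wildcard names (odd indices).
    parts = re.split(r"\{(\w+)\}", pattern)
    regex = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            regex += re.escape(part)      # literal part of the path
        else:
            regex += f"(?P<{part}>.+)"    # wildcard: any non-empty string
    match = re.fullmatch(regex, path)
    return match.groupdict() if match else None


output_pattern = "data/projectName_{EventType}/reco.root"
requested = [
    "data/projectName_bmesons/reco.root",
    "data/projectName_qqcontinuum/reco.root",
]
for path in requested:
    print(path, "->", match_wildcards(output_pattern, path))
```

Each requested file that fits the pattern yields one job of the rule, with the matched value available as wildcards.EventType.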
To merge basf2 output files, we can simply employ hadd or b2file_merge in a shell: directive.
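For reference, the shell: string "hadd {output} {input}" expands to the target file followed by all source files. A small sketch of the resulting command for one skim is shown below; the helper build_hadd_command is hypothetical, and the event type, skim index and number of batches (three) are assumed for illustration.

```python
import shlex


def build_hadd_command(output, inputs):
    """Build the argument list for `hadd <target> <source1> <source2> ...`."""
    if not inputs:
        raise ValueError("hadd needs at least one input file")
    return ["hadd", output, *inputs]


# Assumption: event type bmesons, skim 0, three batches per skim.
batch_files = [
    f"Reconstruction/projectName_bmesons/skim_0/reco_batch{batch}.root"
    for batch in range(3)
]
command = build_hadd_command(
    "Reconstruction/projectName_bmesons/skim_0/reco.root", batch_files
)
print(shlex.join(command))
```

On a machine with ROOT available, the list could be passed to subprocess.run(command, check=True); inside the workflow, Snakemake performs the substitution and the call itself.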
To submit jobs to the grid using gbasf2, we provide a public wrapper, available via git clone https://github.com/casschmitt/gbasf2_wrapper_for_snakemake.git. Specify wrapper: "file:/path/to/gbasf2_wrapper_for_snakemake" as a directive in the rules that you want to submit using gbasf2. The wrapper takes care of proxy setup and job submission, reschedules failed jobs, and downloads the outputs of finished jobs. To make sure the proxy is initialized only once, include a rule with setProxy = True and require its output proxy_text_file in the skim rules. After checking the download for completeness, the wrapper returns a text file with the paths to all output files, which can be used in subsequent processing steps. Please note that gbasf2 does not support absolute paths for the sandbox files. Global gbasf2-specific parameters can be given in a configfile:
outputDirectory: "/group/belle/users/<user>/"
BatchesPerSkim: 3
gbasf2_max_retries: 10
gbasf2_download_logs: False
gbasf2_min_proxy_lifetime: 0
gbasf2_proxy_lifetime: 24
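Snakemake exposes these values to the snakefile as the dictionary config. The sketch below mirrors the configuration above as a plain Python dictionary and shows how the snakefile derives typed values from it. The function check_config and the tuple REQUIRED_KEYS are hypothetical additions for illustration; they are not part of the workflow or the wrapper.

```python
# Dictionary equivalent of the configuration above.
config = {
    "outputDirectory": "/group/belle/users/<user>/",
    "BatchesPerSkim": 3,
    "gbasf2_max_retries": 10,
    "gbasf2_download_logs": False,
    "gbasf2_min_proxy_lifetime": 0,
    "gbasf2_proxy_lifetime": 24,
}

# Hypothetical list of keys that the snakefile reads.
REQUIRED_KEYS = (
    "outputDirectory",
    "BatchesPerSkim",
    "gbasf2_max_retries",
    "gbasf2_download_logs",
    "gbasf2_min_proxy_lifetime",
    "gbasf2_proxy_lifetime",
)


def check_config(config):
    """Fail early on missing keys and return the typed values used by the rules."""
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"missing config keys: {missing}")
    batches_per_skim = int(config["BatchesPerSkim"])
    if batches_per_skim < 1:
        raise ValueError("BatchesPerSkim must be at least 1")
    return {
        "maxretries": int(config["gbasf2_max_retries"]),
        "gbasf2_download_logs": bool(config["gbasf2_download_logs"]),
        # batch indices as used in the expand() calls of the snakefile
        "batches": list(range(batches_per_skim)),
    }


settings = check_config(config)
print(settings)
```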
To map gbasf2 input file paths to output directories, we here use a dictionary, PathDictionary, which is filled with paths from the provided text files:
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021635/s00/e1003/4S/r00000/uubar/mdst
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021636/s00/e1003/4S/r00000/ddbar/mdst
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021752/s00/e1003/4S/r00000/charged/mdst
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021641/s00/e1003/4S/r00000/mixed/mdst
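The mapping can be sketched outside the workflow as follows. The function build_path_dictionary is a hypothetical standalone variant of PathDictionary that takes the dataset paths as a list instead of reading a file. Assigning the uubar and ddbar datasets to the qqcontinuum event type is an assumption made for this example.

```python
def build_path_dictionary(dataset_paths, event_type):
    """Map each gbasf2 dataset path to the merged output file of its skim."""
    return {
        dataset: f"Reconstruction/projectName_{event_type}/skim_{index}/reco.root"
        for index, dataset in enumerate(dataset_paths)
    }


# Assumption: these two datasets are the lines of qqcontinuumskims.dat.
qq_datasets = [
    "/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021635/s00/e1003/4S/r00000/uubar/mdst",
    "/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021636/s00/e1003/4S/r00000/ddbar/mdst",
]
paths = build_path_dictionary(qq_datasets, "qqcontinuum")

# The Skim rule selects its dataset by position, using the {skim} wildcard.
skim_wildcard = "1"
dataset_for_skim = list(paths.keys())[int(skim_wildcard)]
print(dataset_for_skim)
print(paths[dataset_for_skim])
```

Because the skim index is simply the line number in the text file, the order of the dataset paths in that file must stay fixed between workflow runs.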