Workflow Logic Code in the Snakefile

In Snakemake, all workflow logic is centralized in a so-called Snakefile, similar to a Makefile. All analysis code is kept in separate scripts, which need only minimal adaptations to be embedded in the workflow logic. Snakemake supports multiple programming languages; we will stick to Python.

Processing steps in a Snakemake workflow are called rules. They are written in a simple Python-based language and typically consist of input: files, output: files, and a directive that creates the latter from the former. Available directives are shell: commands, Python code in a run: block, or a script: path. Let us examine the Snakefile for our Belle II workflow.
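
For orientation, a minimal rule with a shell: directive could look as follows; the file names here are purely hypothetical and not part of our workflow:

rule CountEvents:
    input:
        "data/events.txt"            # hypothetical input file
    output:
        "results/event_count.txt"    # hypothetical output file, created by the shell command
    shell:
        "wc -l {input} > {output}"   # {input} and {output} are substituted by Snakemake

All rules of our workflow follow this same pattern.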

Listing 3.7 snakefile
configfile: "config.yaml"

workdir: config["outputDirectory"]

localrules: OfflineAnalysis, MergeEventType, MergeBatch, BatchToTextFile, Skim, SetProxy  # only ReconstructBatch runs on the bsub cluster
    
from extravariables import outputfile, runningOnMC
import numpy as np
import hashlib
import subprocess  # needed for the b2help-releases call in rule Skim

rule OfflineAnalysis:
    input:
        data_BB = "data/projectName_bmesons/reco.root",
        data_QQ = "data/projectName_qqcontinuum/reco.root"
        
    output:
        mbc_plot = "Mbc.jpg",
        deltaE_plot = "deltaE.jpg"
    
    script:
        "offlineanalysis.py"
      
def PathDictionary(wildcards):  # match gbasf2 dataset paths to output file paths
    Paths = dict()
    # the text file <EventType>skims.dat lists one gbasf2 dataset path per line
    with open(f"{workflow.basedir}/../{wildcards.EventType}skims.dat") as datasets:
        for i, skim in enumerate(datasets.read().splitlines()):
            Paths[skim] = f"Reconstruction/projectName_{wildcards.EventType}/skim_{i}/reco.root"
    return Paths

rule MergeEventType:
    input:
        unpack(PathDictionary)
        
    output:
        "data/projectName_{EventType}/reco.root"
        
    shell:
        "hadd {output} {input}" #for merging nTuples
        
rule MergeBatch:
    input:
        expand("Reconstruction/projectName_{EventType}/skim_{skim}/reco_batch{batch}.root",\
               batch=np.arange(config["BatchesPerSkim"]), allow_missing=True)
        
    output:
        "Reconstruction/projectName_{EventType}/skim_{skim}/reco.root"
        
    shell:
        "hadd {output} {input}" #for merging nTuples
        
rule ReconstructBatch:
    input:
        inputfileList = "Skim/projectName_{EventType}/skim_{skim}/batch{batch}.json"
        
    params:
        runningOnMC = runningOnMC
    
    log:
        "Reconstruction/projectName_{EventType}/skim_{skim}/batch{batch}_log.dat"
        
    output:
        "Reconstruction/projectName_{EventType}/skim_{skim}/reco_batch{batch}.root"
        
    script:
        "reconstruction.py"

rule BatchToTextFile:
    input:
        skim_dirs = "Skim/projectName_{EventType}/skim_{skim}/skimslist.dat"
        
    output:
        expand("Skim/projectName_{EventType}/skim_{skim}/batch{batch}.json",\
               batch=np.arange(config["BatchesPerSkim"]),allow_missing=True)
        
    params:
        BatchesPerSkim = config["BatchesPerSkim"]
        
    script:
        "batchToTxt.py"
        
rule Skim:
    input:
        proxy_text_file = "proxy.dat"
    params:
        steeringfile = f"{workflow.basedir}/skim.py",
        sandbox_input_files = ["extravariables.py"],
        gbasf2_dataset = lambda wildcards: list(PathDictionary(wildcards).keys())[int(wildcards.skim)],
        release = str(subprocess.check_output(["b2help-releases"]).strip(),'utf-8'),
        maxretries = int(config["gbasf2_max_retries"]),
        gbasf2_download_logs = bool(config["gbasf2_download_logs"]),
        gbasf2_min_proxy_lifetime = config["gbasf2_min_proxy_lifetime"],
        gbasf2_proxy_lifetime = config["gbasf2_proxy_lifetime"],
        gbasf2_output_file_name = outputfile,
        gbasf2_project_name_prefix = lambda wildcards: wildcards.EventType
    output:
        output_filelist = "Skim/projectName_{EventType}/skim_{skim}/skimslist.dat"
    wrapper:
        "file:/path/to/gbasf2_wrapper_for_snakemake"
        
rule SetProxy:
    params:
        setProxy = True
    output:
        proxy_text_file = temp("proxy.dat")
    wrapper:
        "file:/path/to/gbasf2_wrapper_for_snakemake"

In a Snakefile the topmost rule is the target rule, in our case OfflineAnalysis, which produces the histogram plots. Rules can also have log: and params: directives for log file paths and additional parameters, respectively.
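
Within a script: the rule's directives are available through a global snakemake object that Snakemake injects into the script. As a rough sketch, showing only the access pattern and not the basf2 steering code that the actual reconstruction.py additionally contains, the script of rule ReconstructBatch can read its directives like this:

# sketch: accessing the calling rule's directives inside a script (here ReconstructBatch)
input_list    = snakemake.input.inputfileList   # "Skim/.../batch{batch}.json"
running_on_mc = snakemake.params.runningOnMC    # entry of the params: directive
log_file      = snakemake.log[0]                # path given in the log: directive
output_file   = snakemake.output[0]             # "Reconstruction/.../reco_batch{batch}.root"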

The directory structure for the output files is specified directly in the file paths and created automatically. To avoid repetition, we can use wildcards, which are filled in automatically. For example, the rule with output "data/projectName_{EventType}/reco.root" is executed once for every requested value of {EventType}, here twice, for its two values bmesons and qqcontinuum.
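
Wildcard values are fixed by the files that other rules request; the two inputs of OfflineAnalysis could equivalently be written with expand, which fills a wildcard with an explicit list of values:

# equivalent, explicit way to request both reconstructed samples
expand("data/projectName_{EventType}/reco.root",
       EventType=["bmesons", "qqcontinuum"])
# evaluates to ["data/projectName_bmesons/reco.root",
#               "data/projectName_qqcontinuum/reco.root"]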

To merge basf2 output files, we can simply call hadd or b2file-merge in a shell: directive.

To submit jobs to the grid using gbasf2, we provide a public wrapper, available via git clone https://github.com/casschmitt/gbasf2_wrapper_for_snakemake.git. Specify wrapper: "file:/path/to/gbasf2_wrapper_for_snakemake" as a directive in every rule whose jobs should be submitted with gbasf2. The wrapper takes care of proxy setup and job submission, reschedules failed jobs, and downloads the finished job outputs. To make sure the proxy is only initialized once, include a rule with setProxy = True and require its output proxy_text_file in the skim rules. After checking that the download is complete, the wrapper returns a text file with the paths to all output files, which can be used in subsequent processing steps; a sketch of such a step, batchToTxt.py, is shown after the config listing below. Please note that gbasf2 does not support absolute paths for the sandbox files. Global gbasf2-specific parameters can be given in a config file, declared via the configfile: directive:

Listing 3.8 config.yaml
outputDirectory:
    "/group/belle/users/<user>/"
BatchesPerSkim:
    3
gbasf2_max_retries:
    10
gbasf2_download_logs:
    False
gbasf2_min_proxy_lifetime:
    0
gbasf2_proxy_lifetime:
    24
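
As announced above, here is a possible sketch of batchToTxt.py, assuming the script simply distributes the file paths listed in skimslist.dat over BatchesPerSkim JSON lists; the real script may differ in its details:

# batchToTxt.py -- sketch only: split the downloaded file list into batches
import json
import numpy as np

with open(snakemake.input.skim_dirs) as filelist:
    files = filelist.read().splitlines()        # paths returned by the gbasf2 wrapper

# split the file paths into BatchesPerSkim roughly equal batches
batches = np.array_split(files, snakemake.params.BatchesPerSkim)

# write one JSON list per batch; snakemake.output is ordered batch0, batch1, ...
for out_path, batch in zip(snakemake.output, batches):
    with open(out_path, "w") as out:
        json.dump([str(f) for f in batch], out)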

To map gbasf2 input dataset paths to output directories, we use the input function PathDictionary, which builds a dictionary from the dataset paths listed in the provided text files (the resulting mapping for one EventType is sketched after the listings):

Listing 3.9 ../qqcontinuumskims.dat
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021635/s00/e1003/4S/r00000/uubar/mdst
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021636/s00/e1003/4S/r00000/ddbar/mdst
Listing 3.10 ../bmesonsskims.dat
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021752/s00/e1003/4S/r00000/charged/mdst
/belle/MC/release-05-02-00/DB00001330/MC14ri_d/prod00021641/s00/e1003/4S/r00000/mixed/mdst