Skip to content
Snippets Groups Projects
Unverified Commit 6de018e3 authored by Dmitry Kalinkin's avatar Dmitry Kalinkin Committed by GitHub
Browse files

backwards_ecal: run with dask (#122)

parent b8e5f579
Branches
No related tags found
No related merge requests found
...@@ -152,6 +152,10 @@ rule backwards_ecal: ...@@ -152,6 +152,10 @@ rule backwards_ecal:
script="benchmarks/backwards_ecal/backwards_ecal.py", script="benchmarks/backwards_ecal/backwards_ecal.py",
output: output:
directory("results/backwards_ecal/{CAMPAIGN}/") directory("results/backwards_ecal/{CAMPAIGN}/")
log:
scheduler=".logs/results/backwards_ecal/{CAMPAIGN}/scheduler.log",
worker=".logs/results/backwards_ecal/{CAMPAIGN}/worker.log",
threads: workflow.cores
shell: shell:
""" """
if [[ "{wildcards.CAMPAIGN}" == "local" ]]; then if [[ "{wildcards.CAMPAIGN}" == "local" ]]; then
...@@ -159,6 +163,20 @@ export PLOT_TITLE="Benchmark simulation" ...@@ -159,6 +163,20 @@ export PLOT_TITLE="Benchmark simulation"
else else
export PLOT_TITLE="\\textbf{{ePIC}} Simulation {wildcards.CAMPAIGN}" export PLOT_TITLE="\\textbf{{ePIC}} Simulation {wildcards.CAMPAIGN}"
fi fi
set -m # monitor mode to prevent lingering processes
cleanup() {{
echo Cleaning up
kill $WORKER_PID $SCHEDULER_PID
}}
trap cleanup EXIT
PORT=$RANDOM
dask scheduler --port $PORT 2>{log.scheduler} &
export DASK_SCHEDULER=localhost:$PORT
SCHEDULER_PID=$!
dask worker tcp://$DASK_SCHEDULER --nworkers {threads} --nthreads 1 2>{log.worker} &
WORKER_PID=$!
env \ env \
MATPLOTLIBRC={input.matplotlibrc} \ MATPLOTLIBRC={input.matplotlibrc} \
DETECTOR_CONFIG=""" + DETECTOR_CONFIG + """ \ DETECTOR_CONFIG=""" + DETECTOR_CONFIG + """ \
......
...@@ -18,12 +18,32 @@ import os ...@@ -18,12 +18,32 @@ import os
from pathlib import Path from pathlib import Path
import awkward as ak import awkward as ak
import boost_histogram as bh
import dask_histogram as dh
import numpy as np import numpy as np
import vector
import uproot import uproot
from sklearn.metrics import roc_curve import vector
#+end_src
#+begin_src jupyter-python :results silent
from dask.distributed import Client
client = Client(os.environ.get("DASK_SCHEDULER", "localhost:8786"))
#+end_src
#+begin_src jupyter-python
vector.register_awkward() vector.register_awkward()
from dask.distributed import WorkerPlugin
class VectorImporter(WorkerPlugin):
idempotent=True
def setup(self, worker):
import vector
vector.register_awkward()
client.register_plugin(VectorImporter())
#+end_src #+end_src
* Plotting setup * Plotting setup
...@@ -93,7 +113,7 @@ energies = [ ...@@ -93,7 +113,7 @@ energies = [
] ]
filter_name = [ filter_name = [
"MCParticles.*", "MCParticles.*",
"*EcalEndcapNClusters*", "EcalEndcapNClusters.energy",
] ]
pi_eval = {} pi_eval = {}
...@@ -105,18 +125,46 @@ def readlist(path): ...@@ -105,18 +125,46 @@ def readlist(path):
return paths return paths
for energy in energies: for energy in energies:
pi_eval[energy] = filter_pointing(uproot.concatenate( pi_eval[energy] = uproot.dask(
{path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="pi-", energy=energy))}, {path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="pi-", energy=energy))},
filter_name=filter_name, filter_name=filter_name,
)) steps_per_file=1,
e_eval[energy] = filter_pointing(uproot.concatenate( open_files=False,
).map_partitions(filter_pointing)
e_eval[energy] = uproot.dask(
{path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="e-", energy=energy))}, {path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="e-", energy=energy))},
filter_name=filter_name, filter_name=filter_name,
)) steps_per_file=1,
open_files=False,
).map_partitions(filter_pointing)
#+end_src #+end_src
** Energy resolution ** Energy resolution
The first step in the analysis is plotting distributions of cluster energies.
Only electron distribution is needed, but we also do pi- for the pion rejection
study.
#+begin_src jupyter-python
e_over_p_hist = {}
e_over_p_axis = bh.axis.Regular(101, 0., 1.01)
for ix, energy in enumerate(energies):
for particle_name, dataset in [("e-", e_eval[energy]), ("pi-", pi_eval[energy])]:
energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3"))
def clf(events):
return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value
e_over_p_hist[(particle_name, energy)] = dh.factory(
clf(dataset),
axes=(e_over_p_axis,),
)
e_over_p_hist = client.gather(client.compute(e_over_p_hist))
#+end_src
The next step is to plot the histograms together with fits and produce a plot
for resolution vs energy that summarizes them.
#+begin_src jupyter-python #+begin_src jupyter-python
fig, axs = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(15, 6)) fig, axs = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(15, 6))
...@@ -125,29 +173,26 @@ axs = np.ravel(np.array(axs)) ...@@ -125,29 +173,26 @@ axs = np.ravel(np.array(axs))
sigmas_rel_FWHM_cb = {} sigmas_rel_FWHM_cb = {}
fractions_below = {} fractions_below = {}
for ix, energy in enumerate(energies): def norm_by_sum(arr):
energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3")) return arr / np.sum(arr)
clf_label = PLOT_TITLE
def clf(events):
return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value
e_pred = clf(e_eval[energy])
for ix, energy in enumerate(energies):
plt.sca(axs[ix]) plt.sca(axs[ix])
counts, bins, patches = plt.hist(e_pred, weights=np.full_like(e_pred, 1.0 / ak.num(e_pred, axis=0)), bins=np.linspace(0.01, 1.01, 101), label=rf"$e^-$ {clf_label}") hist = e_over_p_hist[("e-", energy)]
patch = plt.stairs(norm_by_sum(hist.values()), hist.axes[0].edges, label=rf"$e^-$ {PLOT_TITLE}")
plt.title(f"{energy}") plt.title(f"{energy}")
e_over_p = (bins[1:] + bins[:-1]) / 2
import scipy.stats import scipy.stats
def f(x, n, beta, m, loc, scale): def f(x, n, beta, m, loc, scale):
return n * scipy.stats.crystalball.pdf(x, beta, m, loc, scale) return n * scipy.stats.crystalball.pdf(x, beta, m, loc, scale)
p0 = (np.sum(counts[10:]), 2., 3., 0.95, 0.05) p0 = (np.sum(hist.values()[10:]), 2., 3., 0.95, 0.05)
try: try:
import scipy.optimize import scipy.optimize
par, pcov = scipy.optimize.curve_fit(f, e_over_p[5:], counts[5:], p0=p0, maxfev=10000) par, pcov = scipy.optimize.curve_fit(f, hist.axes[0].centers[5:], hist.values()[5:], p0=p0, maxfev=10000)
except RuntimeError: except RuntimeError:
par = None par = None
plt.plot(e_over_p, f(e_over_p, *par), label=rf"Crystal Ball fit", color="tab:green", lw=0.8) plt.plot(hist.axes[0].centers, f(hist.axes[0].centers, *par), label=rf"Crystal Ball fit", color="tab:green", lw=0.8)
def summarize_fit(par): def summarize_fit(par):
_, _, _, loc_cb, scale_cb = par _, _, _, loc_cb, scale_cb = par
...@@ -156,19 +201,19 @@ for ix, energy in enumerate(energies): ...@@ -156,19 +201,19 @@ for ix, energy in enumerate(energies):
f_prime = lambda x: f(x, *par) - y_max / 2 f_prime = lambda x: f(x, *par) - y_max / 2
x_plus, = scipy.optimize.root(f_prime, loc_cb + scale_cb).x x_plus, = scipy.optimize.root(f_prime, loc_cb + scale_cb).x
x_minus, = scipy.optimize.root(f_prime, loc_cb - scale_cb).x x_minus, = scipy.optimize.root(f_prime, loc_cb - scale_cb).x
plt.axvline(x_minus, ls="--", lw=0.75, color=patches[0].get_facecolor(), label=r"$\mu - $FWHM") plt.axvline(x_minus, ls="--", lw=0.75, color=patch.get_facecolor(), label=r"$\mu - $FWHM")
plt.axvline(x_plus, ls=":", lw=0.75, color=patches[0].get_facecolor(), label=r"$\mu + $FWHM") plt.axvline(x_plus, ls=":", lw=0.75, color=patch.get_facecolor(), label=r"$\mu + $FWHM")
fwhm = (x_plus - x_minus) / loc_cb fwhm = (x_plus - x_minus) / loc_cb
sigma_rel_FWHM_cb = fwhm / 2 / np.sqrt(2 * np.log(2)) sigma_rel_FWHM_cb = fwhm / 2 / np.sqrt(2 * np.log(2))
cutoff_x = loc_cb - fwhm cutoff_x = loc_cb - fwhm
fraction_below = np.sum(counts[e_over_p < cutoff_x]) / ak.num(e_pred, axis=0) fraction_below = np.sum(hist.values()[hist.axes[0].centers < cutoff_x]) / np.sum(hist.values())
return sigma_rel_FWHM_cb, fraction_below return sigma_rel_FWHM_cb, fraction_below
sigma_rel_FWHM_cb, fraction_below = summarize_fit(par) sigma_rel_FWHM_cb, fraction_below = summarize_fit(par)
sigmas_rel_FWHM_cb.setdefault(clf_label, {})[energy] = sigma_rel_FWHM_cb sigmas_rel_FWHM_cb.setdefault(PLOT_TITLE, {})[energy] = sigma_rel_FWHM_cb
fractions_below.setdefault(clf_label, {})[energy] = fraction_below fractions_below.setdefault(PLOT_TITLE, {})[energy] = fraction_below
plt.legend() plt.legend()
plt.xlabel("$E/p$", loc="right") plt.xlabel("$E/p$", loc="right")
...@@ -246,7 +291,6 @@ rocs = {} ...@@ -246,7 +291,6 @@ rocs = {}
for ix, energy in enumerate(energies): for ix, energy in enumerate(energies):
energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3")) energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3"))
clf_label = PLOT_TITLE
def clf(events): def clf(events):
return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value
e_pred = clf(e_eval[energy]) e_pred = clf(e_eval[energy])
...@@ -254,8 +298,8 @@ for ix, energy in enumerate(energies): ...@@ -254,8 +298,8 @@ for ix, energy in enumerate(energies):
for do_log, ax in [(False, axs[ix]), (True, axs_log[ix])]: for do_log, ax in [(False, axs[ix]), (True, axs_log[ix])]:
plt.sca(ax) plt.sca(ax)
plt.hist(e_pred, weights=np.full_like(e_pred, 1.0 / ak.num(e_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$e^-$ {clf_label}") patch = plt.stairs(norm_by_sum(e_over_p_hist[("e-", energy)].values()), e_over_p_hist[("e-", energy)].axes[0].edges, label=rf"$e^-$ {PLOT_TITLE}")
plt.hist(pi_pred, weights=np.full_like(pi_pred, 1.0 / ak.num(pi_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$\pi^-$ {clf_label}", histtype="step") patch = plt.stairs(norm_by_sum(e_over_p_hist[("pi-", energy)].values()), e_over_p_hist[("pi-", energy)].axes[0].edges, label=rf"$\pi^-$ {PLOT_TITLE}")
plt.title(f"{energy}") plt.title(f"{energy}")
plt.legend() plt.legend()
plt.xlabel("Classifier output") plt.xlabel("Classifier output")
...@@ -264,18 +308,18 @@ for ix, energy in enumerate(energies): ...@@ -264,18 +308,18 @@ for ix, energy in enumerate(energies):
plt.yscale("log") plt.yscale("log")
plt.sca(axs_roc[ix]) plt.sca(axs_roc[ix])
fpr, tpr, _ = roc_curve( fpr = np.cumsum(e_over_p_hist[("pi-", energy)].values()[::-1])
np.concatenate([np.ones_like(e_pred), np.zeros_like(pi_pred)]), fpr /= fpr[-1]
np.concatenate([e_pred, pi_pred]), tpr = np.cumsum(e_over_p_hist[("e-", energy)].values()[::-1])
) tpr /= tpr[-1]
cond = fpr != 0 # avoid infinite rejection (region of large uncertainty) cond = fpr != 0 # avoid infinite rejection (region of large uncertainty)
cond &= tpr != 1 # avoid linear interpolation (region of large uncertainty) cond &= tpr != 1 # avoid linear interpolation (region of large uncertainty)
def mk_interp(tpr, fpr): def mk_interp(tpr, fpr):
def interp(eff): def interp(eff):
return np.interp(eff, tpr, fpr) return np.interp(eff, tpr, fpr)
return interp return interp
rocs.setdefault(clf_label, {})[energy] = mk_interp(tpr, fpr) rocs.setdefault(PLOT_TITLE, {})[energy] = mk_interp(tpr, fpr)
plt.plot(tpr[cond] * 100, 1 / fpr[cond], label=f"{clf_label}") plt.plot(tpr[cond] * 100, 1 / fpr[cond], label=f"{PLOT_TITLE}")
plt.yscale("log") plt.yscale("log")
plt.title(f"{energy}") plt.title(f"{energy}")
plt.legend(loc="lower left") plt.legend(loc="lower left")
......
...@@ -26,7 +26,7 @@ bench:backwards_ecal: ...@@ -26,7 +26,7 @@ bench:backwards_ecal:
script: script:
- export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps - export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps
- pip install -r benchmarks/backwards_ecal/requirements.txt - pip install -r benchmarks/backwards_ecal/requirements.txt
- snakemake $SNAKEMAKE_FLAGS --cores 1 results/backwards_ecal/local - snakemake $SNAKEMAKE_FLAGS --cores 5 results/backwards_ecal/local
bench:backwards_ecal_campaigns: bench:backwards_ecal_campaigns:
extends: .det_benchmark extends: .det_benchmark
...@@ -36,7 +36,7 @@ bench:backwards_ecal_campaigns: ...@@ -36,7 +36,7 @@ bench:backwards_ecal_campaigns:
script: script:
- export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps - export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps
- pip install -r benchmarks/backwards_ecal/requirements.txt - pip install -r benchmarks/backwards_ecal/requirements.txt
- snakemake $SNAKEMAKE_FLAGS --cores 1 results/backwards_ecal/24.10.1 - snakemake $SNAKEMAKE_FLAGS --cores 5 results/backwards_ecal/24.10.1
collect_results:backwards_ecal: collect_results:backwards_ecal:
extends: .det_benchmark extends: .det_benchmark
......
awkward >= 2.4.0 awkward >= 2.4.0
scikit-learn boost_histogram
dask >= 2024
dask_awkward >= 2024
dask_histogram
distributed >= 2024
uproot >= 5.2.0 uproot >= 5.2.0
vector vector
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment