ANASEN_analysis/ELoss/PCEnergyAnalysis.py
2026-05-22 14:41:23 -04:00

869 lines
24 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed May 20 13:32:14 2026
@author: jamesszalkie
"""
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
import argparse
import uproot
import pycatima as catima
from scipy.integrate import cumulative_trapezoid
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import threading
import time
import sys
import cmd
import shlex
import textwrap
import readline
import atexit
import os
import periodictable as pt
import re
#Run program from terminal or IDE, and prompts will provide user steps
histfile = os.path.expanduser("~/.uproot_shell_history")
try:
readline.read_history_file(histfile)
except FileNotFoundError:
pass
atexit.register(readline.write_history_file, histfile)
#data = [z, mass_u, maximum MeV, name]
alpha_data = [2, 4.0026, 40, "alpha"]
proton_data = [1, 1.0078, 20, "proton"]
deuteron_data = [1, 2.014102, 30, "deuteron"]
dA = 3.2 #cm
dC = 4.2 #cm
dsx3 = 8.8 #cm
particles = {
"alpha": alpha_data,
"proton": proton_data,
"deuteron": deuteron_data
}
interp_cache = {}
def make_E_vs_x(z, mass_u, emax_mev, label, npoints, P_TORR, TEMP_K):
# GAS SETUP
R = 8.3144
P_TORR = 400
TEMP_K = 293.15
R = 8.3144
# Gas density
p_pa = P_TORR * 133.322
molar_density = p_pa / (R * TEMP_K)
m_he = 4.0026
m_c = 12.0000
m_o = 15.9949
m_mix_avg = (0.96 * m_he) + (0.04 * (m_c + 2*m_o))
rho_g_cm3 = (molar_density * m_mix_avg) / 1e6
print(f"Gas density = {rho_g_cm3:.6e} g/cm^3")
# MATERIAL
material_def = [
(m_he, 2, 0.96),
(m_c, 6, 0.04),
(m_o, 8, 0.08)
]
gas_mix = catima.Material(material_def)
gas_mix.density(rho_g_cm3)
projectile = catima.Projectile(mass_u, z)
# Energy grid
E = np.linspace(0.01, emax_mev, npoints)
# Stopping power array
S_mass = np.zeros_like(E)
for i, energy in enumerate(E):
projectile.T(energy / mass_u)
# MeV / (g/cm^2)
S_mass[i] = catima.dedx(projectile, gas_mix)
# Convert to MeV/cm
S_linear = S_mass * rho_g_cm3
# Sort descending energy
sort_idx = np.argsort(E)[::-1]
E = E[sort_idx]
S_linear = S_linear[sort_idx]
# Integrate dx/dE = 1/S(E)
invS = 1.0 / S_linear
x = cumulative_trapezoid(
invS,
E,
initial=0
)
x = -x
# Output table
output = pd.DataFrame({
"Distance_cm": x,
"Energy_MeV": E
})
outfile = f"E_vs_x_{label}.dat"
output.to_csv(
outfile,
sep='\t',
index=False
)
print(f"Saved: {outfile}")
return x, E
#Generate energy loss tables from file
def load_table(filename):
"""
Load table with columns:
x(cm) E(MeV)
Returns:
x_array, E_array
"""
data = pd.read_csv(
filename,
sep=r'\s+',
comment="#",
header=None,
skiprows=1
)
x = data.iloc[:, 0].to_numpy()
E = data.iloc[:, 1].to_numpy()
return x, E
def get_interpolators(particle):
if particle in interp_cache:
return interp_cache[particle]
filename = f"E_vs_x_{particle}.dat"
x, E = load_table(filename)
E_of_x = interp1d(
x,
E,
bounds_error=False,
fill_value=0.0
)
x_of_E = interp1d(
E[::-1],
x[::-1],
bounds_error=False,
fill_value="extrapolate"
)
interp_cache[particle] = (E_of_x, x_of_E)
return E_of_x, x_of_E
def energy_loss(particle, Ei, dl):
E_of_x, x_of_E = get_interpolators(particle)
xi = x_of_E(Ei)
xf = xi + dl
Ef = E_of_x(xf)
return np.maximum(Ef, 0.0)
#def energy_loss_angular(particle, Ei, theta):
def energy_reconstruction(particle, Ef, dl):
E_of_x, x_of_E = get_interpolators(particle)
xf = x_of_E(Ef)
xi = xf - dl
Ei = E_of_x(xi)
return np.maximum(Ei, 0.0)
def energy_distance(particle, Ei, Ef):
_, x_of_E = get_interpolators(particle)
xi = x_of_E(Ei)
xf = x_of_E(Ef)
return np.abs(xf - xi)
def resolve_particle(name):
name = name.lower().strip().rstrip("s")
if name in particles:
return particles[name]
match = re.match(r"([a-zA-Z]+)[-\s]?(\d+)", name)
if match:
element_symbol = match.group(1).capitalize()
A = int(match.group(2))
try:
element = pt.elements.symbol(element_symbol)
isotope = element[A] # <-- THIS is the correct way
return (
isotope.number, # Z
isotope.mass, # mass in u
30.0,
f"{element_symbol}-{A}"
)
except Exception:
raise ValueError(f"Unknown isotope: {name}")
try:
elem = pt.elements.symbol(name.capitalize())
return elem.number, elem.mass, 30.0, name
except Exception:
raise ValueError(f"Unknown particle/isotope: {name}")
class MyInteractiveApp(cmd.Cmd):
def __init__(self):
super().__init__()
# Initial value set when the script starts
self.buckets = 500
self.T = 293.15
self.P = 400
self.temp_particle = [0, 0.0, 0.0, ""]
self.rootFile = "SimAnasen1.root"
self.file = None
self.initialize_file()
self.tree = None
print("-" * 30)
print("INTERACTIVE SHELL STARTED")
self.print_params()
print("Type 'help' for commands.")
print("Type 'exit' to end program")
print("-" * 30)
def print_params(self):
"""Helper method to display current state"""
print(f"Current Parameters: T={self.T} K, P={self.P} Torr")
def initialize_file(self):
"""Load in default root file for anasen"""
try:
self.file = uproot.open(f"../Armory/{self.rootFile}")
except:
self.file = None
print("\nATTENTION: Root file not found, continue without uproot functions or uproot file manually")
#intro = "Interactive Shell Started. Type 'help' to see commands."
prompt = ">> "
def default(self, line):
# Check if the command starts with our multi-word phrase
if line.startswith("make table "):
# Extract everything after "make table "
args = line[len("make table "):].strip()
self.do_make_table(args)
elif line.startswith("set t") or line.startswith("Set T") or line.startswith("set T") or line.startswith("Set t"):
args = line[len("set t "):].strip()
self.do_set_T(args)
elif line.startswith("set p") or line.startswith("Set P") or line.startswith("set P") or line.startswith("Set p"):
args = line[len("set p "):].strip()
self.do_set_P(args)
elif line.startswith("set buckets") or line.startswith("Set Buckets") or line.startswith("set Buckets") or line.startswith("Set buckets"):
args = line[len("set buckets "):].strip()
self.do_set_Buckets(args)
elif line.startswith("energy loss") or line.startswith("Energy Loss") or line.startswith("Energy loss"):
args = line[len("energy loss "):].strip()
self.do_energy_loss(args)
elif line.startswith("energy reconstruction") or line.startswith("Energy Reconstruction") or line.startswith("Energy reconstruction"):
args = line[len("energy reconstruction "):].strip()
self.do_energy_reconstruction(args)
elif line.startswith("energy distance") or line.startswith("Energy Distance") or line.startswith("Energy distance"):
args = line[len("energy distance "):].strip()
self.do_energy_distance(args)
else:
print(f"*** Unknown syntax: {line}")
def do_T(self, arg):
"""Print value of T"""
print(self.T)
def do_Buckets(self, arg):
"""print number of histogram buckets and points"""
print(self.buckets)
def do_P(self, arg):
"""Print value of P (pressure)"""
print(self.P)
def do_make_particle(self, arg):
"""Enter parameters for new particle to be given to catyma system
Ex: >> make_particle <z> <mass u> <maximum E value> <particle name>"""
args = shlex.split(arg)
z = int(args[0])
mass_u = float(args[1])
emax_mev = float(args[2])
particle = args[3]
self.temp_particle = [z, mass_u, emax_mev, particle]
print(f"New particle '{particle}': z = {z}, mass_u = {mass_u}, emax_mev = {emax_mev}")
def do_set_T(self, arg):
"""Changes the value of T. Usage: set_t 300"""
try:
self.T = float(arg)
print(f"T has been updated to {self.T}")
except ValueError:
print("Please enter a valid number for T.")
def do_set_P(self, arg):
"""Changes the value of P in Torr. Usage: set_ 400"""
try:
self.P = float(arg)
print(f"P has been updated to {self.P}")
except ValueError:
print("Please enter a valid number for P.")
def do_set_Buckets(self, arg):
"""Changes the value of buckets"""
try:
self.buckets = int(arg)
print(f"Number of buckets has been updated to {self.buckets}")
except ValueError:
print("Please enter a valid number of buckets.")
def do_exit(self, arg):
"""Exits the application."""
print("Closing application...")
return True # Returning True stops the cmdloop()
def do_make_table(self, arg):
"""Create E vs X tables for particle, or isotopes
Ex: >> make table proton <max energy (optional) >
Ex: >> make table Co60 <max energy (optional) >
Ex: >> make table N17 <max energy (optional) >"""
try:
args = shlex.split(arg)
if not args:
print("Please enter desired reaction particle")
return
name = args[0]
if len(args) > 1:
emax_mev = float(args[1])
else:
emax_mev = None
z, mass_u, default_emax, label = resolve_particle(name)
if emax_mev is None:
emax_mev = default_emax
x, E = make_E_vs_x(
z,
mass_u,
emax_mev,
label,
self.buckets,
self.P,
self.T
)
plt.figure(figsize=(8,6))
plt.plot(x, E)
plt.xlabel("Distance (cm)")
plt.ylabel("Energy (MeV)")
plt.title(f"Energy Loss Curve {label.capitalize()}")
plt.grid(True)
plt.show()
except Exception as e:
print(f"Error in make_table: {e}")
def load_table(self, arg):
args = shlex.split(arg)
particle = args[0]
filename = f"E_vs_x_{particle}.dat"
load_table(filename)
print(f"Loaded table {filename}")
def do_energy_loss(self, arg):
"""Find a final energy given an initial energy and distance travelled
Ex: >> energy loss <particle> <initial energy MeV> <distance travelled cm>"""
args = shlex.split(arg)
try:
particle = args[0]
Ei = float(args[1])
dl = float(args[2])
Ei_offset = Ei * 1.1
table_specs = f"{particle} {Ei_offset}"
try:
self.do_make_table(table_specs)
Ef = energy_loss(particle, Ei, dl)
print(f"\nFinal energy: {Ef:.6f} MeV\n")
except:
return
except IndexError:
print("Please input particle, initial energy, and distance travelled")
def do_energy_reconstruction(self, arg):
"""Find a vertex energy given an final energy and distance travelled
Ex: >> energy reconstruction <particle> <final energy MeV> <distance travelled cm>"""
args = shlex.split(arg)
try:
particle = args[0]
Ef = float(args[1])
dl = float(args[2])
try:
Ei = energy_reconstruction(particle, Ef, dl)
if Ei > 0:
print(f"\nInitial energy: {Ei:.6f} MeV\n")
else:
print("Error: remake table with larger value, fallen off map")
except:
print("Particle energy table not made yet, please do so using 'make table'")
except IndexError:
print("Please input particle, final energy from detector, and distance travelled")
def do_energy_distance(self, arg):
"""Find a distance travelled given an initial and final energy
Ex: >> energy distance <particle> <initial energy> <final energy MeV>"""
args = shlex.split(arg)
try:
particle = args[0]
Ei = float(args[1])
Ef = float(args[2])
dE = Ei - Ef
try:
dl = energy_distance(particle, Ei, Ef)
if dl > 0:
print(f"\nChange in energy: {dE:.6f} MeV")
print(f"Distance travelled: {dl:.6f} cm\n")
else:
print("Error: remake table with larger value, fallen off map")
except:
print("Particle energy table not made yet, please do so using 'make table'")
except IndexError:
print("Please input particle, final energy from detector, and distance travelled")
def do_uproot_file(self, arg):
"""Open a specific root file for inspection"""
args = shlex.split(arg)
if len(args) > 0:
filename = args[0]
else:
filename = self.rootFile
try:
print(f"Opening {filename}")
# Try Armory path first
try:
self.file = uproot.open(f"../Armory/{filename}")
except FileNotFoundError:
self.file = uproot.open(filename)
print("File loaded successfully.")
print("Keys:", self.file.keys())
except Exception as e:
print("Error opening file:", e)
def do_print_file(self, arg):
"""Print contents of ROOT file"""
args = shlex.split(arg)
file = self.file
if "keys" in args or len(args) == 0:
print("Keys in file: ", file.keys())
if "class_names" in args or len(args) == 0:
print("Class names: ", file.classnames())
def do_set_tree(self, arg):
if self.file is None:
print("No ROOT file loaded.")
return
keys = self.file.keys()
print("Available trees:", keys)
treeName = arg if len(arg) > 0 else "tree"
try:
# uproot automatically resolves ";1"
self.tree = self.file[treeName]
print(f"Tree loaded: {self.tree}")
print("Branches:", self.tree.keys())
except Exception as e:
print("Error loading tree:", treeName)
print("Exception:", e)
def run_command_line(self):
import readline
import atexit
import os
import awkward as ak
histfile = os.path.expanduser("~/.uproot_shell_history")
try:
readline.read_history_file(histfile)
except FileNotFoundError:
pass
readline.set_history_length(1000)
readline.parse_and_bind("tab: complete")
atexit.register(readline.write_history_file, histfile)
print("Custom Python CMD (type 'exit' to stop)")
local_vars = {
"file": self.file,
"uproot": uproot,
"np": np,
#"ak": ak,
"plt": plt,
"self": self
}
startup_script = textwrap.dedent("""
tree = file["tree"]
""")
exec(startup_script, globals(), local_vars)
while True:
try:
entry = input(">>> ").strip()
if entry.lower() in ["exit", "quit"]:
break
if not entry:
continue
try:
result = eval(entry, globals(), local_vars)
if result is not None:
print(result)
except SyntaxError:
exec(entry, globals(), local_vars)
if "tree" in local_vars:
self.tree = local_vars["tree"]
except OverflowError():
print("Arrays too large, causing crash")
except Exception as e:
print(f"Error: {e}")
self.shell_vars = local_vars
def do_uproot(self, arg):
"""Start up an in-program command line to use root tools with python,
look up 'uproot' for more details"""
self.run_command_line()
def do_energy_analysis(self, arg):
args = shlex.split(arg)
try:
particle = args[0]
except IndexError:
print("Please indicate reactant for analysis")
return
try:
max_events = int(args[1])
except IndexError:
max_events = None
if self.tree is None:
self.do_set_tree("")
print(f"Using TTree: {self.tree}")
branches = [
"Tb",
"thetab",
"sx3Z"
]
if max_events:
n_events = max_events
else:
n_events = self.tree.num_entries
print(f"Loading {n_events} events...")
data = self.tree.arrays(
branches,
library="np",
entry_stop=max_events
)
global Ei
Ei = data["Tb"]
global sx3Z
sx3Z = data["sx3Z"]
theta = np.radians(data["thetab"])
# Remove theta = 0 events
mask = np.sin(theta) != 0
Ei = Ei[mask]
theta = theta[mask]
sin_theta = np.sin(theta)
radii = np.array([3.2, 4.2, 6.6])
dA = radii[0] / sin_theta
dC = radii[1] / sin_theta
dsx3 = radii[2] / sin_theta
print("Calculating energy losses...")
global EA
EA = energy_loss(particle, Ei, dA)
global EC
EC = energy_loss(particle, Ei, dC)
global Esx3
Esx3 = energy_loss(particle, Ei, dsx3)
global Eprop
Eprop = EA - EC
global Elost
Elost = Ei - Esx3
print("Analysis complete")
print(f"Processed events: {len(Ei)}")
print(f"Anode average energy: {np.mean(EA):.3f} MeV")
print(f"Cathode average energy: {np.mean(EC):.3f} MeV")
print(f"sx3 average energy: {np.mean(Esx3):.3f} MeV")
print(f"Average total energy loss to sx3: {np.mean(Elost):.3f} MeV")
print(f"Maximum total energy loss to sx3: {np.max(Elost):.3f} MeV")
print(f"Minimum total energy loss to sx3: {np.min(Elost):.3f} MeV")
print(f"Proportion counter average energy difference: {np.mean(Eprop):.3f} MeV")
print(f"Maximum proportion counter energy difference: {np.max(Eprop):.3f} MeV")
print(f"Minimum proportion counter energy difference: {np.min(Eprop):.3f} MeV")
output_filename = "energy_analysis.root"
print(f"Writing new tree to {output_filename}")
# Load ALL original branches
all_data = self.tree.arrays(
library="np",
entry_stop=max_events
)
# Create full-length arrays initialized to NaN
n_total = len(data["Tb"])
EA_full = np.full(n_total, np.nan)
EC_full = np.full(n_total, np.nan)
Esx3_full = np.full(n_total, np.nan)
Eprop_full = np.full(n_total, np.nan)
Elost_full = np.full(n_total, np.nan)
# Put values back into valid entries
EA_full[mask] = EA
EC_full[mask] = EC
Esx3_full[mask] = Esx3
Eprop_full[mask] = Eprop
Elost_full[mask] = Elost
# Add new branches
all_data["EA"] = EA_full
all_data["EC"] = EC_full
all_data["Esx3"] = Esx3_full
all_data["Eprop"] = Eprop_full
all_data["Elost"] = Elost_full
# Write new ROOT file as a classic TTree
with uproot.recreate(output_filename) as fout:
branch_types = {
name: array.dtype
for name, array in all_data.items()
}
fout.mktree("tree", branch_types)
fout["tree"].extend(all_data)
print("Finished writing augmented ROOT file")
def do_make_plots(self, arg):
import os
args = shlex.split(arg)
particle = args[0] if len(args) > 0 else "proton"
max_events = int(args[1]) if len(args) > 1 else None
if self.tree is None:
self.do_set_tree("")
print(f"Using TTree: {self.tree}")
branches = ["Tb", "thetab", "sx3Z"]
data = self.tree.arrays(
branches,
library="np",
entry_stop=max_events
)
Ei = data["Tb"]
theta = np.radians(data["thetab"])
sx3Z = data["sx3Z"]
mask = np.sin(theta) != 0
Ei = Ei[mask]
theta = theta[mask]
sx3Z = sx3Z[mask]
sin_theta = np.sin(theta)
radii = np.array([3.2, 4.2, 6.6])
dA = radii[0] / sin_theta
dC = radii[1] / sin_theta
dsx3 = radii[2] / sin_theta
print("Computing energies...")
EA = energy_loss(particle, Ei, dA)
EC = energy_loss(particle, Ei, dC)
Esx3 = energy_loss(particle, Ei, dsx3)
Elost = Ei - Esx3
Eprop = EA - EC
base = f"{particle}_plots"
os.makedirs(base, exist_ok=True)
print(f"Saving plots to folder: {base}")
# 1. Histogram: energy loss
plt.figure(figsize=(7,5))
plt.hist(Elost, bins=200)
plt.xlabel("Energy Loss (MeV)")
plt.ylabel("Counts")
plt.title("Total Energy Loss Distribution")
plt.grid(True)
plt.tight_layout()
plt.savefig(f"{base}/Elost_hist.png", dpi=300)
plt.close()
# 2. Histogram: sx3Z
plt.figure(figsize=(7,5))
plt.hist(sx3Z, bins=100)
plt.xlabel("SX3 Z")
plt.ylabel("Counts")
plt.title("SX3 Position Distribution")
plt.grid(True)
plt.tight_layout()
plt.savefig(f"{base}/sx3Z_hist.png", dpi=300)
plt.close()
# 3. 2D: Elost vs sx3Z
plt.figure(figsize=(7,6))
plt.hist2d(Elost, sx3Z, bins=200)
plt.xlabel("Energy Loss (MeV)")
plt.ylabel("SX3 Z")
plt.title("Energy Loss vs SX3 Position")
plt.colorbar(label="Counts")
plt.tight_layout()
plt.savefig(f"{base}/Elost_vs_sx3Z.png", dpi=300)
plt.close()
#Anode energy vs sx3 energy
plt.figure(figsize=(7,6))
plt.hist2d(EA, Esx3, bins=200)
plt.xlabel("EA (MeV)")
plt.ylabel("Esx3 (MeV)")
plt.title("Anode vs SX3 Energy")
plt.colorbar(label="Counts")
plt.tight_layout()
plt.savefig(f"{base}/EA_vs_Esx3.png", dpi=300)
plt.close()
#Prop counter energy loss
plt.figure(figsize=(7,6))
plt.hist2d(Eprop, sx3Z, bins=200)
plt.xlabel("EA - EC (MeV)")
plt.ylabel("SX3 Z")
plt.title("Energy Propagation Difference vs Position")
plt.colorbar(label="Counts")
plt.tight_layout()
plt.savefig(f"{base}/Eprop_vs_sx3Z.png", dpi=300)
plt.close()
print("Plotting complete.")
#exec(open("PCEnergyAnalysis.py").read())
if __name__ == "__main__":
MyInteractiveApp().cmdloop()