618 lines
17 KiB
Python
618 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Wed May 20 13:32:14 2026
|
|
|
|
@author: jamesszalkie
|
|
"""
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.interpolate import interp1d
|
|
import argparse
|
|
import uproot
|
|
import pycatima as catima
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.interpolate import interp1d
|
|
from scipy.integrate import cumulative_trapezoid
|
|
import matplotlib
|
|
matplotlib.use("TkAgg")
|
|
import matplotlib.pyplot as plt
|
|
import threading
|
|
import time
|
|
import sys
|
|
import cmd
|
|
import shlex
|
|
import textwrap
|
|
import readline
|
|
import atexit
|
|
import os
|
|
import periodictable as pt
|
|
import re
|
|
|
|
#Run program from terminal or IDE, and prompts will provide user steps
|
|
|
|
histfile = os.path.expanduser("~/.uproot_shell_history")
|
|
|
|
try:
|
|
readline.read_history_file(histfile)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
atexit.register(readline.write_history_file, histfile)
|
|
|
|
#data = [z, mass_u, maximum MeV, name]
|
|
alpha_data = [2, 4.0026, 40, "alpha"]
|
|
proton_data = [1, 1.0078, 20, "proton"]
|
|
deuteron_data = [1, 2.014102, 30, "deuteron"]
|
|
|
|
|
|
particles = {
|
|
"alpha": alpha_data,
|
|
"proton": proton_data,
|
|
"deuteron": deuteron_data
|
|
}
|
|
|
|
def make_E_vs_x(z, mass_u, emax_mev, label, npoints, P_TORR, TEMP_K):
|
|
# GAS SETUP
|
|
R = 8.3144
|
|
P_TORR = 400
|
|
TEMP_K = 293.15
|
|
R = 8.3144
|
|
|
|
# Gas density
|
|
p_pa = P_TORR * 133.322
|
|
molar_density = p_pa / (R * TEMP_K)
|
|
|
|
m_he = 4.0026
|
|
m_c = 12.0000
|
|
m_o = 15.9949
|
|
|
|
m_mix_avg = (0.96 * m_he) + (0.04 * (m_c + 2*m_o))
|
|
|
|
rho_g_cm3 = (molar_density * m_mix_avg) / 1e6
|
|
|
|
print(f"Gas density = {rho_g_cm3:.6e} g/cm^3")
|
|
|
|
# MATERIAL
|
|
material_def = [
|
|
(m_he, 2, 0.96),
|
|
(m_c, 6, 0.04),
|
|
(m_o, 8, 0.08)
|
|
]
|
|
|
|
gas_mix = catima.Material(material_def)
|
|
gas_mix.density(rho_g_cm3)
|
|
|
|
projectile = catima.Projectile(mass_u, z)
|
|
|
|
# Energy grid
|
|
E = np.linspace(0.01, emax_mev, npoints)
|
|
|
|
# Stopping power array
|
|
S_mass = np.zeros_like(E)
|
|
|
|
for i, energy in enumerate(E):
|
|
|
|
projectile.T(energy / mass_u)
|
|
|
|
# MeV / (g/cm^2)
|
|
S_mass[i] = catima.dedx(projectile, gas_mix)
|
|
|
|
# Convert to MeV/cm
|
|
S_linear = S_mass * rho_g_cm3
|
|
|
|
# Sort descending energy
|
|
sort_idx = np.argsort(E)[::-1]
|
|
|
|
E = E[sort_idx]
|
|
S_linear = S_linear[sort_idx]
|
|
|
|
# Integrate dx/dE = 1/S(E)
|
|
invS = 1.0 / S_linear
|
|
|
|
x = cumulative_trapezoid(
|
|
invS,
|
|
E,
|
|
initial=0
|
|
)
|
|
|
|
x = -x
|
|
|
|
# Output table
|
|
output = pd.DataFrame({
|
|
"Distance_cm": x,
|
|
"Energy_MeV": E
|
|
})
|
|
|
|
outfile = f"E_vs_x_{label}.dat"
|
|
|
|
output.to_csv(
|
|
outfile,
|
|
sep='\t',
|
|
index=False
|
|
)
|
|
|
|
print(f"Saved: {outfile}")
|
|
|
|
return x, E
|
|
|
|
#Generate energy loss tables from file
|
|
def load_table(filename):
|
|
"""
|
|
Load table with columns:
|
|
x(cm) E(MeV)
|
|
|
|
Returns:
|
|
x_array, E_array
|
|
"""
|
|
|
|
data = pd.read_csv(
|
|
filename,
|
|
sep='\s+',
|
|
comment="#",
|
|
header=None,
|
|
skiprows=1
|
|
)
|
|
|
|
x = data.iloc[:, 0].values
|
|
E = data.iloc[:, 1].values
|
|
|
|
return x, E
|
|
|
|
def energy_loss(particle, Ei, dl):
|
|
filename = f"E_vs_x_{particle}.dat"
|
|
data = pd.read_csv(
|
|
filename,
|
|
sep='\s+',
|
|
comment="#",
|
|
header=None,
|
|
skiprows=1
|
|
)
|
|
|
|
x = data.iloc[:, 0].values
|
|
E = data.iloc[:, 1].values
|
|
|
|
E_of_x = interp1d(
|
|
x,
|
|
E,
|
|
bounds_error=False,
|
|
fill_value=0.0
|
|
)
|
|
|
|
x_of_E = interp1d(
|
|
E[::-1], # reverse so energy increases
|
|
x[::-1],
|
|
bounds_error=False,
|
|
fill_value="extrapolate"
|
|
)
|
|
|
|
xi = float(x_of_E(Ei))
|
|
|
|
xf = xi + float(dl)
|
|
|
|
Ef = float(E_of_x(xf))
|
|
|
|
return max(Ef, 0.0)
|
|
|
|
#def energy_loss_angular(particle, Ei, theta):
|
|
|
|
def energy_reconstruction(particle, Ef, dl):
|
|
filename = f"E_vs_x_{particle}.dat"
|
|
data = pd.read_csv(
|
|
filename,
|
|
sep='\s+',
|
|
comment="#",
|
|
header=None,
|
|
skiprows=1
|
|
)
|
|
|
|
x = data.iloc[:, 0].values
|
|
E = data.iloc[:, 1].values
|
|
|
|
E_of_x = interp1d(
|
|
x,
|
|
E,
|
|
bounds_error=False,
|
|
fill_value=0.0
|
|
)
|
|
|
|
x_of_E = interp1d(
|
|
E[::-1], # reverse so energy increases
|
|
x[::-1],
|
|
bounds_error=False,
|
|
fill_value="extrapolate"
|
|
)
|
|
|
|
xf = float(x_of_E(Ef))
|
|
|
|
xi = xf - dl
|
|
|
|
Ei = float(E_of_x(xi))
|
|
|
|
return max(Ei, 0.0)
|
|
|
|
def energy_distance(particle, Ei, Ef):
|
|
filename = f"E_vs_x_{particle}.dat"
|
|
data = pd.read_csv(
|
|
filename,
|
|
sep='\s+',
|
|
comment="#",
|
|
header=None,
|
|
skiprows=1
|
|
)
|
|
|
|
x = data.iloc[:, 0].values
|
|
E = data.iloc[:, 1].values
|
|
|
|
x_of_E = interp1d(
|
|
E[::-1], # reverse so energy increases
|
|
x[::-1],
|
|
bounds_error=False,
|
|
fill_value="extrapolate"
|
|
)
|
|
|
|
xi = float(x_of_E(Ei))
|
|
xf = float(x_of_E(Ef))
|
|
|
|
return abs(xf - xi)
|
|
|
|
|
|
def resolve_particle(name):
|
|
name = name.lower().strip().rstrip("s")
|
|
if name in particles:
|
|
return particles[name]
|
|
match = re.match(r"([a-zA-Z]+)[-\s]?(\d+)", name)
|
|
if match:
|
|
element_symbol = match.group(1).capitalize()
|
|
A = int(match.group(2))
|
|
|
|
try:
|
|
element = pt.elements.symbol(element_symbol)
|
|
isotope = element[A] # <-- THIS is the correct way
|
|
|
|
return (
|
|
isotope.number, # Z
|
|
isotope.mass, # mass in u
|
|
30.0,
|
|
f"{element_symbol}-{A}"
|
|
)
|
|
except Exception:
|
|
raise ValueError(f"Unknown isotope: {name}")
|
|
try:
|
|
elem = pt.elements.symbol(name.capitalize())
|
|
return elem.number, elem.mass, 30.0, name
|
|
except Exception:
|
|
raise ValueError(f"Unknown particle/isotope: {name}")
|
|
|
|
|
|
class MyInteractiveApp(cmd.Cmd):
|
|
def __init__(self):
|
|
super().__init__()
|
|
# Initial value set when the script starts
|
|
self.buckets = 500
|
|
self.T = 293.15
|
|
self.P = 400
|
|
self.temp_particle = [0, 0.0, 0.0, ""]
|
|
self.rootFile = "SimAnasen1.root"
|
|
self.file = None
|
|
self.initialize_file()
|
|
self.tree = None
|
|
print("-" * 30)
|
|
print("INTERACTIVE SHELL STARTED")
|
|
self.print_params()
|
|
print("Type 'help' for commands.")
|
|
print("Type 'exit' to end program")
|
|
print("-" * 30)
|
|
|
|
def print_params(self):
|
|
"""Helper method to display current state"""
|
|
print(f"Current Parameters: T={self.T}, P={self.P}")
|
|
|
|
def initialize_file(self):
|
|
"""Load in default root file for anasen"""
|
|
self.file = uproot.open(f"../Armory/{self.rootFile}")
|
|
|
|
|
|
#intro = "Interactive Shell Started. Type 'help' to see commands."
|
|
prompt = ">> "
|
|
|
|
|
|
def default(self, line):
|
|
# Check if the command starts with our multi-word phrase
|
|
if line.startswith("make table "):
|
|
# Extract everything after "make table "
|
|
args = line[len("make table "):].strip()
|
|
self.do_make_table(args)
|
|
elif line.startswith("set t") or line.startswith("Set T") or line.startswith("set T") or line.startswith("Set t"):
|
|
args = line[len("set t "):].strip()
|
|
self.do_set_T(args)
|
|
elif line.startswith("set p") or line.startswith("Set P") or line.startswith("set P") or line.startswith("Set p"):
|
|
args = line[len("set p "):].strip()
|
|
self.do_set_P(args)
|
|
elif line.startswith("set buckets") or line.startswith("Set Buckets") or line.startswith("set Buckets") or line.startswith("Set buckets"):
|
|
args = line[len("set buckets "):].strip()
|
|
self.do_set_Buckets(args)
|
|
elif line.startswith("energy loss") or line.startswith("Energy Loss") or line.startswith("Energy loss"):
|
|
args = line[len("energy loss "):].strip()
|
|
self.do_energy_loss(args)
|
|
elif line.startswith("energy reconstruction") or line.startswith("Energy Reconstruction") or line.startswith("Energy reconstruction"):
|
|
args = line[len("energy reconstruction "):].strip()
|
|
self.do_energy_reconstruction(args)
|
|
elif line.startswith("energy distance") or line.startswith("Energy Distance") or line.startswith("Energy distance"):
|
|
args = line[len("energy distance "):].strip()
|
|
self.do_energy_distance(args)
|
|
else:
|
|
print(f"*** Unknown syntax: {line}")
|
|
|
|
def do_T(self, arg):
|
|
"""Print value of T"""
|
|
print(self.T)
|
|
|
|
def do_Buckets(self, arg):
|
|
"""print number of histogram buckets and points"""
|
|
print(self.buckets)
|
|
|
|
def do_P(self, arg):
|
|
"""Print value of P (pressure)"""
|
|
print(self.P)
|
|
|
|
def do_make_particle(self, arg):
|
|
"""Enter parameters for new particle to be given to catyma system
|
|
Ex: >> make_particle <z> <mass u> <maximum E value> <particle name>"""
|
|
args = shlex.split(arg)
|
|
z = int(args[0])
|
|
mass_u = float(args[1])
|
|
emax_mev = float(args[2])
|
|
particle = args[3]
|
|
self.temp_particle = [z, mass_u, emax_mev, particle]
|
|
print(f"New particle '{particle}': z = {z}, mass_u = {mass_u}, emax_mev = {emax_mev}")
|
|
|
|
def do_set_T(self, arg):
|
|
"""Changes the value of T. Usage: set_t 300"""
|
|
try:
|
|
self.T = float(arg)
|
|
print(f"T has been updated to {self.T}")
|
|
except ValueError:
|
|
print("Please enter a valid number for T.")
|
|
|
|
def do_set_P(self, arg):
|
|
"""Changes the value of P in Torr. Usage: set_ 400"""
|
|
try:
|
|
self.P = float(arg)
|
|
print(f"P has been updated to {self.P}")
|
|
except ValueError:
|
|
print("Please enter a valid number for P.")
|
|
|
|
def do_set_Buckets(self, arg):
|
|
"""Changes the value of buckets"""
|
|
try:
|
|
self.buckets = int(arg)
|
|
print(f"Number of buckets has been updated to {self.buckets}")
|
|
except ValueError:
|
|
print("Please enter a valid number of buckets.")
|
|
|
|
def do_exit(self, arg):
|
|
"""Exits the application."""
|
|
print("Closing application...")
|
|
return True # Returning True stops the cmdloop()
|
|
|
|
def do_make_table(self, arg):
|
|
"""Create E vs X tables for particle, or isotopes
|
|
Ex: >> make table proton <max energy (optional) >
|
|
Ex: >> make table Co60 <max energy (optional) >
|
|
Ex: >> make table N17 <max energy (optional) >"""
|
|
try:
|
|
args = shlex.split(arg)
|
|
|
|
if not args:
|
|
print("Please enter desired reaction particle")
|
|
return
|
|
|
|
name = args[0]
|
|
|
|
if len(args) > 1:
|
|
emax_mev = float(args[1])
|
|
else:
|
|
emax_mev = None
|
|
|
|
z, mass_u, default_emax, label = resolve_particle(name)
|
|
|
|
if emax_mev is None:
|
|
emax_mev = default_emax
|
|
|
|
x, E = make_E_vs_x(
|
|
z,
|
|
mass_u,
|
|
emax_mev,
|
|
label,
|
|
self.buckets,
|
|
self.P,
|
|
self.T
|
|
)
|
|
|
|
plt.figure(figsize=(8,6))
|
|
plt.plot(x, E)
|
|
plt.xlabel("Distance (cm)")
|
|
plt.ylabel("Energy (MeV)")
|
|
plt.title(f"Energy Loss Curve {label.capitalize()}")
|
|
plt.grid(True)
|
|
plt.show()
|
|
|
|
except Exception as e:
|
|
print(f"Error in make_table: {e}")
|
|
|
|
def load_table(self, arg):
|
|
args = shlex.split(arg)
|
|
particle = args[0]
|
|
filename = f"E_vs_x_{particle}.dat"
|
|
load_table(filename)
|
|
print(f"Loaded table {filename}")
|
|
|
|
def do_energy_loss(self, arg):
|
|
"""Find a final energy given an initial energy and distance travelled
|
|
Ex: >> energy loss <particle> <initial energy MeV> <distance travelled cm>"""
|
|
args = shlex.split(arg)
|
|
try:
|
|
particle = args[0]
|
|
Ei = float(args[1])
|
|
dl = float(args[2])
|
|
Ei_offset = Ei * 1.1
|
|
table_specs = f"{particle} {Ei_offset}"
|
|
try:
|
|
self.do_make_table(table_specs)
|
|
Ef = energy_loss(particle, Ei, dl)
|
|
print(f"\nFinal energy: {Ef:.6f} MeV\n")
|
|
except:
|
|
return
|
|
except IndexError:
|
|
print("Please input particle, initial energy, and distance travelled")
|
|
|
|
def do_energy_reconstruction(self, arg):
|
|
"""Find a vertex energy given an final energy and distance travelled
|
|
Ex: >> energy reconstruction <particle> <final energy MeV> <distance travelled cm>"""
|
|
args = shlex.split(arg)
|
|
try:
|
|
particle = args[0]
|
|
Ef = float(args[1])
|
|
dl = float(args[2])
|
|
try:
|
|
Ei = energy_reconstruction(particle, Ef, dl)
|
|
if Ei > 0:
|
|
print(f"\nInitial energy: {Ei:.6f} MeV\n")
|
|
else:
|
|
print("Error: remake table with larger value, fallen off map")
|
|
except:
|
|
print("Particle energy table not made yet, please do so using 'make table'")
|
|
except IndexError:
|
|
print("Please input particle, final energy from detector, and distance travelled")
|
|
|
|
def do_energy_distance(self, arg):
|
|
"""Find a distance travelled given an initial and final energy
|
|
Ex: >> energy distance <particle> <initial energy> <final energy MeV>"""
|
|
args = shlex.split(arg)
|
|
try:
|
|
particle = args[0]
|
|
Ei = float(args[1])
|
|
Ef = float(args[2])
|
|
dE = Ei - Ef
|
|
try:
|
|
dl = energy_distance(particle, Ei, Ef)
|
|
if dl > 0:
|
|
print(f"\nChange in energy: {dE:.6f} MeV")
|
|
print(f"Distance travelled: {dl:.6f} cm\n")
|
|
else:
|
|
print("Error: remake table with larger value, fallen off map")
|
|
except:
|
|
print("Particle energy table not made yet, please do so using 'make table'")
|
|
except IndexError:
|
|
print("Please input particle, final energy from detector, and distance travelled")
|
|
|
|
def do_uproot_file(self, arg):
|
|
"""Open a specific root file for inspection"""
|
|
args = shlex.split(arg)
|
|
if len(args) > 0:
|
|
filename = args[0]
|
|
else:
|
|
filename = self.rootFile
|
|
try:
|
|
print(f"Opening {filename}")
|
|
with uproot.open(f"../Armory/{filename}") as tempfile:
|
|
self.file = tempfile
|
|
except FileNotFoundError:
|
|
print("Error: file not found")
|
|
|
|
def do_print_file(self, arg):
|
|
"""Print contents of ROOT file"""
|
|
args = shlex.split(arg)
|
|
file = self.file
|
|
if "keys" in args or len(args) == 0:
|
|
print("Keys in file: ", file.keys())
|
|
if "class_names" in args or len(args) == 0:
|
|
print("Class names: ", file.classnames())
|
|
|
|
def do_set_tree(self, arg):
|
|
"""Set a specific tree from the file (default to 'tree')"""
|
|
file = self.file
|
|
if len(arg) > 0:
|
|
treeName = f"tree{arg}"
|
|
else:
|
|
treeName = "tree"
|
|
try:
|
|
self.tree = file[treeName]
|
|
tree = self.tree
|
|
print(f"Tree: {tree}")
|
|
print("Branches: ", tree.keys())
|
|
except:
|
|
print("\nError, trees include ", file.keys())
|
|
|
|
def run_command_line(self):
|
|
import readline
|
|
import atexit
|
|
import os
|
|
import awkward as ak
|
|
|
|
histfile = os.path.expanduser("~/.uproot_shell_history")
|
|
|
|
try:
|
|
readline.read_history_file(histfile)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
readline.set_history_length(1000)
|
|
readline.parse_and_bind("tab: complete")
|
|
|
|
atexit.register(readline.write_history_file, histfile)
|
|
|
|
print("Custom Python CMD (type 'exit' to stop)")
|
|
|
|
local_vars = {
|
|
"file": self.file,
|
|
"uproot": uproot,
|
|
"np": np,
|
|
#"ak": ak,
|
|
"plt": plt,
|
|
"self": self
|
|
}
|
|
|
|
startup_script = textwrap.dedent("""
|
|
tree = file["tree"]
|
|
""")
|
|
|
|
exec(startup_script, globals(), local_vars)
|
|
while True:
|
|
try:
|
|
entry = input(">>> ").strip()
|
|
|
|
if entry.lower() in ["exit", "quit"]:
|
|
break
|
|
|
|
if not entry:
|
|
continue
|
|
|
|
# Try eval first so expressions print naturally
|
|
try:
|
|
result = eval(entry, globals(), local_vars)
|
|
|
|
if result is not None:
|
|
print(result)
|
|
|
|
except SyntaxError:
|
|
exec(entry, globals(), local_vars)
|
|
|
|
if "tree" in local_vars:
|
|
self.tree = local_vars["tree"]
|
|
except OverflowError():
|
|
print("Arrays too large, causing crash")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
self.shell_vars = local_vars
|
|
|
|
def do_uproot(self, arg):
|
|
"""Start up an in-program command line to use root tools with python,
|
|
look up 'uproot' for more details"""
|
|
self.run_command_line()
|
|
|
|
if __name__ == "__main__":
|
|
MyInteractiveApp().cmdloop()
|