Coverage for src/km3flux/flux.py: 85%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Assorted Fluxes, in (m^2 sec sr GeV)^-1"""
3import gzip
4import logging
5import io
6import re
8import numpy as np
9import numpy.lib.recfunctions as rfn
11import scipy.interpolate
12from scipy.integrate import romberg, simps
13from scipy.interpolate import splrep, splev, RectBivariateSpline
15from km3flux.data import basepath
18logger = logging.getLogger(__name__)
21class BaseFlux(object):
22 """Base class for fluxes.
24 Methods
25 =======
26 __call__(energy, zenith=None)
27 Return the flux on energy, optionally on zenith.
28 integrate(zenith=None, emin=1, emax=100, **integargs)
29 Integrate the flux via romberg integration.
30 integrate_samples(energy, zenith=None, emin=1, emax=100)
31 Integrate the flux from given samples, via simpson integration.
33 Example
34 =======
35 >>> zen = np.linspace(0, np.pi, 5)
36 >>> ene = np.logspace(0, 2, 5)
38 >>> from km3flux.flux import MyFlux
39 >>> flux = MyFlux(flavor='nu_mu')
41 >>> flux(ene)
42 array([6.68440000e+01, 1.83370000e+01, 4.96390000e+00,
43 1.61780000e+00, 5.05350000e-01,])
44 >>> flux(ene, zen)
45 array([2.29920000e-01, 2.34160000e-02, 2.99460000e-03,
46 3.77690000e-04, 6.87310000e-05])
47 """
49 def __init__(self, **kwargs):
50 pass
52 def __call__(self, energy, zenith=None, interpolate=True):
53 logger.debug("Interpolate? {}".format(interpolate))
54 energy = np.atleast_1d(energy)
55 logger.debug("Entering __call__...")
56 if zenith is None:
57 logger.debug("Zenith is none, using averaged table...")
58 return self._averaged(energy, interpolate=interpolate)
59 zenith = np.atleast_1d(zenith)
60 if len(zenith) != len(energy):
61 raise ValueError("Zenith and energy need to have the same length.")
62 logger.debug("Zenith available, using angle-dependent table...")
63 return self._with_zenith(energy=energy, zenith=zenith, interpolate=interpolate)
65 def _averaged(self, energy, interpolate=True):
66 logger.debug("Interpolate? {}".format(interpolate))
67 raise NotImplementedError
69 def _with_zenith(self, energy, zenith, interpolate=True):
70 logger.debug("Interpolate? {}".format(interpolate))
71 raise NotImplementedError
73 def integrate(self, zenith=None, emin=1, emax=100, interpolate=True, **integargs):
74 logger.debug("Interpolate? {}".format(interpolate))
75 return romberg(self, emin, emax, vec_func=True, **integargs)
77 def integrate_samples(
78 self, energy, zenith=None, emin=1, emax=100, interpolate=True, **integargs
79 ):
80 logger.debug("Interpolate? {}".format(interpolate))
81 energy = np.atleast_1d(energy)
82 mask = (emin <= energy) & (energy <= emax)
83 energy = energy[mask]
84 if zenith:
85 logger.debug("Zenith available, using angle-dependent table...")
86 zenith = np.atleast_1d(zenith)
87 zenith = zenith[mask]
88 flux = self(energy, zenith=zenith, interpolate=interpolate)
89 return simps(flux, energy, **integargs)
92class PowerlawFlux(BaseFlux):
93 """E^-gamma flux."""
95 def __init__(self, gamma=2, scale=1e-4):
96 self.gamma = gamma
97 self.scale = scale
99 def _averaged(self, energy, interpolate=True):
100 return self.scale * np.power(energy, -1 * self.gamma)
102 def integrate(self, zenith=None, emin=1, emax=100, **integargs):
103 """Compute analytic integral instead of numeric one."""
104 if np.around(self.gamma, decimals=1) == 1.0:
105 return np.log(emax) - np.log(emin)
106 num = np.power(emax, 1 - self.gamma) - np.power(emin, 1 - self.gamma)
107 den = 1.0 - self.gamma
108 return self.scale * (num / den)
111class IsotropicFlux:
112 def __init__(self, data, flavors):
113 self._data = data
114 self._flavors = flavors
115 for flavor in flavors:
116 flux = scipy.interpolate.InterpolatedUnivariateSpline(
117 data.energy, data[flavor]
118 )
119 setattr(self, flavor, flux)
121 def __getitem__(self, flavor):
122 if flavor in self._flavors:
123 return getattr(self, flavor)
124 raise KeyError(
125 f"Flavor '{flavor}' not present in data. "
126 "Available flavors: {', '.join(self._flavors)}"
127 )
130class HondaFlux:
131 """Base class for Honda fluxes
133 Methods
134 =======
135 make_regular_grid(axes_keys, flavor)
136 Create a n_dim grid based on data.
137 interpolation_method(axes_keys, flavor)
138 Select the interpolation method.
139 parse_categories(f)
140 Integrate the flux from given samples, via simpson integration.
141 """
143 def __init__(self, data, flavors):
145 # Add cosz and phi_az bin center
146 data = rfn.append_fields(
147 data, "cosz_mean", (data.cosz_min + data.cosz_max) / 2.0, dtypes=float
148 ).view(np.recarray)
149 data = rfn.append_fields(
150 data, "phi_az_mean", (data.phi_az_min + data.phi_az_max) / 2.0, dtypes=float
151 ).view(np.recarray)
153 self._flavors = flavors
154 self._data = data
155 self._axes = ["energy"]
157 # Check number of input dimensions
158 if len(np.unique(data.cosz_mean)) > 1:
159 self._axes.append("cosz_mean")
160 if len(np.unique(data.phi_az_mean)) > 1:
161 self._axes.append("phi_az_mean")
163 self._n_dim = len(self._axes)
165 # Set the interpolation method accordingly
166 for flavor in flavors:
167 flux = self.interpolation_method(self._axes, flavor)
168 setattr(self, flavor, flux)
170 def make_regular_grid(self, axes_keys, flavor):
171 """
172 Create a n_dim grid based on data.
174 Parameters
175 ----------
176 axes_keys : list of str
177 axes to be used, define dimensions order
178 flavor : str
179 column to use to fill the grid
180 """
181 axes = []
182 dim = []
184 for key in axes_keys:
185 r = np.sort(np.unique(self._data[key]))
186 axes.append(r)
187 dim.append(len(r))
189 self._data.sort(order=axes_keys)
190 grid = np.reshape(self._data[flavor], np.array(dim))
191 return grid, axes
193 def interpolation_method(self, axes_keys, flavor):
194 """
195 Select the interpolation method.
197 For 1D interpolation:
198 - InterpolatedUnivariateSpline
199 For 2D interpolation:
200 - RectBivariateSpline
201 For >= 3D interpolation:
202 - RegularGridInterpolator
204 Parameters
205 ----------
206 axes_keys : list of str
207 axes to be used, define dimensions order
208 flavor : str
209 column to use to fill the grid
210 """
211 if len(axes_keys) == 1:
212 return scipy.interpolate.InterpolatedUnivariateSpline(
213 self._data.energy, self._data[flavor]
214 )
216 elif len(axes_keys) == 2:
217 grid, axes = self.make_regular_grid(axes_keys, flavor)
218 return scipy.interpolate.RectBivariateSpline(*axes, grid).ev
220 else:
221 grid, axes = self.make_regular_grid(axes_keys, flavor)
222 flux = scipy.interpolate.RegularGridInterpolator(
223 axes, grid, bounds_error=False, fill_value=None
224 )
226 def columnar_interface(*args):
227 return flux(np.stack(args).T)
229 return columnar_interface
231 def __getitem__(self, flavor):
232 if flavor in self._flavors:
233 return getattr(self, flavor)
234 raise KeyError(
235 f"Flavor '{flavor}' not present in data. "
236 "Available flavors: {', '.join(self._flavors)}"
237 )
239 @classmethod
240 def from_hondafile(cls, filepath):
242 with gzip.open(filepath, "r") as fobj:
243 flavors = ["numu", "anumu", "nue", "anue"]
245 cats = cls.parse_categories(cls, fobj)
247 data = []
248 for header, content in cats:
250 # Split the header to get cosZ and phi_Az ranges (4 numbers)
251 header = " ".join(re.findall(r"[-+]?(?:\d*\.\d+|\d+)", header))
252 header_cols = ["cosz_min", "cosz_max", "phi_az_min", "phi_az_max"]
254 # Create a dummy file object from sub range content +
255 # additional columns for the cos Zenith and phi
256 # Azimuth
257 f = io.StringIO(header.join([""] + content))
259 # Create a recarray from the file
260 data_tmp = np.recfromcsv(
261 f,
262 names=header_cols + ["energy"] + flavors,
263 skip_header=2,
264 delimiter=" ",
265 )
266 data.append(data_tmp)
268 # Merge invidual rec arrays to one
269 data = rfn.stack_arrays(data, asrecarray=True, usemask=False)
271 return cls(data, flavors)
273 def parse_categories(self, f):
274 """
275 Select the interpolation method.
277 Parameters
278 ----------
279 f : file object
280 Honda file to be parsed
281 """
282 cats = []
283 last_cat_start = -1
284 last_cat_header = ""
285 lines = []
286 for i, line in enumerate(f):
287 line = line.decode("ascii")
288 if bool(line.count("average flux in")):
289 if len(lines) != 0:
290 cats.append((last_cat_header, lines))
291 lines = []
292 last_cat_header = line
293 lines.append(line)
294 cats.append((last_cat_header, lines))
295 f.seek(0)
296 return cats
299class Honda:
300 _experiments = {
301 "Frejus": "frj",
302 "Gran Sasso": "grn",
303 "Homestake": "hms",
304 "INO": "ino",
305 "JUNO": "juno",
306 "Kamioka": "kam",
307 "Pythasalmi": "pyh",
308 "Soudan Mine": "sdn",
309 "South Pole": "spl",
310 "Sudbury": "sno",
311 }
312 _datapath = basepath / "honda"
314 # def __init__(self):
315 # self._years = [p.name for p in self._datapath.glob("????") if p.is_dir()]
317 def flux(
318 self, year, experiment, solar="min", mountain=False, season=None, averaged=None
319 ):
320 """
321 Return the flux for a given year and experiment.
323 Parameters
324 ----------
325 year : int
326 The year of the publication.
327 experiment : str
328 The experiment name, can be one of the following:
329 "Kamioka", "Gran Sasso", "Sudbury", "Frejus", "INO", "South Pole",
330 "Pythasalmi", "Homestake", "JUNO"
331 solar : str (optional)
332 The solar parameter, can be "min" or "max". Default is "min" for Solar
333 minimum.
334 mountain : bool (optional)
335 With or without mountain over the detector. Default is "False" => without.
336 season : None or (int, int) (optional)
337 The season of interest. If `None`, the dataset for the full period is taken.
338 If a tuple is provided, the first entry is the starting and the last the
339 ending month. Notice that the corresponding dataset might not be available.
340 averaged : None or str (optional)
341 The type of averaging. Default is `None`. Also available are "all" for all
342 direction averaging and "azimuth" for azimuth averaging only.
343 """
344 filepath = self._filepath_for(
345 year, experiment, solar, mountain, season, averaged
346 )
347 if not filepath.exists():
348 raise FileNotFoundError(
349 f"The requested data file {filepath} could not be found in the archive. "
350 "Try running `km3flux update` (see `km3flux -h` for more information) and "
351 "also make sure the requested combination of parameters is available."
352 )
354 return HondaFlux.from_hondafile(filepath)
356 def _filepath_for(self, year, experiment, solar, mountain, season, averaged):
357 """Generate the filename and path according to the naming conventions of Honda
359 Does some sanity checks too.
360 """
361 exp = self.experiment_abbr(experiment)
362 filename = exp
364 if year >= 2014: # seasonal data was added in 2014
365 if season is None:
366 filename += "-ally"
367 else:
368 filename += f"-{season[0]:02d}{season[1]:02d}"
370 if averaged is None:
371 filename += "-20-12"
372 elif averaged == "azimuth":
373 filename += "-20-01"
374 elif averaged == "all":
375 filename += "-01-01"
376 else:
377 raise ValueError(
378 f"Unsupported averageing '{averaged}', "
379 "please use `None`, 'all', or 'azimuth'."
380 )
382 if mountain:
383 filename += "-mtn"
385 if solar in ("min", "max"):
386 filename += f"-sol{solar}"
387 else:
388 raise ValueError(
389 f"Unsupported solar parameter '{solar}' "
390 "please use either 'min' or 'max'."
391 )
393 if year <= 2011:
394 if season:
395 raise ValueError(
396 "No seasonal tables available for year 2011 and earlier."
397 )
399 if mountain:
400 if averaged == "all":
401 raise ValueError(
402 "No published mountain data for all directions averaged."
403 )
404 filename += "-mountain"
406 if averaged is None:
407 filename += ""
408 elif averaged == "azimuth":
409 filename += "-aa"
410 elif averaged == "all":
411 filename += "-alldir"
412 else:
413 raise ValueError(
414 f"Unsupported averageing '{averaged}', "
415 "please use `None`, 'all', or 'azimuth'."
416 )
418 filename += ".d.gz"
419 filepath = self._datapath / str(year) / filename
420 return filepath
422 @property
423 def experiments(self):
424 """Return a list of supported experiments."""
425 return sorted(list(self._experiments.keys()))
427 def experiment_abbr(self, experiment):
428 """Return the abbreviation used in filenames for a given experiment."""
429 try:
430 return self._experiments[experiment]
431 except KeyError:
432 experiments = ", ".join(self.experiments)
433 raise KeyError(
434 f"The '{experiment}' is not in the list of available experiments: {experiments}"
435 )