Coverage for src/km3flux/flux.py: 85% (190 statements)

1"""Assorted Fluxes, in (m^2 sec sr GeV)^-1""" 

2 

3import gzip 

4import logging 

5import io 

6import re 

7 

8import numpy as np 

9import numpy.lib.recfunctions as rfn 

10 

11import scipy.interpolate 

12from scipy.integrate import romberg, simps 

13from scipy.interpolate import splrep, splev, RectBivariateSpline 

14 

15from km3flux.data import basepath 

16 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class BaseFlux(object):
    """Base class for fluxes.

    Methods
    =======
    __call__(energy, zenith=None)
        Return the flux at the given energies, optionally zenith-dependent.
    integrate(zenith=None, emin=1, emax=100, **integargs)
        Integrate the flux via Romberg integration.
    integrate_samples(energy, zenith=None, emin=1, emax=100)
        Integrate the flux from the given samples via Simpson integration.

    Example
    =======
    >>> zen = np.linspace(0, np.pi, 5)
    >>> ene = np.logspace(0, 2, 5)

    >>> from km3flux.flux import MyFlux
    >>> flux = MyFlux(flavor='nu_mu')

    >>> flux(ene)
    array([6.68440000e+01, 1.83370000e+01, 4.96390000e+00,
           1.61780000e+00, 5.05350000e-01])
    >>> flux(ene, zen)
    array([2.29920000e-01, 2.34160000e-02, 2.99460000e-03,
           3.77690000e-04, 6.87310000e-05])
    """

    def __init__(self, **kwargs):
        pass

    def __call__(self, energy, zenith=None, interpolate=True):
        logger.debug("Interpolate? {}".format(interpolate))
        energy = np.atleast_1d(energy)
        logger.debug("Entering __call__...")
        if zenith is None:
            logger.debug("Zenith is None, using averaged table...")
            return self._averaged(energy, interpolate=interpolate)
        zenith = np.atleast_1d(zenith)
        if len(zenith) != len(energy):
            raise ValueError("Zenith and energy need to have the same length.")
        logger.debug("Zenith available, using angle-dependent table...")
        return self._with_zenith(energy=energy, zenith=zenith, interpolate=interpolate)

    def _averaged(self, energy, interpolate=True):
        logger.debug("Interpolate? {}".format(interpolate))
        raise NotImplementedError

    def _with_zenith(self, energy, zenith, interpolate=True):
        logger.debug("Interpolate? {}".format(interpolate))
        raise NotImplementedError

    def integrate(self, zenith=None, emin=1, emax=100, interpolate=True, **integargs):
        logger.debug("Interpolate? {}".format(interpolate))
        return romberg(self, emin, emax, vec_func=True, **integargs)

    def integrate_samples(
        self, energy, zenith=None, emin=1, emax=100, interpolate=True, **integargs
    ):
        logger.debug("Interpolate? {}".format(interpolate))
        energy = np.atleast_1d(energy)
        mask = (emin <= energy) & (energy <= emax)
        energy = energy[mask]
        # `zenith` may be an array, so an explicit `is not None` check avoids
        # an ambiguous truth value and also handles zenith=0 correctly.
        if zenith is not None:
            logger.debug("Zenith available, using angle-dependent table...")
            zenith = np.atleast_1d(zenith)
            zenith = zenith[mask]
        flux = self(energy, zenith=zenith, interpolate=interpolate)
        return simps(flux, energy, **integargs)
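# A minimal sketch of a concrete subclass (hypothetical, not part of this
# module): only `_averaged` needs an implementation for a zenith-independent
# flux, after which `__call__`, `integrate` and `integrate_samples` work as
# inherited.
#
# class ConstantFlux(BaseFlux):
#     def _averaged(self, energy, interpolate=True):
#         return np.full_like(energy, 1e-4, dtype=float)
#
# flux = ConstantFlux()
# flux(np.logspace(0, 2, 5))         # five flux values of 1e-4
# flux.integrate(emin=1, emax=100)   # Romberg integral, approx. 99 * 1e-4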

class PowerlawFlux(BaseFlux):
    """E^-gamma flux."""

    def __init__(self, gamma=2, scale=1e-4):
        self.gamma = gamma
        self.scale = scale

    def _averaged(self, energy, interpolate=True):
        return self.scale * np.power(energy, -1 * self.gamma)

    def integrate(self, zenith=None, emin=1, emax=100, **integargs):
        """Compute the analytic integral instead of a numeric one."""
        if np.around(self.gamma, decimals=1) == 1.0:
            # Antiderivative of E^-1 is log(E); the scale factor applies
            # here as well.
            return self.scale * (np.log(emax) - np.log(emin))
        num = np.power(emax, 1 - self.gamma) - np.power(emin, 1 - self.gamma)
        den = 1.0 - self.gamma
        return self.scale * (num / den)
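# Consistency check (illustrative): for the defaults gamma=2, scale=1e-4,
# the analytic integral over [1, 100] GeV is
# 1e-4 * (100**-1 - 1**-1) / (1 - 2) = 9.9e-5, and the inherited numeric
# integration from BaseFlux should agree.
#
# pl = PowerlawFlux()
# pl.integrate(emin=1, emax=100)            # analytic: 9.9e-05
# BaseFlux.integrate(pl, emin=1, emax=100)  # numeric cross-check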

class IsotropicFlux:
    def __init__(self, data, flavors):
        self._data = data
        self._flavors = flavors
        for flavor in flavors:
            flux = scipy.interpolate.InterpolatedUnivariateSpline(
                data.energy, data[flavor]
            )
            setattr(self, flavor, flux)

    def __getitem__(self, flavor):
        if flavor in self._flavors:
            return getattr(self, flavor)
        raise KeyError(
            f"Flavor '{flavor}' not present in data. "
            f"Available flavors: {', '.join(self._flavors)}"
        )
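# Usage sketch (`data` is a hypothetical record array with an `energy`
# column and one column per flavor): each flavor becomes a 1D spline,
# reachable both as an attribute and via item access.
#
# iso = IsotropicFlux(data, ["numu", "anumu"])
# iso.numu(10.0)       # interpolated flux at 10 GeV
# iso["anumu"](10.0)   # same mechanism, via __getitem__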

class HondaFlux:
    """Base class for Honda fluxes

    Methods
    =======
    make_regular_grid(axes_keys, flavor)
        Create an n_dim grid based on data.
    interpolation_method(axes_keys, flavor)
        Select the interpolation method.
    parse_categories(f)
        Split a Honda flux file into per-direction (header, lines) categories.
    """

    def __init__(self, data, flavors):

        # Add cosz and phi_az bin center
        data = rfn.append_fields(
            data, "cosz_mean", (data.cosz_min + data.cosz_max) / 2.0, dtypes=float
        ).view(np.recarray)
        data = rfn.append_fields(
            data, "phi_az_mean", (data.phi_az_min + data.phi_az_max) / 2.0, dtypes=float
        ).view(np.recarray)

        self._flavors = flavors
        self._data = data
        self._axes = ["energy"]

        # Check number of input dimensions
        if len(np.unique(data.cosz_mean)) > 1:
            self._axes.append("cosz_mean")
        if len(np.unique(data.phi_az_mean)) > 1:
            self._axes.append("phi_az_mean")

        self._n_dim = len(self._axes)

        # Set the interpolation method accordingly
        for flavor in flavors:
            flux = self.interpolation_method(self._axes, flavor)
            setattr(self, flavor, flux)

    def make_regular_grid(self, axes_keys, flavor):
        """
        Create an n_dim grid based on data.

        Parameters
        ----------
        axes_keys : list of str
            axes to be used; defines the dimension order
        flavor : str
            column used to fill the grid
        """
        axes = []
        dim = []

        for key in axes_keys:
            r = np.sort(np.unique(self._data[key]))
            axes.append(r)
            dim.append(len(r))

        self._data.sort(order=axes_keys)
        grid = np.reshape(self._data[flavor], np.array(dim))
        return grid, axes

    def interpolation_method(self, axes_keys, flavor):
        """
        Select the interpolation method.

        For 1D interpolation:
            - InterpolatedUnivariateSpline
        For 2D interpolation:
            - RectBivariateSpline
        For >= 3D interpolation:
            - RegularGridInterpolator

        Parameters
        ----------
        axes_keys : list of str
            axes to be used; defines the dimension order
        flavor : str
            column used to fill the grid
        """
        if len(axes_keys) == 1:
            return scipy.interpolate.InterpolatedUnivariateSpline(
                self._data.energy, self._data[flavor]
            )

        elif len(axes_keys) == 2:
            grid, axes = self.make_regular_grid(axes_keys, flavor)
            return scipy.interpolate.RectBivariateSpline(*axes, grid).ev

        else:
            grid, axes = self.make_regular_grid(axes_keys, flavor)
            flux = scipy.interpolate.RegularGridInterpolator(
                axes, grid, bounds_error=False, fill_value=None
            )

            def columnar_interface(*args):
                return flux(np.stack(args).T)

            return columnar_interface
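# Whatever the dimensionality, the returned object is called with one
# equal-length array per axis (illustrative call, assuming a dataset with an
# energy and a cosz axis, i.e. the 2D RectBivariateSpline.ev branch):
#
# fl = honda_flux.numu
# fl(np.array([1.0, 10.0]), np.array([-0.95, -0.85]))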

    def __getitem__(self, flavor):
        if flavor in self._flavors:
            return getattr(self, flavor)
        raise KeyError(
            f"Flavor '{flavor}' not present in data. "
            f"Available flavors: {', '.join(self._flavors)}"
        )

    @classmethod
    def from_hondafile(cls, filepath):

        with gzip.open(filepath, "r") as fobj:
            flavors = ["numu", "anumu", "nue", "anue"]

            cats = cls.parse_categories(fobj)

            data = []
            for header, content in cats:

                # Split the header to get the cosZ and phi_Az ranges (4 numbers)
                header = " ".join(re.findall(r"[-+]?(?:\d*\.\d+|\d+)", header))
                header_cols = ["cosz_min", "cosz_max", "phi_az_min", "phi_az_max"]

                # Create a dummy file object in which each line of the
                # category is prefixed with the cosZ and phi_Az ranges, so
                # they end up as additional columns
                f = io.StringIO(header.join([""] + content))

                # Create a recarray from the file
                data_tmp = np.recfromcsv(
                    f,
                    names=header_cols + ["energy"] + flavors,
                    skip_header=2,
                    delimiter=" ",
                )
                data.append(data_tmp)

            # Merge the individual recarrays into one
            data = rfn.stack_arrays(data, asrecarray=True, usemask=False)

        return cls(data, flavors)

    @staticmethod
    def parse_categories(f):
        """
        Split a Honda flux file into (header, lines) categories, one per
        direction bin.

        Parameters
        ----------
        f : file object
            Honda file to be parsed
        """
        cats = []
        last_cat_header = ""
        lines = []
        for line in f:
            line = line.decode("ascii")
            if "average flux in" in line:
                if len(lines) != 0:
                    cats.append((last_cat_header, lines))
                    lines = []
                last_cat_header = line
            lines.append(line)
        cats.append((last_cat_header, lines))
        f.seek(0)
        return cats
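# Resulting structure (for orientation): a list of (header, lines) tuples,
# one per direction bin; `lines` still contains the header line itself and
# the column-name line, which is why from_hondafile uses skip_header=2.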

class Honda:
    _experiments = {
        "Frejus": "frj",
        "Gran Sasso": "grn",
        "Homestake": "hms",
        "INO": "ino",
        "JUNO": "juno",
        "Kamioka": "kam",
        "Pythasalmi": "pyh",
        "Soudan Mine": "sdn",
        "South Pole": "spl",
        "Sudbury": "sno",
    }
    _datapath = basepath / "honda"

    # def __init__(self):
    #     self._years = [p.name for p in self._datapath.glob("????") if p.is_dir()]

    def flux(
        self, year, experiment, solar="min", mountain=False, season=None, averaged=None
    ):
        """
        Return the flux for a given year and experiment.

        Parameters
        ----------
        year : int
            The year of the publication.
        experiment : str
            The experiment name, can be one of the following:
            "Kamioka", "Gran Sasso", "Sudbury", "Frejus", "INO", "South Pole",
            "Pythasalmi", "Homestake", "JUNO", "Soudan Mine"
        solar : str (optional)
            The solar parameter, can be "min" or "max". Default is "min" for
            Solar minimum.
        mountain : bool (optional)
            With or without the mountain over the detector. Default is False
            (without).
        season : None or (int, int) (optional)
            The season of interest. If `None`, the dataset for the full period
            is taken. If a tuple is provided, the first entry is the starting
            and the second the ending month. Note that the corresponding
            dataset might not be available.
        averaged : None or str (optional)
            The type of averaging. Default is `None`. Also available are "all"
            for all-direction averaging and "azimuth" for azimuth averaging
            only.
        """
        filepath = self._filepath_for(
            year, experiment, solar, mountain, season, averaged
        )
        if not filepath.exists():
            raise FileNotFoundError(
                f"The requested data file {filepath} could not be found in the "
                "archive. Try running `km3flux update` (see `km3flux -h` for "
                "more information) and also make sure the requested combination "
                "of parameters is available."
            )

        return HondaFlux.from_hondafile(filepath)
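# Usage sketch (assuming the corresponding table is present in the local
# km3flux archive):
#
# honda = Honda()
# flux = honda.flux(2014, "Kamioka", solar="min", averaged="azimuth")
# flux.numu(10.0, -0.95)   # azimuth-averaged table: flux(energy, cosz)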

    def _filepath_for(self, year, experiment, solar, mountain, season, averaged):
        """Generate the filename and path according to the naming conventions of Honda.

        Does some sanity checks too.
        """
        exp = self.experiment_abbr(experiment)
        filename = exp

        if year >= 2014:  # seasonal data was added in 2014
            if season is None:
                filename += "-ally"
            else:
                filename += f"-{season[0]:02d}{season[1]:02d}"

            if averaged is None:
                filename += "-20-12"
            elif averaged == "azimuth":
                filename += "-20-01"
            elif averaged == "all":
                filename += "-01-01"
            else:
                raise ValueError(
                    f"Unsupported averaging '{averaged}', "
                    "please use `None`, 'all', or 'azimuth'."
                )

            if mountain:
                filename += "-mtn"

        if solar in ("min", "max"):
            filename += f"-sol{solar}"
        else:
            raise ValueError(
                f"Unsupported solar parameter '{solar}', "
                "please use either 'min' or 'max'."
            )

        if year <= 2011:
            if season:
                raise ValueError(
                    "No seasonal tables available for year 2011 and earlier."
                )

            if mountain:
                if averaged == "all":
                    raise ValueError(
                        "No published mountain data for all directions averaged."
                    )
                filename += "-mountain"

            if averaged is None:
                filename += ""
            elif averaged == "azimuth":
                filename += "-aa"
            elif averaged == "all":
                filename += "-alldir"
            else:
                raise ValueError(
                    f"Unsupported averaging '{averaged}', "
                    "please use `None`, 'all', or 'azimuth'."
                )

        filename += ".d.gz"
        filepath = self._datapath / str(year) / filename
        return filepath
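# Example of the resulting naming convention (derived from the rules above):
# year=2014, experiment="Kamioka" with all other defaults resolves to
#     <datapath>/2014/kam-ally-20-12-solmin.d.gz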

    @property
    def experiments(self):
        """Return a list of supported experiments."""
        return sorted(self._experiments.keys())

    def experiment_abbr(self, experiment):
        """Return the abbreviation used in filenames for a given experiment."""
        try:
            return self._experiments[experiment]
        except KeyError:
            experiments = ", ".join(self.experiments)
            raise KeyError(
                f"The experiment '{experiment}' is not in the list of "
                f"available experiments: {experiments}"
            )