Coverage for src/km3flux/flux.py: 85%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

125 statements  

1"""Assorted Fluxes, in (m^2 sec sr GeV)^-1""" 

2 

3import gzip 

4import logging 

5 

6import numpy as np 

7import scipy.interpolate 

8from scipy.integrate import romberg, simps 

9from scipy.interpolate import splrep, splev, RectBivariateSpline 

10 

11from km3flux.data import basepath 

12 

13 

14logger = logging.getLogger(__name__) 

15 

16 

17class BaseFlux(object): 

18 """Base class for fluxes. 

19 

20 Methods 

21 ======= 

22 __call__(energy, zenith=None) 

23 Return the flux on energy, optionally on zenith. 

24 integrate(zenith=None, emin=1, emax=100, **integargs) 

25 Integrate the flux via romberg integration. 

26 integrate_samples(energy, zenith=None, emin=1, emax=100) 

27 Integrate the flux from given samples, via simpson integration. 

28 

29 Example 

30 ======= 

31 >>> zen = np.linspace(0, np.pi, 5) 

32 >>> ene = np.logspace(0, 2, 5) 

33 

34 >>> from km3flux.flux import MyFlux 

35 >>> flux = MyFlux(flavor='nu_mu') 

36 

37 >>> flux(ene) 

38 array([6.68440000e+01, 1.83370000e+01, 4.96390000e+00, 

39 1.61780000e+00, 5.05350000e-01,]) 

40 >>> flux(ene, zen) 

41 array([2.29920000e-01, 2.34160000e-02, 2.99460000e-03, 

42 3.77690000e-04, 6.87310000e-05]) 

43 """ 

44 

45 def __init__(self, **kwargs): 

46 pass 

47 

48 def __call__(self, energy, zenith=None, interpolate=True): 

49 logger.debug("Interpolate? {}".format(interpolate)) 

50 energy = np.atleast_1d(energy) 

51 logger.debug("Entering __call__...") 

52 if zenith is None: 

53 logger.debug("Zenith is none, using averaged table...") 

54 return self._averaged(energy, interpolate=interpolate) 

55 zenith = np.atleast_1d(zenith) 

56 if len(zenith) != len(energy): 

57 raise ValueError("Zenith and energy need to have the same length.") 

58 logger.debug("Zenith available, using angle-dependent table...") 

59 return self._with_zenith(energy=energy, zenith=zenith, interpolate=interpolate) 

60 

61 def _averaged(self, energy, interpolate=True): 

62 logger.debug("Interpolate? {}".format(interpolate)) 

63 raise NotImplementedError 

64 

65 def _with_zenith(self, energy, zenith, interpolate=True): 

66 logger.debug("Interpolate? {}".format(interpolate)) 

67 raise NotImplementedError 

68 

69 def integrate(self, zenith=None, emin=1, emax=100, interpolate=True, **integargs): 

70 logger.debug("Interpolate? {}".format(interpolate)) 

71 return romberg(self, emin, emax, vec_func=True, **integargs) 

72 

73 def integrate_samples( 

74 self, energy, zenith=None, emin=1, emax=100, interpolate=True, **integargs 

75 ): 

76 logger.debug("Interpolate? {}".format(interpolate)) 

77 energy = np.atleast_1d(energy) 

78 mask = (emin <= energy) & (energy <= emax) 

79 energy = energy[mask] 

80 if zenith: 

81 logger.debug("Zenith available, using angle-dependent table...") 

82 zenith = np.atleast_1d(zenith) 

83 zenith = zenith[mask] 

84 flux = self(energy, zenith=zenith, interpolate=interpolate) 

85 return simps(flux, energy, **integargs) 

86 

87 

88class PowerlawFlux(BaseFlux): 

89 """E^-gamma flux.""" 

90 

91 def __init__(self, gamma=2, scale=1e-4): 

92 self.gamma = gamma 

93 self.scale = scale 

94 

95 def _averaged(self, energy, interpolate=True): 

96 return self.scale * np.power(energy, -1 * self.gamma) 

97 

98 def integrate(self, zenith=None, emin=1, emax=100, **integargs): 

99 """Compute analytic integral instead of numeric one.""" 

100 if np.around(self.gamma, decimals=1) == 1.0: 

101 return np.log(emax) - np.log(emin) 

102 num = np.power(emax, 1 - self.gamma) - np.power(emin, 1 - self.gamma) 

103 den = 1.0 - self.gamma 

104 return self.scale * (num / den) 

105 

106 

107class IsotropicFlux: 

108 def __init__(self, data, flavors): 

109 self._data = data 

110 self._flavors = flavors 

111 for flavor in flavors: 

112 flux = scipy.interpolate.InterpolatedUnivariateSpline( 

113 data.energy, data[flavor] 

114 ) 

115 setattr(self, flavor, flux) 

116 

117 def __getitem__(self, flavor): 

118 if flavor in self._flavors: 

119 return getattr(self, flavor) 

120 raise KeyError( 

121 f"Flavor '{flavor}' not present in data. " 

122 "Available flavors: {', '.join(self._flavors)}" 

123 ) 

124 

125 @classmethod 

126 def from_hondafile(cls, filepath): 

127 print(filepath) 

128 with gzip.open(filepath, "r") as fobj: 

129 flavors = ["numu", "anumu", "nue", "anue"] 

130 data = np.recfromcsv( 

131 fobj, 

132 names=["energy"] + flavors, 

133 skip_header=2, 

134 delimiter=" ", 

135 ) 

136 return cls(data, flavors) 

137 

138 

139class Honda: 

140 _experiments = { 

141 "Frejus": "frj", 

142 "Gran Sasso": "grn", 

143 "Homestake": "hms", 

144 "INO": "ino", 

145 "JUNO": "juno", 

146 "Kamioka": "kam", 

147 "Pythasalmi": "pyh", 

148 "Soudan Mine": "sdn", 

149 "South Pole": "spl", 

150 "Sudbury": "sno", 

151 } 

152 _datapath = basepath / "honda" 

153 

154 # def __init__(self): 

155 # self._years = [p.name for p in self._datapath.glob("????") if p.is_dir()] 

156 

157 def flux( 

158 self, year, experiment, solar="min", mountain=False, season=None, averaged=None 

159 ): 

160 """ 

161 Return the flux for a given year and experiment. 

162 

163 Parameters 

164 ---------- 

165 year : int 

166 The year of the publication. 

167 experiment : str 

168 The experiment name, can be one of the following: 

169 "Kamioka", "Gran Sasso", "Sudbury", "Frejus", "INO", "South Pole", 

170 "Pythasalmi", "Homestake", "JUNO" 

171 solar : str (optional) 

172 The solar parameter, can be "min" or "max". Default is "min" for Solar 

173 minimum. 

174 mountain : bool (optional) 

175 With or without mountain over the detector. Default is "False" => without. 

176 season : None or (int, int) (optional) 

177 The season of interest. If `None`, the dataset for the full period is taken. 

178 If a tuple is provided, the first entry is the starting and the last the 

179 ending month. Notice that the corresponding dataset might not be available. 

180 averaged : None or str (optional) 

181 The type of averaging. Default is `None`. Also available are "all" for all 

182 direction averaging and "azimuth" for azimuth averaging only. 

183 """ 

184 filepath = self._filepath_for( 

185 year, experiment, solar, mountain, season, averaged 

186 ) 

187 if not filepath.exists(): 

188 raise FileNotFoundError( 

189 f"The requested data file {filepath} could not be found in the archive. " 

190 "Try running `km3flux update` (see `km3flux -h` for more information) and " 

191 "also make sure the requested combination of parameters is available." 

192 ) 

193 

194 if averaged == "all": 

195 return IsotropicFlux.from_hondafile(filepath) 

196 

197 def _filepath_for(self, year, experiment, solar, mountain, season, averaged): 

198 """Generate the filename and path according to the naming conventions of Honda 

199 

200 Does some sanity checks too. 

201 """ 

202 exp = self.experiment_abbr(experiment) 

203 filename = exp 

204 

205 if year >= 2014: # seasonal data was added in 2014 

206 if season is None: 

207 filename += "-ally" 

208 else: 

209 filename += f"-{season[0]:02d}{season[1]:02d}" 

210 

211 if averaged is None: 

212 filename += "-20-12" 

213 elif averaged == "azimuth": 

214 filename += "-20-01" 

215 elif averaged == "all": 

216 filename += "-01-01" 

217 else: 

218 raise ValueError( 

219 f"Unsupported averageing '{averaged}', " 

220 "please use `None`, 'all', or 'azimuth'." 

221 ) 

222 

223 if mountain: 

224 filename += "-mtn" 

225 

226 if solar in ("min", "max"): 

227 filename += f"-sol{solar}" 

228 else: 

229 raise ValueError( 

230 f"Unsupported solar parameter '{solar}' " 

231 "please use either 'min' or 'max'." 

232 ) 

233 

234 if year <= 2011: 

235 if season: 

236 raise ValueError( 

237 "No seasonal tables available for year 2011 and earlier." 

238 ) 

239 

240 if mountain: 

241 if averaged == "all": 

242 raise ValueError( 

243 "No published mountain data for all directions averaged." 

244 ) 

245 filename += "-mountain" 

246 

247 if averaged is None: 

248 filename += "" 

249 elif averaged == "azimuth": 

250 filename += "-aa" 

251 elif averaged == "all": 

252 filename += "-alldir" 

253 else: 

254 raise ValueError( 

255 f"Unsupported averageing '{averaged}', " 

256 "please use `None`, 'all', or 'azimuth'." 

257 ) 

258 

259 filename += ".d.gz" 

260 filepath = self._datapath / str(year) / filename 

261 return filepath 

262 

263 @property 

264 def experiments(self): 

265 """Return a list of supported experiments.""" 

266 return sorted(list(self._experiments.keys())) 

267 

268 def experiment_abbr(self, experiment): 

269 """Return the abbreviation used in filenames for a given experiment.""" 

270 try: 

271 return self._experiments[experiment] 

272 except KeyError: 

273 experiments = ", ".join(self.experiments) 

274 raise KeyError( 

275 f"The '{experiment}' is not in the list of available experiments: {experiments}" 

276 )