Source code for km3astro.plot

"""Plotting utilities.
"""
from astropy.units import degree

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import healpy as hp
from datetime import timedelta
import warnings

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.coordinates import ICRS, Galactic, FK4, FK5  # Low-level frames

from astropy.time import Time

from km3net_testdata import data_path

import km3astro.coord as kc
import km3astro.frame as kf
import km3astro.toolbox as kt
import km3astro.extras as ke
import tempfile
import os


[docs]def ra_dec(skycoord): """Take (ra, dec) from skycoord in matplotlib-firendly format. This wraps the ra because astropy's convention differs from matplotlib. """ ra = skycoord.ra.wrap_at(180 * degree).radian dec = skycoord.dec.radian return ra, dec
[docs]def projection_axes(projection="aitoff", **figargs): fig = plt.figure(**figargs) ax = fig.add_subplot(111, projection=projection) ax.grid(color="lightgrey") return fig, ax
[docs]def plot_equatorial( evts, projection="aitoff", ax=None, marker="o", markersize=4, alpha=0.8, adjust_subplots=True, **kwargs, ): ra, dec = ra_dec(evts) if ax is None: _, ax = projection_axes(projection=projection) ax.plot(ra, dec, marker, markersize=markersize, alpha=alpha, **kwargs) if adjust_subplots: plt.subplots_adjust(top=0.95, bottom=0.0) return ax
[docs]def get_coord_from_skycoord(SC, frame, detector): """Return the coordinate of a SkyCoord object for aitoff or mollweide projection Parameters ---------- SC : astropy.SkyCoord The sky coordinate. frame : str The frame of the coordinate, either "ParticleFrame" or "UTM" or "altaz" or "equatorial" or "galactic" detector : str [default = "antares"] Detector of the coordinate, either "orca", "arca" or "antares". Returns ------- (phi, theta)/(az, ze)/(alt, az)/(ra, dec)/(l, b) : (float, float) The set of coordinate. """ SC_copy = kc.transform_to(SC, frame, detector) if frame == "ParticleFrame": phi = SC_copy.phi.wrap_at(180 * u.deg).radian theta = SC_copy.theta.radian if type(theta) == np.float64: if theta > np.pi / 2: theta = theta - np.pi if type(theta) == np.ndarray: print(len(theta)) for idx, t in enumerate(theta): if t > np.pi / 2: theta[idx] = t - np.pi return phi, theta elif frame == "UTM": az = SC_copy.azimuth.wrap_at(180 * u.deg).radian ze = SC_copy.alt_utm.radian return az, ze elif frame == "altaz": alt = SC_copy.alt.radian az = SC_copy.az.wrap_at(180 * u.deg).radian return alt, az elif frame == "equatorial": ra = SC_copy.ra.wrap_at(180 * u.deg).radian dec = SC_copy.dec.radian return ra, dec elif frame == "galactic": l = SC_copy.l.wrap_at(180 * u.deg).radian b = SC_copy.b.radian return l, b else: raise ValueError("Error: Wrong Frame input frame") return None
[docs]def get_alert_color(alert_type): """Return the color for a specific alert_type Based on color list tab:Palette 10 """ Alert_color_dict = { "GRB": "tab:blue", "GW": "tab:orange", "Neutrino": "tab:green", "NuEM": "tab:red", "SK_SN": "tab:purple", "SNEWS": "tab:brown", "Transient": "tab:pink", } if alert_type in Alert_color_dict.keys(): return Alert_color_dict[alert_type] else: return "c"
[docs]def get_alert_marker(alert_type): """Return the marker style for a specific alert_type""" Alert_marker_dict = { "GRB": "o", "Neutrino": "^", "NuEM": "v", "SK_SN": "*", "SNEWS": "*", "Transient": "s", } if alert_type in Alert_marker_dict.keys(): return Alert_marker_dict[alert_type] else: return "."
[docs]def get_visibility_map(frame, detector): """Get the visibility map for a given frame ('equatorial' or 'galactic') and a given detector ('antares', 'orca' or 'arca').""" path = f"{os.path.dirname(os.path.abspath(__file__))}/data/visibility_map_{frame}_{detector}.npy" if os.path.isfile(path): visibility_map = np.load(path) else: nside = 128 npix = hp.nside2npix(nside) t0, t1 = Time("2022-01-01T00:00:00"), Time("2022-01-02T00:00:00") visibility_map = np.zeros(npix) lon, lat = hp.pix2ang(nside, range(npix), lonlat=True) if frame == "equatorial": coords = SkyCoord(ra=lon * u.deg, dec=lat * u.deg, frame="icrs") elif frame == "galactic": coords = SkyCoord(l=lon * u.deg, b=lat * u.deg, frame="galactic") t = t0 dt = u.Quantity("10 minute") ntimes = 0 while t < t1: frame = kc.local_frame(time=t, location=detector) coords_loc = coords.transform_to(frame) visibility_map[coords_loc.alt.deg > 0] += 1 t += dt ntimes += 1 visibility_map /= ntimes np.save(path, visibility_map) return visibility_map
[docs]def skymap_hpx( skymap_url: str = None, obstime: str = None, nside: int = 128, detector: str = "antares", outfile: str = None, **old_kwargs, ): """Method to plot a skymap from an FITS url. Parameters ---------- skymap_url : str URL of the FITS skymap. obstime : str Alert time in ISOT format. nside : int Resolution of the map to be drawn (higher = more precise but slower). detector : str The detector to use for horizon definition. Choices are antares, orca, and arca. outfile : str Path to the output file. If None, no file is written. **old_kwards: dict [TO BE DEPRECATED] Arguments used by the old function: (file0, save=False, path="", name="") Returns: Fig : file.png A png file of the skymap. """ # === SECTION TO BE DEPRECATED === # Handling of old plotting function if len(old_kwargs) > 0: warnings.warn( "The old 'skymap_hpx' method will be replaced, see documentation.", DeprecationWarning, ) file0 = old_kwargs.get("file0") save = old_kwargs.get("save", False) path = old_kwargs.get("path", "") name = old_kwargs.get("name", "") projection = "mollweide" fig, ax = projection_axes(projection=projection, figsize=[40 / 2.54, 30 / 2.54]) gw_map = hp.read_map(file0) hp.mollview(map=gw_map, fig=fig) hp.graticule() if save: if path != "": if name == "": name = os.path.join(path.name, f"skymap_hpx_test.png") else: name = os.path.join(path.name, f"{name}.png") else: path = tempfile.TemporaryDirectory() if name == "": name = "hpx_skymap_maker_test.png" plt.savefig(name) return fig # === END OF SECTION TO BE DEPRECATED === assert skymap_url is not None and obstime is not None io, postprocess = ke.ligoskymap() obstime = Time(obstime, format="isot") skymap = io.fits.read_sky_map(skymap_url, nest=False)[0] # downgrade a bit the resolution to make it quicker skymap = hp.ud_grade(skymap, nside) npix = hp.nside2npix(nside) # Prepare contours cls_skymap = 100 * postprocess.find_greedy_credible_levels(skymap / np.sum(skymap)) # Prepare horizon (darkens the half of the sky that is above horizon) ra, dec = hp.pix2ang(nside, range(npix), lonlat=True) coords = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="icrs") frame = kc.local_frame(time=obstime, location=detector) coords = coords.transform_to(frame) horizon = -1 * np.ones_like(skymap) horizon[np.sin(coords.alt.rad) > 0] = 1 # Prepare the figure fig = plt.figure(figsize=(9, 5.3)) ax = plt.axes([0.00, 0.15, 1, 0.78], projection="astro degrees mollweide") # Plot skymap and its contour ax.imshow_hpx(skymap, cmap=plt.get_cmap("Blues")) ax.contour_hpx( (cls_skymap, "ICRS"), colors=["blue", "blue"], linewidths=[1, 1], linestyles=[":", "-"], levels=(50, 90), ) # Plot horizon ax.imshow_hpx( horizon, cmap=matplotlib.colors.ListedColormap(["white", "grey"]), vmin=0.0, vmax=1.0, alpha=0.3, ) # Axis configuration ax.grid() ax.set_facecolor("none") for key in ["ra", "dec"]: ax.coords[key].set_auto_axislabel(False) # Draw custom legend handles = [] handles.append( matplotlib.lines.Line2D( [], [], color="blue", linewidth=1, linestyle=":", label="50% contour" ) ) handles.append( matplotlib.lines.Line2D( [], [], color="blue", linewidth=1, linestyle="-", label="90% contour" ) ) handles.append( matplotlib.patches.Patch( edgecolor="black", facecolor="grey", alpha=0.30, label=r"Region above horizon at $t_{alert}$", ) ) fig.legend(handles=handles, loc="upper center", bbox_to_anchor=(0.50, 0.10), ncol=3) fig.suptitle("Equatorial coordinates", fontsize=16) if outfile is not None: fig.savefig(outfile, dpi=300) return fig
[docs]def skymap_list( dataframe: pd.DataFrame = pd.DataFrame(), frame: str = "equatorial", frame_input: str = "equatorial", detector: str = "antares", outfile: str = None, **old_kwargs, ): """Method to plot a skymap from a list of alert in a csv file. Parameters ---------- dataframe = pd.DataFrame() The dataframe containing the list of alert. frame : str [default = "equatorial"] The frame of the skymap, either "equatorial" or "galactic". frame_input : str [default = "equatorial"] The frame of the input data, either "equatorial" or "galactic". detector : str [ default = "antares"] The detector to be used for eventual input and for horizon. outfile : str Path to the output file. If None, no file is written. **old_kwards: dict [TO BE DEPRECATED] Arguments used by the old function: (file0, plot_frame, detector_to, alt_cut, title, save, path, name) Returns: Fig : file.png A png file of the skymap. """ # === SECTION TO BE DEPRECATED === # Handling of old plotting function if len(old_kwargs) > 0: warnings.warn( "The old 'skymap_list' method will be replaced, see documentation.", DeprecationWarning, ) file0 = old_kwargs.get("file0", "") plot_frame = old_kwargs.get("plot_frame", "equatorial") detector_to = old_kwargs.get("detector_to", "antares") alt_cut = old_kwargs.get("alt_cut", 5.7) title = old_kwargs.get("title", "") save = old_kwargs.get("save", False) path = old_kwargs.get("path", "") name = old_kwargs.get("name", "") projection = "aitoff" fig, ax = projection_axes(projection=projection, figsize=[40 / 2.54, 30 / 2.54]) if file0 != "": table_read = pd.read_csv(file0, comment="#") table_skycoord = kt.build_skycoord_list(table_read, frame, detector) elif dataframe.empty == False: table_skycoord = kt.build_skycoord_list(dataframe, frame_input, detector) if "Alert_type" in dataframe.columns: extracted_column = dataframe["Alert_type"] table_skycoord = table_skycoord.join(extracted_column) else: file0 = data_path("astro/antares_coordinate_systems_benchmark.csv") table_skycoord = kt.build_skycoord_list(table_read, frame, detector) plot_pd_skycoord(table_skycoord, plot_frame, detector_to, projection, ax) plot_visibility(ax=ax, frame=plot_frame, detector=detector_to) if title == "": title = detector + " Alert List " + plot_frame + " frame skymap" ax.set_title(title, fontsize=20, y=1.1) xlabel, ylabel = get_label(plot_frame) plt.xlabel(xlabel) plt.ylabel(ylabel) if "Alert_type" in table_skycoord.columns: h = [] check_list = [] for it in table_skycoord["Alert_type"]: if it not in check_list: check_list.append(it) alert = mlines.Line2D( [], [], color=get_alert_color(it), marker=".", markersize=10, label=it, ) h.append(alert) plt.legend(handles=h, bbox_to_anchor=(1.1, 1.2), loc="upper right") if save: if path != "": if name == "": name = os.path.join( path.name, f"skymap_list_{detector}_test_{plot_frame}.png" ) else: name = os.path.join(path.name, f"{name}.png") else: path = tempfile.TemporaryDirectory() if name == "": name = "skymap_list_" + detector + "_test_" + plot_frame + ".png" plt.savefig(name) return fig # ===END OF SECTION TO BE DEPRECATED === if not dataframe.empty: detector = "antares" table_skycoord = kt.build_skycoord_list(dataframe, frame_input, detector) if "Alert_type" in dataframe.columns: extracted_column = dataframe["Alert_type"] table_skycoord = table_skycoord.join(extracted_column) table_skycoord["SkyCoord_base"] = table_skycoord["SkyCoord_base"].map( lambda x: kc.transform_to(x, frame, detector) ) else: table_read = pd.read_csv( data_path("astro/antares_coordinate_systems_benchmark.csv"), comment="#" ) table_skycoord = kt.build_skycoord_list(table_read, frame_input, detector) table_skycoord["SkyCoord_base"] = table_skycoord["SkyCoord_base"].map( lambda x: kc.transform_to(x, frame, detector) ) ke.ligoskymap() if frame not in ["galactic", "equatorial"]: raise RuntimeError(f"Unknown frame type {frame}") aframe = "icrs" if frame == "equatorial" else "galactic" # Prepare figure fig = plt.figure(figsize=(9, 5.3)) ax = plt.axes( [0.00, 0.15, 1, 0.78], projection="%s degrees mollweide" % ("astro" if frame == "equatorial" else "geo"), ) # Plot point-source for _, alert in table_skycoord.iterrows(): if frame == "equatorial": ax.scatter( alert["SkyCoord_base"].ra.deg, alert["SkyCoord_base"].dec.deg, s=100, marker=get_alert_marker(alert.get("Alert_type", ".")), color=get_alert_color(alert.get("Alert_type", "black")), transform=ax.get_transform("icrs"), ) elif frame == "galactic": ax.scatter( alert["SkyCoord_base"].l.deg, alert["SkyCoord_base"].b.deg, s=50, marker=get_alert_marker(alert.get("Alert_type", ".")), color=get_alert_color(alert.get("Alert_type", "black")), transform=ax.get_transform("itrs"), ) # Axis configuration ax.grid() ax.set_facecolor("none") for key in ["ra", "dec", "longitude", "latitude"]: if key in ax.coords: ax.coords[key].set_auto_axislabel(False) # Draw custom legend if "Alert_type" in table_skycoord: handles = [] for alert_type in np.unique(table_skycoord["Alert_type"]): handles.append( matplotlib.lines.Line2D( [], [], color=get_alert_color(alert_type), marker=get_alert_marker(alert_type), markersize=10, linewidth=0, label=alert_type, ) ) fig.legend( handles=handles, loc="upper center", bbox_to_anchor=(0.50, 0.13), ncol=4 ) fig.suptitle(frame.capitalize() + " coordinates", fontsize=16) if outfile is not None: fig.savefig(outfile, dpi=300) return fig
[docs]def skymap_alert( ra: float = None, dec: float = None, obstime: str = None, error_radius: float = None, frame: str = "equatorial", detector: str = "antares", outfile: str = None, **old_kwargs, ): """Method to plot a skymap from an alert in a csv file or by giving RA, DEC and obstime. Parameters ---------- ra, dec : (float,float) The ra and dec coordinate of the alert. obstime : str The observation time of the alert. Format is "YYYY-MM-DDTHH:MM:SS" error_radius : float [default = None] The radius of the error circle around the alert coordinate. frame : str [default = "equatorial"] The frame of the map. detector : str [default = "antares"] The detector to use for horizon definition and coordinate transformation. Choices are antares, orca, and arca. outfile : str Path to the output file. If None, no file is written. **old_kwards: dict [TO BE DEPRECATED] Arguments used by the old function: (file0, plot_frame, detector_to, alt_cut, title, save, path, name) Returns: Fig : file.png A png file of the skymap. """ # === SECTION TO BE DEPRECATED === # Handling of old plotting function if len(old_kwargs) > 0: warnings.warn( "The old 'skymap_alert' method will be replaced, see documentation.", DeprecationWarning, ) file0 = old_kwargs.get("file0", "") plot_frame = old_kwargs.get("plot_frame", "equatorial") detector_to = old_kwargs.get("detector_to", "antares") alt_cut = old_kwargs.get("alt_cut", 5.7) title = old_kwargs.get("title", "") save = old_kwargs.get("save", False) path = old_kwargs.get("path", "") name = old_kwargs.get("name", "") use_coord = False use_file = False if file0 != "": use_file = True if ra != None and dec != None and obstime != None: use_coord = True if use_coord == False and use_file == False: file0 = data_path("astro/antares_moon_sun_position_benchmark.csv") ra = 80 dec = 15 obstime = "2022-06-15T03:03:03" use_coord = True use_file = True projection = "aitoff" fig, ax = projection_axes(projection=projection, figsize=[40 / 2.54, 30 / 2.54]) if use_file == True: table_read = pd.read_csv(file0, comment="#") table_skycoord = kt.build_skycoord_list(table_read, frame, detector) obstime = get_obstime(table_skycoord) plot_file(file0, ax, projection, frame, detector, plot_frame, detector_to) if use_coord == True: alert = SkyCoord( ra=ra * u.degree, dec=dec * u.degree, obstime=obstime, frame="icrs" ) if error_radius is not None: theta = np.linspace(0, 2 * np.pi, 360) ra = ra + error_radius * np.cos(theta) dec = np.fmax(-90, np.fmin(90, dec + error_radius * np.sin(theta))) error = SkyCoord( ra=ra * u.degree, dec=dec * u.degree, obstime=obstime, frame="icrs" ) plot_SkyCoord( error, frame=plot_frame, detector=detector_to, projection=projection, ax=ax, marker=".", markersize=1, color="royalblue", ) plot_SkyCoord( alert, frame=plot_frame, detector=detector_to, projection=projection, ax=ax, marker=".", markersize=10, linewidth=0, color="royalblue", label=get_Sky_label(alert, plot_frame, detector_to, time=False), ) horizon = get_horizon( detector=detector_to, time=obstime, frame=plot_frame, alt_cut=alt_cut ) plot_SkyCoord( horizon, frame=plot_frame, detector=detector_to, projection=projection, ax=ax, marker=".", markersize=10, linewidth=0, color="darkgreen", ) gal_plan = get_galactic_plan( detector=detector_to, time=obstime, frame=plot_frame ) plot_SkyCoord( gal_plan, frame=plot_frame, detector=detector_to, projection=projection, ax=ax, marker=".", markersize=10, linewidth=0, color="darkred", ) date, time = str(obstime).split("T", 1) if title == "": title = ( detector + " Alert " + plot_frame + " frame skymap " + date + " " + time ) ax.set_title(title, fontsize=20, y=1.1) xlabel, ylabel = get_label(plot_frame) plt.xlabel(xlabel) plt.ylabel(ylabel) horizon_line = mlines.Line2D( [], [], color="lime", marker=".", markersize=1, label="Horizon" ) gal_line = mlines.Line2D( [], [], color="red", marker=".", markersize=1, label="Galactic plan" ) h, l = ax.get_legend_handles_labels() h.append(horizon_line) h.append(gal_line) plt.legend(handles=h, bbox_to_anchor=(1.1, 1.2), loc="upper right") if save: if path != "": if name == "": name = os.path.join( path.name, f"skymap_alert_{detector}_test_{plot_frame}.png" ) else: name = os.path.join(path.name, f"{name}.png") else: path = tempfile.TemporaryDirectory() if name == "": name = "skymap_alert_" + detector + "_test_" + plot_frame + ".png" plt.savefig(name) return fig # ===END OF SECTION TO BE DEPRECATED === assert ra is not None and dec is not None and obstime is not None ke.ligoskymap() if frame not in ["galactic", "equatorial"]: raise RuntimeError(f"Unknown frame type {frame}") aframe = "icrs" if frame == "equatorial" else "galactic" obstime = Time(obstime, format="isot") skycoord_eq = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="icrs") skycoord_frame = kc.transform_to(skycoord_eq, frame, detector) # Prepare horizon (darkens the half of the sky that is above horizon) nside = 512 npix = hp.nside2npix(nside) p_frame, t_frame = hp.pix2ang(nside, range(npix), lonlat=True) coords = SkyCoord(p_frame * u.deg, t_frame * u.deg, frame=aframe) coords_loc = coords.transform_to(kc.local_frame(time=obstime, location=detector)) horizon = -1 * np.ones(npix) horizon[np.sin(coords_loc.alt.rad) > 0] = 1 # Prepare error circle if error_radius is not None: dist = coords.separation(skycoord_frame) mapcircle = np.zeros(hp.nside2npix(nside)) mapcircle[dist.deg <= error_radius] = 1 # Prepare galactic plane coords_gal = kc.transform_to(coords, "galactic", detector) galplane = -1 * np.ones(npix) galplane[np.abs(coords_gal.b.deg) < 2] = 1 # Prepare figure fig = plt.figure(figsize=(9, 5.3)) ax = plt.axes( [0.00, 0.15, 1, 0.78], projection="%s degrees mollweide" % ("astro" if frame == "equatorial" else "geo"), ) # Plot horizon ax.imshow_hpx( horizon, cmap=matplotlib.colors.ListedColormap(["white", "grey"]), vmin=0.0, vmax=1.0, alpha=0.3, ) # Plot galactic plane ax.imshow_hpx( galplane, cmap=matplotlib.colors.ListedColormap(["white", "red"]), vmin=0.0, vmax=1.0, alpha=0.4, ) # Plot point-source if frame == "equatorial": ax.scatter( skycoord_frame.ra.deg, skycoord_frame.dec.deg, marker="+", color="blue", transform=ax.get_transform("icrs"), ) elif frame == "galactic": ax.scatter( skycoord_frame.l.deg, skycoord_frame.b.deg, marker="+", color="blue", transform=ax.get_transform("itrs"), ) if error_radius is not None: ax.imshow_hpx( mapcircle, cmap=matplotlib.colors.ListedColormap(["white", "blue"]), alpha=0.4, vmin=0, vmax=1, ) # Axis configuration ax.grid() ax.set_facecolor("none") for key in ["ra", "dec", "longitude", "latitude"]: if key in ax.coords: ax.coords[key].set_auto_axislabel(False) # Draw custom legend handles = [] handles.append( matplotlib.lines.Line2D( [], [], color="blue", linewidth=0, marker="+", label="Alert position" ) ) if error_radius is not None: handles.append( matplotlib.patches.Patch(facecolor="blue", alpha=0.4, label="Error radius") ) handles.append( matplotlib.patches.Patch( edgecolor="black", facecolor="grey", alpha=0.3, label=r"Region above horizon at $t_{alert}$", ) ) handles.append( matplotlib.patches.Patch( edgecolor="darkred", facecolor="red", alpha=0.4, label="Galactic plane" ) ) fig.legend(handles=handles, loc="upper center", bbox_to_anchor=(0.50, 0.13), ncol=2) fig.suptitle(frame.capitalize() + " coordinates", fontsize=16) if outfile is not None: fig.savefig(outfile, dpi=300) return fig