import numpy as np
import matplotlib.pyplot as plt
from astropy.wcs import WCS
from astropy.wcs.utils import pixel_to_skycoord, skycoord_to_pixel
from astropy.coordinates import SkyCoord
from astropy.time import Time
import astropy.units as u
from scipy.ndimage import (
map_coordinates, gaussian_filter, gaussian_filter1d,
center_of_mass, binary_fill_holes, label
)
try:
from astroquery.gaia import Gaia as _GaiaCatalog
import logging
logging.getLogger('astroquery').setLevel(logging.ERROR)
_ASTROQUERY_AVAILABLE = True
except ImportError:
print("Warning: astroquery is not available. Gaia-based center refinement will be disabled.")
_ASTROQUERY_AVAILABLE = False
[docs]
def get_alma_beam(sigma_maj, sigma_min, bpa_rad, size=15):
x = np.arange(-size, size + 1)
X, Y = np.meshgrid(x, x)
Xrot = X * np.cos(bpa_rad) + Y * np.sin(bpa_rad)
Yrot = -X * np.sin(bpa_rad) + Y * np.cos(bpa_rad)
kernel = np.exp(-(Xrot**2 / (2 * sigma_maj**2) + Yrot**2 / (2 * sigma_min**2)))
return kernel / kernel.sum()
[docs]
def deconvolve_beams(bmaj_t, bmin_t, pa_t, bmaj_i, bmin_i, pa_i):
bmaj_t = bmaj_t * (1.0 + 1e-10)
bmin_t = bmin_t * (1.0 + 1e-10)
fwhm2sig = 2.3548200450309493
def to_cov(bmaj, bmin, pa):
sig_maj = bmaj / fwhm2sig
sig_min = bmin / fwhm2sig
th = np.radians(90.0 - pa)
c, s = np.cos(th), np.sin(th)
R = np.array([[c, -s], [s, c]])
S = np.array([[sig_maj**2, 0], [0, sig_min**2]])
return R @ S @ R.T
C_t = to_cov(bmaj_t, bmin_t, pa_t)
C_i = to_cov(bmaj_i, bmin_i, pa_i)
C_c = C_t - C_i
vals, vecs = np.linalg.eigh(C_c)
if np.any(vals < 0):
return None, None, None
sig_min, sig_maj = np.sqrt(vals[0]), np.sqrt(vals[1])
bmaj_c = sig_maj * fwhm2sig
bmin_c = sig_min * fwhm2sig
dy = vecs[1, 1]
dx = vecs[0, 1]
phi_c = np.degrees(np.arctan2(dy, dx))
pa_c = (90.0 - phi_c) % 180.0
return bmaj_c, bmin_c, pa_c
[docs]
def make_gaussian_kernel_casa(bmaj_c, bmin_c, pa_c, pixel_scale):
sigma_maj = (bmaj_c / pixel_scale) / 2.35482
sigma_min = (bmin_c / pixel_scale) / 2.35482
size = int(np.ceil(sigma_maj * 5))
if size % 2 == 0:
size += 1
x = np.arange(-size, size + 1)
X, Y = np.meshgrid(x, x)
th = np.radians(90.0 - pa_c)
Xrot = X * np.cos(th) + Y * np.sin(th)
Yrot = -X * np.sin(th) + Y * np.cos(th)
kernel = np.exp(-(Xrot**2 / (2 * sigma_maj**2) + Yrot**2 / (2 * sigma_min**2)))
return kernel / np.sum(kernel)
[docs]
def find_center_robust(data, pixel_scale, header):
cy_cen, cx_cen = data.shape[0] // 2, data.shape[1] // 2
search_rad_pix = int(3.0 / pixel_scale)
y_min = max(0, cy_cen - search_rad_pix)
y_max = min(data.shape[0], cy_cen + search_rad_pix)
x_min = max(0, cx_cen - search_rad_pix)
x_max = min(data.shape[1], cx_cen + search_rad_pix)
crop = data[y_min:y_max, x_min:x_max]
bmaj_arcsec = header.get('BMAJ', 0) * 3600
sigma_as = max(bmaj_arcsec / 2.355, 0.08)
sigma_pix = sigma_as / pixel_scale
smoothed = gaussian_filter(crop, sigma=sigma_pix)
peak = np.nanmax(smoothed)
mask = smoothed > (peak * 0.20)
labeled, num_features = label(mask)
if num_features > 0:
sizes = np.bincount(labeled.ravel())
sizes[0] = 0
main_comp = np.argmax(sizes)
main_mask = (labeled == main_comp)
else:
main_mask = mask
filled = binary_fill_holes(main_mask)
pixels_orig = np.sum(main_mask)
pixels_fill = np.sum(filled)
if pixels_orig > 0 and (pixels_fill > pixels_orig * 1.03):
y_idx, x_idx = np.where(filled)
cy_local = (np.min(y_idx) + np.max(y_idx)) / 2.0
cx_local = (np.min(x_idx) + np.max(x_idx)) / 2.0
else:
if np.any(main_mask):
cy_local, cx_local = center_of_mass(main_mask)
else:
cy_local, cx_local = np.unravel_index(np.argmax(smoothed), smoothed.shape)
return float(cx_local + x_min), float(cy_local + y_min)
[docs]
def auto_detect_parameters(data, header, pixel_scale, cx, cy):
bmaj = header.get('BMAJ', 0) * 3600
rmin = max(bmaj * 1.2, 0.15) if bmaj > 0 else 0.2
y, x = np.indices(data.shape)
r = np.hypot(x - cx, y - cy) * pixel_scale
bins = np.arange(0, 10.0, pixel_scale)
prof, _ = np.histogram(r, bins=bins, weights=data)
counts, _ = np.histogram(r, bins=bins)
prof_mean = prof / np.maximum(counts, 1)
prof_smooth = gaussian_filter1d(prof_mean, sigma=2)
edge = np.concatenate([data[:10, :].ravel(), data[-10:, :].ravel(),
data[:, :10].ravel(), data[:, -10:].ravel()])
rms = np.nanstd(edge)
if rms <= 0:
rms = 1e-9
snr_threshold = 3.0
above_snr = prof_smooth > (snr_threshold * rms)
max_gap_bins = int(0.3 / pixel_scale)
rout_idx = 0
gap_counter = 0
for i in range(len(above_snr)):
if above_snr[i]:
rout_idx = i
gap_counter = 0
else:
gap_counter += 1
if gap_counter > max_gap_bins:
break
rout = bins[rout_idx] + (pixel_scale * 2.0)
rout += 0.05
rout = np.clip(rout, 0.15, 8.0)
return float(rmin), float(rout), float(bmaj)
[docs]
def save_debug_deproj_center(image, cx, cy, incl, pa, rout_arcsec, pixel_scale, out_png, title):
dim = 500
x = np.arange(dim) - dim / 2.0
X, Y = np.meshgrid(x, x)
Xc = X * np.cos(np.radians(incl))
Xrot = np.cos(np.radians(pa)) * Xc + np.sin(np.radians(pa)) * Y
Yrot = -np.sin(np.radians(pa)) * Xc + np.cos(np.radians(pa)) * Y
crop_rad = int((rout_arcsec / pixel_scale) * 1.5) + 15
crop_rad = min(crop_rad, image.shape[0]//2, image.shape[1]//2)
scale = (crop_rad * 2) / dim
coords = [Yrot * scale + cy, -Xrot * scale + cx]
deproj = map_coordinates(image, coords, order=1, mode='constant', cval=0.0)
fig, ax = plt.subplots(figsize=(6, 6), dpi=150)
vmin, vmax = np.percentile(deproj, [2, 99.5]) if deproj.size > 0 else (np.min(deproj), np.max(deproj))
ax.imshow(deproj, origin='lower', cmap='inferno', vmin=vmin, vmax=vmax)
center_x, center_y = (dim - 1) / 2.0, (dim - 1) / 2.0
ax.scatter([center_x], [center_y], c='white', s=60, marker='x', linewidths=1.5)
r_pix = (rout_arcsec / pixel_scale) / scale
circ = plt.Circle((center_x, center_y), r_pix, fill=False, ec='cyan', lw=1.8, ls='--')
ax.add_patch(circ)
ax.set_title(title)
ax.set_axis_off()
plt.tight_layout()
plt.savefig(out_png, bbox_inches='tight')
plt.close(fig)
[docs]
def measure_rout_deproj(data, header, pixel_scale, cx, cy, incl, pa, rmin=0.0):
SNR_THR = 2.0
gap_tol = 0.50
pa_rad = np.radians(pa)
incl_rad = np.radians(incl)
cos_i = max(np.cos(incl_rad), 0.05)
bmaj_as = header.get('BMAJ', 0) * 3600.0
ny, nx = data.shape
r_max_pix = min(ny // 2, nx // 2, 800)
y_idx = np.arange(-r_max_pix, r_max_pix + 1)
x_idx = np.arange(-r_max_pix, r_max_pix + 1)
Xf, Yf = np.meshgrid(x_idx, y_idx)
R_maj = -Xf * np.sin(pa_rad) + Yf * np.cos(pa_rad)
R_min = Xf * np.cos(pa_rad) + Yf * np.sin(pa_rad)
R_min_dep = R_min / cos_i
R_pix_fits = np.sqrt(R_maj**2 + R_min_dep**2)
R_arcsec = R_pix_fits * pixel_scale
samp_y = R_maj * np.cos(pa_rad) + R_min * np.sin(pa_rad) + cy
samp_x = -R_maj * np.sin(pa_rad) + R_min * np.cos(pa_rad) + cx
deproj = map_coordinates(data, [samp_y, samp_x], order=1, mode='constant', cval=0.0)
r_arr = R_arcsec.ravel()
d_arr = deproj.ravel()
r_max_as = r_max_pix * pixel_scale
bin_size = pixel_scale
bins = np.arange(0, r_max_as + bin_size, bin_size)
prof = np.zeros(len(bins) - 1)
for k in range(len(bins) - 1):
m = (r_arr >= bins[k]) & (r_arr < bins[k + 1])
if m.sum() > 3:
prof[k] = np.nanmean(d_arr[m])
r_centers = (bins[:-1] + bins[1:]) / 2.0
r_85 = r_max_as * 0.85
edge_mask = r_arr > r_85
if edge_mask.sum() > 50:
rms = max(float(np.nanstd(d_arr[edge_mask])), 1e-10)
else:
edge = np.concatenate([data[:8, :].ravel(), data[-8:, :].ravel(),
data[:, :8].ravel(), data[:, -8:].ravel()])
rms = max(float(np.nanstd(edge)), 1e-10)
prof_s = gaussian_filter1d(prof, sigma=2)
gap_bins = max(1, int(gap_tol / bin_size))
rout_idx = 0
gap_count = 0
for k, (r, v) in enumerate(zip(r_centers, prof_s)):
if r < rmin:
continue
if v > SNR_THR * rms:
rout_idx = k
gap_count = 0
else:
gap_count += 1
if gap_count > gap_bins:
break
margin = max(bmaj_as * 1.0, pixel_scale * 3, 0.03)
rout = float(np.clip(r_centers[rout_idx] + margin, 0.10, 8.0))
return rout
[docs]
def refine_center_local(data, header, pixel_scale, cx_init, cy_init):
bmaj_arcsec = header.get('BMAJ', 0) * 3600
search_rad_arcsec = max(bmaj_arcsec * 0.6, 0.15) if bmaj_arcsec > 0 else 0.25
search_rad_pix = max(int(np.ceil(search_rad_arcsec / pixel_scale)), 3)
y_min = max(0, int(round(cy_init)) - search_rad_pix)
y_max = min(data.shape[0], int(round(cy_init)) + search_rad_pix + 1)
x_min = max(0, int(round(cx_init)) - search_rad_pix)
x_max = min(data.shape[1], int(round(cx_init)) + search_rad_pix + 1)
if y_max - y_min < 3 or x_max - x_min < 3:
return cx_init, cy_init
crop = data[y_min:y_max, x_min:x_max].copy()
sigma_pix = max((bmaj_arcsec / 2.355) / pixel_scale, 1.0) if bmaj_arcsec > 0 else 1.5
smoothed = gaussian_filter(crop, sigma=sigma_pix)
peak = np.nanmax(smoothed)
if peak <= 0:
return cx_init, cy_init
mask = smoothed > (peak * 0.25)
if not np.any(mask):
return cx_init, cy_init
cy_local, cx_local = center_of_mass(smoothed * mask)
cx_ref = cx_init - x_min
cy_ref = cy_init - y_min
if abs(cx_local - cx_ref) > search_rad_pix or abs(cy_local - cy_ref) > search_rad_pix:
return cx_init, cy_init
return float(cx_local + x_min), float(cy_local + y_min)
[docs]
def deg_to_sex(deg):
sign = '+' if deg >= 0 else '-'
deg = abs(deg)
d = int(deg)
m = int((deg - d) * 60)
s = (deg - d - m / 60.0) * 3600.0
return f"{sign}{d:03d}:{m:02d}:{s:06.3f}"
[docs]
def pixel_to_icrs(header, cx, cy):
wcs = WCS(header).celestial
coord = pixel_to_skycoord(cx, cy, wcs, origin=0)
icrs = coord.icrs
return float(icrs.ra.deg), float(icrs.dec.deg)
[docs]
def icrs_to_pixel(header, ra_deg, dec_deg):
wcs = WCS(header).celestial
coord = SkyCoord(ra=ra_deg * u.deg, dec=dec_deg * u.deg, frame='icrs')
x, y = skycoord_to_pixel(coord, wcs, origin=0)
return float(x), float(y)
[docs]
def get_obs_epoch(header):
date_obs = header.get('DATE-OBS', None)
if date_obs:
try:
return Time(str(date_obs).strip(), format='isot', scale='utc')
except Exception:
try:
return Time(str(date_obs).strip(), scale='utc')
except Exception:
pass
mjd_obs = header.get('MJD-OBS', None)
if mjd_obs is not None:
try:
return Time(float(mjd_obs), format='mjd', scale='utc')
except Exception:
pass
return None
[docs]
def query_gaia_proper_motion(ra_deg, dec_deg, search_radius_arcsec=3.0):
if not _ASTROQUERY_AVAILABLE:
return None, None, None
try:
_GaiaCatalog.MAIN_GAIA_TABLE = "gaiadr3.gaia_source"
_GaiaCatalog.ROW_LIMIT = 20
coord = SkyCoord(ra=ra_deg * u.deg, dec=dec_deg * u.deg, frame='icrs')
radius = u.Quantity(search_radius_arcsec, u.arcsec)
job = _GaiaCatalog.cone_search_async(coord, radius=radius, verbose=False)
r = job.get_results()
if len(r) == 0:
return None, None, None
gaia_coords = SkyCoord(
ra=np.array(r['ra'], dtype=float) * u.deg,
dec=np.array(r['dec'], dtype=float) * u.deg,
frame='icrs'
)
seps = coord.separation(gaia_coords).arcsec
order = np.argsort(seps)
for idx in order:
row = r[idx]
try:
pmra_raw = row['pmra']
pmdec_raw = row['pmdec']
if np.ma.is_masked(pmra_raw) or np.ma.is_masked(pmdec_raw):
continue
pmra_val = float(pmra_raw)
pmdec_val = float(pmdec_raw)
if not (np.isfinite(pmra_val) and np.isfinite(pmdec_val)):
continue
return pmra_val, pmdec_val, float(seps[idx])
except Exception:
continue
return None, None, None
except Exception:
return None, None, None
[docs]
def apply_proper_motion_correction(ra_deg, dec_deg, pmra_masyr, pmdec_masyr, dt_yr):
cos_dec = np.cos(np.radians(dec_deg))
delta_ra = (pmra_masyr * dt_yr) / (cos_dec * 3.6e6)
delta_dec = (pmdec_masyr * dt_yr) / 3.6e6
return float(ra_deg + delta_ra), float(dec_deg + delta_dec)