Source code for cinrad.calc

# -*- coding: utf-8 -*-
# Author: Puyuan Du

import datetime
import time
from typing import *
from functools import wraps

import numpy as np
from xarray import DataArray, Dataset

try:
    from pykdtree.kdtree import KDTree
except ImportError:
    from scipy.spatial import KDTree

from cinrad.utils import *
from cinrad.grid import grid_2d, resample
from cinrad.projection import height, get_coordinate
from cinrad.constants import deg2rad
from cinrad.error import RadarCalculationError
from cinrad._typing import Volume_T, Number_T
from cinrad.common import get_dtype
from cinrad.hca import hydro_class as _hca

__all__ = [
    "quick_cr",
    "quick_et",
    "quick_vil",
    "VCS",
    "quick_vild",
    "GridMapper",
    "hydro_class",
]


def require(var_names: List[str]) -> Callable:
    r"""Decorator that checks the input Dataset(s) contain the given variables."""

    def wrap(func: Callable) -> Callable:
        @wraps(func)
        def deco(*args, **kwargs) -> Any:
            if len(args) == 1:
                varset = args[0]
            else:
                varset = list(args)
            if isinstance(varset, Dataset):
                var_list = list(varset.keys())
            elif isinstance(varset, list):
                var_list = list()
                for var in varset:
                    var_list += list(var.keys())
                var_list = list(set(var_list))
            for v in var_names:
                if v not in var_list:
                    raise ValueError(
                        "Function {} requires variable {}".format(func.__name__, v)
                    )
            return func(*args, **kwargs)

        return deco

    return wrap
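# Hedged usage sketch for ``require`` (names below are hypothetical): the
# decorator verifies that every required variable is present in the input
# Dataset(s), raising ValueError before the wrapped function runs otherwise.
# >>> @require(["REF"])
# ... def needs_ref(r_list):
# ...     ...
# >>> needs_ref(velocity_only_scans)  # ValueError: ... requires variable REF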


def _extract(r_list: Volume_T, dtype: str) -> tuple:
    if len(set(i.range for i in r_list)) > 1:
        raise ValueError("Input radials must have the same data range")
    adim_shape = set(i.dims["azimuth"] for i in r_list)
    if max(adim_shape) > 400:
        # CC radar
        adim_interp_to = 512
    else:
        adim_interp_to = 360
    r_data = list()
    elev = list()
    for i in r_list:
        x, d, a = resample(
            i[dtype].values,
            i["distance"].values,
            i["azimuth"].values,
            i.tangential_reso,
            adim_interp_to,
        )
        r_data.append(x)
        elev.append(i.elevation)
    data = np.concatenate(r_data).reshape(
        len(r_list), r_data[0].shape[0], r_data[0].shape[1]
    )
    return data, d, a, np.array(elev)
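# Shape conventions of ``_extract``, sketched with a hypothetical volume scan
# ``vol``: every tilt is resampled to a common azimuth dimension (512 rays for
# CC radars, 360 otherwise) before the tilts are stacked.
# >>> data, d, a, elev = _extract(vol, "REF")
# >>> data.shape  # (tilts, azimuth bins, range gates)
# >>> elev.shape  # (tilts,)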


[docs]@require(["REF"]) def quick_cr(r_list: Volume_T, resolution: tuple = (1000, 1000)) -> Dataset: r"""Calculate composite reflectivity Args: r_list (list(xarray.Dataset)): Reflectivity data. Returns: xarray.Dataset: composite reflectivity """ r_data = list() for i in r_list: r, x, y = grid_2d( i["REF"].values, i["longitude"].values, i["latitude"].values, resolution=resolution, ) r_data.append(r) cr = np.nanmax(r_data, axis=0) ret = Dataset({"CR": DataArray(cr, coords=[y, x], dims=["latitude", "longitude"])}) ret.attrs = i.attrs ret.attrs["elevation"] = 0 return ret
[docs]@require(["REF"]) def quick_et(r_list: Volume_T) -> Dataset: r"""Calculate echo tops Args: r_list (list(xarray.Dataset)): Reflectivity data. Returns: xarray.Dataset: echo tops """ r_data, d, a, elev = _extract(r_list, "REF") i = r_list[0] et = echo_top( r_data.astype(np.double), d.astype(np.double), elev.astype(np.double), 0.0 ) azimuth = a[:, 0] distance = d[0] ret = Dataset( { "ET": DataArray( np.ma.masked_less(et, 2), coords=[azimuth, distance], dims=["azimuth", "distance"], ) } ) ret.attrs = i.attrs ret.attrs["elevation"] = 0 lon, lat = get_coordinate(distance, azimuth, 0, i.site_longitude, i.site_latitude) ret["longitude"] = (["azimuth", "distance"], lon) ret["latitude"] = (["azimuth", "distance"], lat) return ret
[docs]@require(["REF"]) def quick_vil(r_list: Volume_T) -> Dataset: r"""Calculate vertically integrated liquid. This algorithm process data in polar coordinates, which avoids the loss of data. By default, this function calls low-level function `vert_integrated_liquid` in C-extension. If the C-extension is not available, the python version will be used instead but with much slower speed. Args: r_list (list(xarray.Dataset)): Reflectivity data. Returns: xarray.Dataset: vertically integrated liquid """ r_data, d, a, elev = _extract(r_list, "REF") i = r_list[0] vil = vert_integrated_liquid( r_data.astype(np.double), d.astype(np.double), elev.astype(np.double) ) azimuth = a[:, 0] distance = d[0] ret = Dataset( { "VIL": DataArray( np.ma.masked_less(vil, 0.1), coords=[azimuth, distance], dims=["azimuth", "distance"], ) } ) ret.attrs = i.attrs ret.attrs["elevation"] = 0 lon, lat = get_coordinate(distance, azimuth, 0, i.site_longitude, i.site_latitude) ret["longitude"] = (["azimuth", "distance"], lon) ret["latitude"] = (["azimuth", "distance"], lat) return ret
def quick_vild(r_list: Volume_T) -> Dataset:
    r"""Calculate vertically integrated liquid density.

    By default, this function calls the low-level function
    `vert_integrated_liquid` from the C-extension. If the C-extension is
    not available, the much slower Python version is used instead.

    Args:
        r_list (list(xarray.Dataset)): Reflectivity data.

    Returns:
        xarray.Dataset: Vertically integrated liquid density
    """
    r_data, d, a, elev = _extract(r_list, "REF")
    i = r_list[0]
    vild = vert_integrated_liquid(
        r_data.astype(np.double),
        d.astype(np.double),
        elev.astype(np.double),
        density=True,
    )
    azimuth = a[:, 0]
    distance = d[0]
    ret = Dataset(
        {
            "VILD": DataArray(
                np.ma.masked_less(vild, 0.1),
                coords=[azimuth, distance],
                dims=["azimuth", "distance"],
            )
        }
    )
    ret.attrs = i.attrs
    ret.attrs["elevation"] = 0
    lon, lat = get_coordinate(distance, azimuth, 0, i.site_longitude, i.site_latitude)
    ret["longitude"] = (["azimuth", "distance"], lon)
    ret["latitude"] = (["azimuth", "distance"], lat)
    return ret

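# Hedged usage sketch (``vol`` hypothetical), same call pattern as
# ``quick_vil`` but using the density variant of the low-level routine:
# >>> vild = quick_vild(vol)
# >>> vild["VILD"]  # vertically integrated liquid density
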
class VCS(object):
    r"""
    Class performing vertical cross-section calculation

    Args:
        r_list (list(xarray.Dataset)): The whole volume scan.
    """

    def __init__(self, r_list: Volume_T):
        el = [i.elevation for i in r_list]
        if len(el) != len(set(el)):
            # Drop duplicate tilts, keeping the first scan of each elevation
            self.rl = list()
            el_list = list()
            for data in r_list:
                if data.elevation not in el_list:
                    self.rl.append(data)
                    el_list.append(data.elevation)
        else:
            self.rl = r_list
        self.dtype = get_dtype(r_list[0])
        self.x, self.y, self.h, self.r = self._geocoor()
        self.attrs = r_list[0].attrs

    def _geocoor(self) -> Tuple[list]:
        r_data = list()
        x_data = list()
        y_data = list()
        h_data = list()
        for i in self.rl:
            _lon = i["longitude"].values
            _lat = i["latitude"].values
            r, x, y = grid_2d(i[self.dtype].values, _lon, _lat)
            r_data.append(r)
            x_data.append(x)
            y_data.append(y)
            hgh_grid, x, y = grid_2d(i["height"].values, _lon, _lat)
            h_data.append(hgh_grid)
        return x_data, y_data, h_data, r_data

    def _get_section(
        self, stp: Tuple[float, float], enp: Tuple[float, float], spacing: int
    ) -> Dataset:
        r_sec = list()
        h_sec = list()
        for x, y, h, r in zip(self.x, self.y, self.h, self.r):
            d_x = DataArray(r, [("lat", y), ("lon", x)])
            d_h = DataArray(h, [("lat", y), ("lon", x)])
            x_new = DataArray(np.linspace(stp[0], enp[0], spacing), dims="z")
            y_new = DataArray(np.linspace(stp[1], enp[1], spacing), dims="z")
            r_section = d_x.interp(lon=x_new, lat=y_new)
            h_section = d_h.interp(lon=x_new, lat=y_new)
            r_sec.append(r_section)
            h_sec.append(h_section)
        r = np.asarray(r_sec)
        h = np.asarray(h_sec)
        x = np.linspace(0, 1, spacing) * np.ones(r.shape[0])[:, np.newaxis]
        ret = Dataset(
            {
                self.dtype: DataArray(r, dims=["distance", "tilt"]),
                "y_cor": DataArray(h, dims=["distance", "tilt"]),
                "x_cor": DataArray(x, dims=["distance", "tilt"]),
            }
        )
        r_attr = self.attrs.copy()
        del r_attr["elevation"], r_attr["tangential_reso"], r_attr["range"]
        r_attr["start_lon"] = stp[0]
        r_attr["start_lat"] = stp[1]
        r_attr["end_lon"] = enp[0]
        r_attr["end_lat"] = enp[1]
        ret.attrs = r_attr
        return ret

    def get_section(
        self,
        start_polar: Optional[Tuple[float, float]] = None,
        end_polar: Optional[Tuple[float, float]] = None,
        start_cart: Optional[Tuple[float, float]] = None,
        end_cart: Optional[Tuple[float, float]] = None,
        spacing: int = 500,
    ) -> Dataset:
        r"""
        Get cross-section data from input points

        Args:
            start_polar (tuple): Polar coordinates of start point, i.e. (distance, azimuth).
            end_polar (tuple): Polar coordinates of end point, i.e. (distance, azimuth).
            start_cart (tuple): Geographic coordinates of start point, i.e. (longitude, latitude).
            end_cart (tuple): Geographic coordinates of end point, i.e. (longitude, latitude).
            spacing (int): Number of points to interpolate along the section, default 500.

        Returns:
            xarray.Dataset: Cross-section data
        """
        if start_polar and end_polar:
            stlat = self.rl[0].site_latitude
            stlon = self.rl[0].site_longitude
            stp = np.round_(
                get_coordinate(
                    start_polar[0], start_polar[1] * deg2rad, 0, stlon, stlat
                ),
                2,
            )
            enp = np.round_(
                get_coordinate(end_polar[0], end_polar[1] * deg2rad, 0, stlon, stlat),
                2,
            )
        elif start_cart and end_cart:
            stp = start_cart
            enp = end_cart
        else:
            raise RadarCalculationError("Invalid input")
        return self._get_section(stp, enp, spacing)

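# Hedged usage sketch for ``VCS`` (``vol`` hypothetical): sections can be
# requested either in polar coordinates relative to the radar site or in
# geographic coordinates.
# >>> vcs = VCS(vol)
# >>> sec = vcs.get_section(start_cart=(120.2, 35.8), end_cart=(121.0, 36.5))
# >>> sec = vcs.get_section(start_polar=(50, 300), end_polar=(150, 30))
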
class GridMapper(object):
    r"""
    This class merges scans from different radars onto a single cartesian grid.

    Args:
        fields (list(xarray.Dataset)): List of scans to be merged.
        max_dist (int, float): The maximum distance in the kdtree search.

    Example:
        >>> gm = GridMapper([r1, r2, r3])
        >>> grid = gm(0.1)
    """

    def __init__(self, fields: Volume_T, max_dist: Number_T = 0.1):
        # Process data type
        self.dtype = get_dtype(fields[0])
        # Process time
        t_arr = np.array(
            [
                time.mktime(
                    datetime.datetime.strptime(
                        i.scan_time, "%Y-%m-%d %H:%M:%S"
                    ).timetuple()
                )
                for i in fields
            ]
        )
        if (t_arr.max() - t_arr.min()) / 60 > 10:
            raise RadarCalculationError(
                "Time difference of input data should not exceed 10 minutes"
            )
        # Round the mean scan time to the nearest 10 minutes
        mean_time = t_arr.mean()
        mean_dtime = datetime.datetime(*time.localtime(int(mean_time))[:6])
        time_increment = 10
        time_rest = mean_dtime.minute % time_increment
        if time_rest > time_increment / 2:
            mean_dtime += datetime.timedelta(minutes=(time_increment - time_rest))
        else:
            mean_dtime -= datetime.timedelta(minutes=time_rest)
        self.scan_time = mean_dtime
        self.lon_ravel = np.hstack([i["longitude"].values.ravel() for i in fields])
        self.lat_ravel = np.hstack([i["latitude"].values.ravel() for i in fields])
        self.data_ravel = np.ma.hstack([i[self.dtype].values.ravel() for i in fields])
        self.dist_ravel = np.hstack(
            [
                np.broadcast_to(i["distance"], i["longitude"].shape).ravel()
                for i in fields
            ]
        )
        self.tree = KDTree(np.dstack((self.lon_ravel, self.lat_ravel))[0])
        self.md = max_dist
        self.attr = fields[0].attrs.copy()

    def _process_grid(self, x_step: Number_T, y_step: Number_T) -> Tuple[np.ndarray]:
        x_lower = np.round_(self.lon_ravel.min(), 2)
        x_upper = np.round_(self.lon_ravel.max(), 2)
        y_lower = np.round_(self.lat_ravel.min(), 2)
        y_upper = np.round_(self.lat_ravel.max(), 2)
        x_grid = np.arange(x_lower, x_upper + x_step, x_step)
        y_grid = np.arange(y_lower, y_upper + y_step, y_step)
        return np.meshgrid(x_grid, y_grid)

    def _map_points(self, x: np.ndarray, y: np.ndarray) -> np.ma.MaskedArray:
        _MAX_RETURN = 5
        _FILL_VALUE = -1e5
        xdim, ydim = x.shape
        _, idx = self.tree.query(
            np.dstack((x.ravel(), y.ravel()))[0],
            distance_upper_bound=self.md,
            k=_MAX_RETURN,
        )
        idx_all = idx.ravel()
        # Missing neighbors are reported with index == len(data), so append a
        # fill value that can be masked out after indexing.
        data_indexing = np.append(self.data_ravel, _FILL_VALUE)
        dist_indexing = np.append(self.dist_ravel, 0)
        target_rad = np.ma.masked_equal(data_indexing[idx_all], _FILL_VALUE)
        weight = dist_indexing[idx_all]
        inp = target_rad.reshape(xdim, ydim, _MAX_RETURN)
        wgt = weight.reshape(xdim, ydim, _MAX_RETURN)
        # Inverse-distance weighted average over the nearest gates
        return np.ma.average(inp, weights=1 / wgt, axis=2)

    def __call__(self, step: Number_T) -> Dataset:
        r"""
        Args:
            step (int, float): Output grid spacing.

        Returns:
            xarray.Dataset: Merged grid data.
        """
        x, y = self._process_grid(step, step)
        grid = self._map_points(x, y)
        grid = np.ma.masked_outside(grid, 0.1, 100)
        ret = Dataset(
            {
                self.dtype: DataArray(
                    grid, coords=[y[:, 0], x[0]], dims=["latitude", "longitude"]
                )
            }
        )
        r_attr = self.attr
        # Keep this attribute temporarily, waiting for a future fix
        r_attr["tangential_reso"] = np.nan
        r_attr["elevation"] = 0
        r_attr["site_name"] = "RADMAP"
        r_attr["site_code"] = "RADMAP"
        r_attr["scan_time"] = self.scan_time.strftime("%Y-%m-%d %H:%M:%S")
        del (
            r_attr["site_longitude"],
            r_attr["site_latitude"],
            r_attr["nyquist_vel"],
        )
        ret.attrs = r_attr
        return ret

[docs]@require(["REF", "ZDR", "RHO", "KDP"]) def hydro_class( z: Dataset, zdr: Dataset, rho: Dataset, kdp: Dataset, band: str = "S" ) -> Dataset: r"""Hydrometeor classification Args: z (xarray.Dataset): Reflectivity data. zdr (xarray.Dataset): Differential reflectivity data. rho (xarray.Dataset): Cross-correlation coefficient data. kdp (xarray.Dataset): Specific differential phase data. band (str): Band of the radar, default to S. Returns: xarray.Dataset: Classification result. """ z_data = z["REF"].values zdr_data = zdr["ZDR"].values rho_data = rho["RHO"].values kdp_data = kdp["KDP"].values result = _hca( z_data.ravel(), zdr_data.ravel(), rho_data.ravel(), kdp_data.ravel(), band=band ) result = result.reshape(z_data.shape).astype(float) result[np.isnan(z_data)] = np.nan hcl = z.copy() hcl["HCL"] = (["azimuth", "distance"], result) del hcl["REF"] return hcl