# -*- coding: utf-8 -*-
# Author: Puyuan Du
from typing import Union, Any
import numpy as np
from cinrad.constants import deg2rad, vil_const
from cinrad.projection import height
from cinrad._typing import Array_T, Number_T
try:
from cinrad._utils import *
except ImportError:
# When the C-extension doesn't exist, define the functions in Python.
def r2z(r: np.ndarray) -> np.ndarray:
return 10 ** (r / 10)
def vert_integrated_liquid(
ref: np.ndarray,
distance: np.ndarray,
elev: Array_T,
beam_width: float = 0.99,
threshold: Union[float, int] = 18.0,
density: bool = False,
) -> np.ndarray:
r"""
Calculate vertically integrated liquid (VIL) in one full scan
Parameters
----------
ref: numpy.ndarray dim=3 (elevation angle, distance, azimuth)
reflectivity data
distance: numpy.ndarray dim=2 (distance, azimuth)
distance from radar site
elev: numpy.ndarray or list dim=1
elevation angles in degree
threshold: float
minimum reflectivity value to take into calculation
Returns
-------
data: numpy.ndarray
vertically integrated liquid data
"""
if density:
raise NotImplementedError("VIL density calculation is not implemented")
v_beam_width = beam_width * deg2rad
elev = np.array(elev) * deg2rad
xshape, yshape = ref[0].shape
distance *= 1000
hi_arr = distance * np.sin(v_beam_width / 2)
vil = _vil_iter(xshape, yshape, ref, distance, elev, hi_arr, threshold)
return vil
def _vil_iter(
xshape: int,
yshape: int,
ref: np.ndarray,
distance: np.ndarray,
elev: Array_T,
hi_arr: np.ndarray,
threshold: Number_T,
) -> np.ndarray:
# r = np.clip(ref, None, 55) #reduce the influence of hails
r = ref
z = r2z(r)
VIL = np.zeros((xshape, yshape))
for i in range(xshape):
for j in range(yshape):
vert_r = r[:, i, j]
vert_z = z[:, i, j]
dist = distance[i][j]
position = np.where(vert_r > threshold)[0]
if position.shape[0] == 0:
continue
pos_s = position[0]
pos_e = position[-1]
m1 = 0
hi = hi_arr[i][j]
for l in range(pos_e):
ht = dist * (np.sin(elev[l + 1]) - np.sin(elev[l]))
factor = ((vert_z[l] + vert_z[l + 1]) / 2) ** (4 / 7)
m1 += vil_const * factor * ht
mb = vil_const * vert_z[pos_s] ** (4 / 7) * hi
mt = vil_const * vert_z[pos_e] ** (4 / 7) * hi
VIL[i][j] = m1 + mb + mt
return VIL
def echo_top(
ref: np.ndarray,
distance: np.ndarray,
elev: Array_T,
radarheight: Number_T,
threshold: Number_T = 18.0,
) -> np.ndarray:
r"""
Calculate height of echo tops (ET) in one full scan
Parameters
----------
ref: numpy.ndarray dim=3 (elevation angle, distance, azimuth)
reflectivity data
distance: numpy.ndarray dim=2 (distance, azimuth)
distance from radar site
elev: numpy.ndarray or list dim=1
elevation angles in degree
radarheight: int or float
height of radar
drange: float or int
range of data to be calculated
threshold: float
minimum value of reflectivity to be taken into calculation
Returns
-------
data: numpy.ndarray
echo tops data
"""
ref[np.isnan(ref)] = 0
xshape, yshape = ref[0].shape
et = np.zeros((xshape, yshape))
h_ = list()
for i in elev:
h = height(distance, i, radarheight)
h_.append(h)
hght = np.concatenate(h_).reshape(ref.shape)
for i in range(xshape):
for j in range(yshape):
vert_h = hght[:, i, j]
vert_r = ref[:, i, j]
if vert_r.max() < threshold: # Vertical points don't satisfy threshold
et[i][j] = 0
continue
elif vert_r[-1] >= threshold: # Point in highest scan exceeds threshold
et[i][j] = vert_h[-1]
continue
else:
position = np.where(vert_r >= threshold)[0]
if position[-1] == 0:
et[i][j] = vert_h[0]
continue
else:
pos = position[-1]
z1 = vert_r[pos]
z2 = vert_r[pos + 1]
h1 = vert_h[pos]
h2 = vert_h[pos + 1]
w1 = (z1 - threshold) / (z1 - z2)
w2 = 1 - w1
et[i][j] = w1 * h2 + w2 * h1
return et
[docs]def potential_maximum_gust(et: np.ndarray, vil: np.ndarray) -> np.ndarray:
r"""
Estimate the potential maximum gust with a descending downdraft by Stewart's formula
"""
return np.sqrt(20.628571 * vil - 2.3810964e-6 * et ** 2)
def potential_maximum_gust_from_reflectivity(
ref: np.ndarray, distance: np.ndarray, elev: Array_T
) -> np.ndarray:
et = echo_top(ref, distance, elev, 0)
vil = vert_integrated_liquid(ref, distance, elev)
return potential_maximum_gust(et, vil)
def lanczos_differentiator(winlen: int):
# Copyright (c) 2011-2018, wradlib developers.
m = (winlen - 1) / 2
denom = m * (m + 1.0) * (2 * m + 1.0)
k = np.arange(1, m + 1)
f = 3 * k / denom
return np.r_[f[::-1], [0], -f]
def kdp_from_phidp(
phidp: np.ndarray, winlen: int = 7, dr: float = 1.0, method: bool = None
) -> np.ndarray:
from scipy.stats import linregress
from scipy.ndimage.filters import convolve1d
# Copyright (c) 2011-2018, wradlib developers.
"""Retrieves :math:`K_{DP}` from :math:`Phi_{DP}`.
In normal operation the method uses convolution to estimate :math:`K_{DP}`
(the derivative of :math:`Phi_{DP}` with Low-noise Lanczos differentiators.
The results are very similar to the fallback moving window linear
regression (`method=slow`), but the former is *much* faster, depending on
the percentage of NaN values in the beam, though.
For further reading please see `Differentiation by integration using \
orthogonal polynomials, a survey <https://arxiv.org/pdf/1102.5219>`_ \
and `Low-noise Lanczos differentiators \
<http://www.holoborodko.com/pavel/numerical-methods/numerical-derivative/\
lanczos-low-noise-differentiators/>`_.
The fast method provides fast :math:`K_{DP}` retrieval but will return NaNs
in case at least one value in the moving window is NaN. The remaining gates
are treated by using local linear regression where possible.
Please note that the moving window size ``winlen`` is specified as the number of
range gates. Thus, this argument might need adjustment in case the
range resolution changes.
In the original publication (:cite:`Vulpiani2012`), the value ``winlen=7``
was chosen for a range resolution of 1km.
Warning
-------
The function is designed for speed by allowing to process
multiple dimensions in one step. For this purpose, the RANGE dimension
needs to be the LAST dimension of the input array.
Parameters
----------
phidp : :class:`numpy:numpy.ndarray`
multi-dimensional array, note that the range dimension must be the
last dimension of the input array.
winlen : int
Width of the window (as number of range gates)
dr : float
gate length in km
method : str
If None uses fast convolution based differentiation, if 'slow' uses
linear regression.
Examples
--------
>>> import wradlib
>>> import numpy as np
>>> import matplotlib.pyplot as pl
>>> pl.interactive(True)
>>> kdp_true = np.sin(3 * np.arange(0, 10, 0.1))
>>> phidp_true = np.cumsum(kdp_true)
>>> phidp_raw = phidp_true + np.random.uniform(-1, 1, len(phidp_true))
>>> gaps = np.concatenate([range(10, 20), range(30, 40), range(60, 80)])
>>> phidp_raw[gaps] = np.nan
>>> kdp_re = wradlib.dp.kdp_from_phidp(phidp_raw)
>>> line1 = pl.plot(np.ma.masked_invalid(phidp_true), "b--", label="phidp_true") # noqa
>>> line2 = pl.plot(np.ma.masked_invalid(phidp_raw), "b-", label="phidp_raw") # noqa
>>> line3 = pl.plot(kdp_true, "g-", label="kdp_true")
>>> line4 = pl.plot(np.ma.masked_invalid(kdp_re), "r-", label="kdp_reconstructed") # noqa
>>> lgnd = pl.legend(("phidp_true", "phidp_raw", "kdp_true", "kdp_reconstructed")) # noqa
>>> pl.show()
"""
assert (
winlen % 2
) == 1, "Window size N for function kdp_from_phidp must be an odd number."
shape = phidp.shape
phidp = phidp.reshape((-1, shape[-1]))
# Make really sure winlen is an integer
winlen = int(winlen)
if method == "slow":
kdp = np.zeros(phidp.shape) * np.nan
else:
window = lanczos_differentiator(winlen)
kdp = convolve1d(phidp, window, axis=1)
# find remaining NaN values with valid neighbours
invalidkdp = np.isnan(kdp)
if not np.any(invalidkdp.ravel()):
# No NaN? Return KdP
return kdp.reshape(shape) / 2.0 / dr
# Otherwise continue
x = np.arange(phidp.shape[-1])
valids = ~np.isnan(phidp)
kernel = np.ones(winlen, dtype="i4")
# and do the slow moving window linear regression
for beam in range(len(phidp)):
# number of valid neighbours around one gate
nvalid = np.convolve(valids[beam], kernel, "same") > winlen / 2
# find those gates which have invalid Kdp AND enough valid neighbours
nangates = np.where(invalidkdp[beam] & nvalid)[0]
# now iterate over those
for r in nangates:
ix = np.arange(
max(0, r - int(winlen / 2)), min(r + int(winlen / 2) + 1, shape[-1])
)
# check again (just to make sure...)
if np.sum(valids[beam, ix]) < winlen / 2:
# not enough valid values inside our window
continue
kdp[beam, r] = linregress(
x[ix][valids[beam, ix]], phidp[beam, ix[valids[beam, ix]]]
)[0]
# take care of the start and end of the beam
# start
ix = np.arange(0, winlen)
if np.sum(valids[beam, ix]) >= 2:
kdp[beam, 0 : int(winlen / 2)] = linregress(
x[ix][valids[beam, ix]], phidp[beam, ix[valids[beam, ix]]]
)[0]
# end
ix = np.arange(shape[-1] - winlen, shape[-1])
if np.sum(valids[beam, ix]) >= 2:
kdp[beam, -int(winlen / 2) :] = linregress(
x[ix][valids[beam, ix]], phidp[beam, ix[valids[beam, ix]]]
)[0]
# accounting for forward/backward propagation AND gate length
return kdp.reshape(shape) / 2.0 / dr