# -*- coding: utf-8 -*-
# Author: Puyuan Du
import warnings
import datetime
from pathlib import Path
from collections import namedtuple, defaultdict
from typing import Union, Optional, List, Any, Generator
import numpy as np
import xarray as xr
from cinrad.constants import deg2rad, con
from cinrad.projection import get_coordinate, height
from cinrad.error import RadarDecodeError
from cinrad.io.base import RadarBase, prepare_file
from cinrad.io._dtype import *
from cinrad._typing import Number_T
__all__ = ["CinradReader", "StandardData", "PhasedArrayData"]
ScanConfig = namedtuple("ScanConfig", SDD_cut.fields.keys())
utc_offset = datetime.timedelta(hours=8)
def vcp(el_num: int) -> str:
r"""Determine volume coverage pattern by number of scans."""
if el_num == 5:
task_name = "VCP31"
elif el_num == 9:
task_name = "VCP21"
elif el_num == 14:
task_name = "VCP11"
else:
task_name = "Unknown"
return task_name
def infer_type(f: Any, filename: str) -> tuple:
r"""Detect radar type from records in file"""
# Attempt to find information in file, which has higher
# priority compared with information obtained from file name
radartype = None
code = None
f.seek(100)
typestring = f.read(9)
if typestring == b"CINRAD/SC":
radartype = "SC"
elif typestring == b"CINRAD/CD":
radartype = "CD"
f.seek(116)
if f.read(9) == b"CINRAD/CC":
radartype = "CC"
# Read information from filename (if applicable)
if filename.startswith("RADA"):
spart = filename.split("-")
if len(spart) > 2:
code = spart[1]
radartype = spart[2]
elif filename.startswith("Z"):
spart = filename.split("_")
if len(spart) > 7:
code = spart[3]
radartype = spart[7]
return code, radartype
[docs]class CinradReader(RadarBase):
r"""
Class reading old-version CINRAD data.
Args:
file (str, IO): Path points to the file or a file object.
radar_type (str): Type of the radar.
file_name (str): Name of the file, only used when `file` argument is
a file object.
"""
def __init__(
self,
file: Any,
radar_type: Optional[str] = None,
):
f = prepare_file(file)
filename = Path(file).name if isinstance(file, str) else ""
self.code, t_infer = infer_type(f, filename)
if radar_type:
if t_infer != radar_type:
warnings.warn(
"Contradictory information from input radar type and"
"radar type detected from input file."
)
self.radartype = radar_type
else:
if not t_infer:
raise RadarDecodeError(
"Unable to determine the file type. Use `radar_type` keyword"
"to specify the radar type."
)
self.radartype = t_infer
self.site_info = {}
f.seek(0)
if self.radartype in ["SA", "SB"]:
self._SAB_reader(f)
elif self.radartype in ["CA", "CB"]:
self._SAB_reader(f, dtype="CAB")
elif self.radartype == "CC2":
self._CC2_reader(f)
else:
try:
if self.radartype == "CC":
self._CC_reader(f)
elif self.radartype in ["SC", "CD"]:
self._CD_reader(f)
else:
raise RadarDecodeError("Unrecognized data")
except Exception as err:
# Currently there's no good way to differentiate the special
# SC/CC files, so catch the exception of normal decoding process
# and try this one if possible
try:
f.seek(0)
self._SAB_reader(f, dtype="special")
except:
raise err
self._update_radar_info()
# TODO: Override information
if "longitude" in self.site_info:
self.stationlon = self.site_info["longitude"]
if "latitude" in self.site_info:
self.stationlat = self.site_info["latitude"]
if "height" in self.site_info:
self.radarheight = self.site_info["height"]
if "name" in self.site_info:
self.name = self.site_info["name"]
if "code" in self.site_info:
self.code = self.site_info["code"]
if self.code == None and self.name:
# Use name as code when code is missing
self.code = self.name
f.close()
def _SAB_reader(self, f: Any, dtype: str = "SAB"):
_header_size = 128
if dtype == "SAB":
radar_dtype = SAB_dtype
elif dtype == "CAB":
radar_dtype = CAB_dtype
elif dtype == "special":
radar_dtype = S_SPECIAL_dtype
_header_size = 132
data = np.frombuffer(f.read(), dtype=radar_dtype)
start = datetime.datetime(1969, 12, 31)
deltday = datetime.timedelta(days=int(data["day"][0]))
deltsec = datetime.timedelta(milliseconds=int(data["time"][0]))
self.scantime = start + deltday + deltsec
self.Rreso = data["gate_length_r"][0] / 1000
self.Vreso = data["gate_length_v"][0] / 1000
boundary = np.where(data["radial_num"] == 1)[0]
self.el = data["elevation"][boundary] * con
self.azimuth = data["azimuth"] * con * deg2rad
dv = data["v_reso"][0]
self.nyquist_v = data["nyquist_vel"][boundary] / 100
self.task_name = "VCP{}".format(data["vcp_mode"][0])
f.seek(0)
size = radar_dtype.itemsize
b = np.append(boundary, data.shape[0])
gnr = data["gate_num_r"][boundary]
gnv = data["gate_num_v"][boundary]
out_data = dict()
for bidx, rnum, vnum, idx in zip(np.diff(b), gnr, gnv, range(len(b))):
# `bidx`: number of data blocks (i.e. radials)
# `rnum`: number of reflectivity gates.
# `vnum`: number of velocity gates.
# `idx`: number of scans.
# Construct a temporary dtype to parse data more efficiently
temp_dtype = [
("header", "u1", _header_size),
("ref", "u1", rnum),
("vel", "u1", vnum),
("sw", "u1", vnum),
("res", "u1", size - _header_size - rnum - vnum * 2),
]
da = np.frombuffer(f.read(bidx * size), dtype=np.dtype(temp_dtype))
out_data[idx] = dict()
r = (np.ma.masked_equal(da["ref"], 0) - 2) / 2 - 32
r[r == 95.5] = 0
out_data[idx]["REF"] = r
v = np.ma.masked_less(da["vel"], 2)
sw = np.ma.masked_less(da["sw"], 2)
if dv == 2:
out_data[idx]["VEL"] = (v - 2) / 2 - 63.5
out_data[idx]["SW"] = (sw - 2) / 2 - 63.5
elif dv == 4:
out_data[idx]["VEL"] = v - 2 - 127
out_data[idx]["SW"] = sw - 2 - 127
out_data[idx]["azimuth"] = self.azimuth[b[idx] : b[idx + 1]]
out_data[idx]["RF"] = np.ma.masked_not_equal(da["vel"], 1)
angleindex = np.arange(0, data["el_num"][-1], 1)
if dtype == "special":
self.angleindex_r = self.angleindex_v = angleindex
else:
self.angleindex_r = np.delete(angleindex, [1, 3])
self.angleindex_v = np.delete(angleindex, [0, 2])
self.data = out_data
def _CC_reader(self, f: Any):
header = np.frombuffer(f.read(1024), CC_header)
self.site_info = {
"name": header["cStation"][0].decode("gbk", "ignore"),
"code": header["cStationNumber"][0].decode("utf-8", "ignore")[:5],
}
scan_mode = header["ucScanMode"][0]
if scan_mode < 100:
raise NotImplementedError("Only VPPI scan mode is supported")
stop_angle = scan_mode - 100
self.scantime = (
datetime.datetime(
header["ucEYear1"][0] * 100 + header["ucEYear2"][0],
header["ucEMonth"][0],
header["ucEDay"][0],
header["ucEHour"][0],
header["ucEMinute"][0],
header["ucESecond"][0],
)
- utc_offset
)
f.seek(218)
param = np.frombuffer(f.read(660), CC_param)
self.el = param["usAngle"][:stop_angle] / 100
self.nyquist_v = param["usMaxV"][:stop_angle] / 100
self.task_name = vcp(len(self.el))
f.seek(1024)
data = np.frombuffer(f.read(), CC_data)
r = np.ma.masked_equal(data["Z"], -0x8000) / 10
v = np.ma.masked_equal(data["V"], -0x8000) / 10
w = np.ma.masked_equal(data["W"], -0x8000) / 10
self.Rreso = 0.3
self.Vreso = 0.3
data = dict()
for i in range(len(self.el)):
data[i] = dict()
data[i]["REF"] = r[i * 512 : (i + 1) * 512]
data[i]["VEL"] = v[i * 512 : (i + 1) * 512]
data[i]["SW"] = w[i * 512 : (i + 1) * 512]
data[i]["azimuth"] = self.get_azimuth_angles(i)
self.data = data
self.angleindex_r = self.angleindex_v = [i for i in range(len(self.el))]
def _CD_reader(self, f: Any):
header = np.frombuffer(f.read(CD_dtype.itemsize), CD_dtype)
el_num = header["obs"]["stype"][0] - 100 # VOL
self.task_name = vcp(el_num)
self.scantime = (
datetime.datetime(
header["obs"]["syear"][0],
header["obs"]["smonth"][0],
header["obs"]["sday"][0],
header["obs"]["shour"][0],
header["obs"]["sminute"][0],
header["obs"]["ssecond"][0],
)
- utc_offset
)
self.nyquist_v = header["obs"]["layerparam"]["MaxV"][0][:el_num] / 100
self.Rreso = self.Vreso = 0.25
self.el = header["obs"]["layerparam"]["Swangles"][0][:el_num] / 100
data = dict()
for el in range(el_num):
full_scan = np.frombuffer(f.read(360 * CD_DATA.itemsize), CD_DATA)
# Avoid uint8 arithmetic overflow
raw_ref = full_scan["rec"]["m_dbz"].astype(int)
raw_vel = full_scan["rec"]["m_vel"].astype(int)
raw_sw = full_scan["rec"]["m_sw"].astype(int)
data[el] = dict()
data[el]["REF"] = (np.ma.masked_equal(raw_ref, 0) - 64) / 2
data[el]["VEL"] = (
self.nyquist_v[el] * (np.ma.masked_equal(raw_vel, 0) - 128) / 128
)
data[el]["SW"] = self.nyquist_v[el] * np.ma.masked_equal(raw_sw, 0) / 256
data[el]["RF"] = np.ma.masked_not_equal(raw_vel, 128)
self.data = data
self.angleindex_r = self.angleindex_v = list(range(el_num))
def _CC2_reader(self, f: Any):
header = np.frombuffer(f.read(CC2_header.itemsize), CC2_header)
self.site_info = {
"longitude": header["lLongitudeValue"][0] / 1000,
"latitude": header["lLatitudeValue"][0] / 1000,
"height": header["lHeight"][0] / 1000,
"name": header["sStation"][0].decode("gbk"),
}
obs_param = np.frombuffer(f.read(CC2_obs.itemsize), CC2_obs)
self.scantime = (
datetime.datetime(
obs_param["usSYear"][0],
obs_param["ucSMonth"][0],
obs_param["ucSDay"][0],
obs_param["ucSHour"][0],
obs_param["ucSMinute"][0],
obs_param["ucSSecond"][0],
)
- utc_offset
)
layer_info = obs_param["LayerInfo"]
scan_type = obs_param["ucType"][0]
if scan_type == 1:
# RHI
raise NotImplementedError("RHI scan type is not supported")
elif scan_type == 10:
# PPI
raise NotImplementedError("Single layer PPI scan type is not supported")
elif 100 < scan_type < 200:
# VOL
el_num = scan_type - 100
radial_num = layer_info["usRecordNumber"][0][:el_num]
self.el = layer_info["sSwpAngle"][0][:el_num] / 100
self.nyquist_v = layer_info["usMaxV"][0][:el_num] / 100
self.Rreso = layer_info["usZBinWidth"][0][0] / 10000
self.Vreso = layer_info["usVBinWidth"][0][0] / 10000
other_info = np.frombuffer(f.read(CC2_other.itemsize), CC2_other)
self.task_name = other_info["sScanName"][0].decode()
data_block = np.frombuffer(f.read(), CC2_data)
raw_azimuth = data_block["usAzimuth"] / 100 * deg2rad
raw_dbz = np.ma.masked_equal(data_block["ucCorZ"].astype(int), 0)
dbz = (raw_dbz - 64) / 2
zdr = np.ma.masked_equal(data_block["siZDR"].astype(int), -32768) * 0.01
phi = np.ma.masked_equal(data_block["siPHDP"].astype(int), -32768) * 0.01
rho = np.ma.masked_equal(data_block["siROHV"].astype(int), 0) * 0.001
kdp = np.ma.masked_equal(data_block["uKDP"].astype(int), -32768) * 0.01
data = dict()
radial_idx = [0] + (np.cumsum(radial_num) - 1).tolist()
for el in range(el_num):
start_radial_idx = radial_idx[el]
end_radial_idx = radial_idx[el + 1]
data[el] = dict()
data[el]["azimuth"] = raw_azimuth[start_radial_idx:end_radial_idx]
data[el]["REF"] = dbz[start_radial_idx:end_radial_idx]
data[el]["ZDR"] = zdr[start_radial_idx:end_radial_idx]
data[el]["PHI"] = phi[start_radial_idx:end_radial_idx]
data[el]["RHO"] = rho[start_radial_idx:end_radial_idx]
data[el]["KDP"] = kdp[start_radial_idx:end_radial_idx]
self.data = data
self.angleindex_r = self.angleindex_v = list(range(el_num))
[docs] def get_nrays(self, scan: int) -> int:
r"""Get number of radials in certain scan"""
if self.radartype in ["SA", "SB", "CA", "CB"]:
return len(self.data[scan]["azimuth"])
elif self.radartype == "CC":
return 512
elif self.radartype in ["SC", "CD"]:
return 360
[docs] def get_azimuth_angles(self, scans: Optional[int] = None) -> np.ndarray:
r"""Get index of input azimuth angle (radian)"""
if self.radartype in ["SA", "SB", "CA", "CB", "CC2"]:
if scans is None:
return self.azimuth
else:
return self.data[scans]["azimuth"]
elif self.radartype == "CC":
if scans is None:
return (
np.array(np.linspace(0, 360, 512).tolist() * self.get_nscans())
* deg2rad
)
else:
return np.array(np.linspace(0, 360, 512).tolist()) * deg2rad
elif self.radartype in ["SC", "CD"]:
if scans is None:
return (
np.array(np.linspace(0, 360, 360).tolist() * self.get_nscans())
* deg2rad
)
else:
return np.array(np.linspace(0, 360, 360).tolist()) * deg2rad
def get_elevation_angles(
self, scans: Optional[int] = None
) -> Union[np.ndarray, float]:
if scans is None:
return self.el
else:
return self.el[scans]
[docs] def get_raw(
self, tilt: int, drange: Number_T, dtype: str
) -> Union[np.ndarray, tuple]:
r"""
Get radar raw data
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
numpy.ndarray or tuple of numpy.ndarray: Raw data
"""
rf_flag = False
self.tilt = tilt
reso = self.Rreso if dtype == "REF" else self.Vreso
dmax = np.round(self.data[tilt][dtype][0].shape[0] * reso)
if dmax < drange:
warnings.warn("Requested data range exceed max range in this tilt")
self.drange = drange
self.elev = self.el[tilt]
try:
data = np.ma.array(self.data[tilt][dtype])
# The structure of `out_data`:
# The key of `out_data` is the number of scan counting from zero (int).
# The value of `out_data` is also dictionary, the key of it are the abbreviations of
# product name, such as `REF`, `VEL`.
# The value of this sub-dictionary is the data stored in np.ma.MaskedArray.
except KeyError:
raise RadarDecodeError("Invalid product name")
ngates = int(drange // reso)
cut = data.T[:ngates]
shape_diff = ngates - cut.shape[0]
append = np.zeros((int(np.round(shape_diff)), cut.shape[1])) * np.ma.masked
if dtype in ["VEL", "SW"]:
try:
rf = self.data[tilt]["RF"]
except KeyError:
pass
else:
rf_flag = True
rf = rf.T[:ngates]
rf = np.ma.vstack([rf, append])
r = np.ma.vstack([cut, append])
if rf_flag:
r.mask = np.logical_or(r.mask, ~rf.mask)
ret = (r.T, rf.T)
else:
ret = r.T
return ret
[docs] def get_data(self, tilt: int, drange: Number_T, dtype: str) -> xr.Dataset:
r"""
Get radar data with extra information
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
xarray.Dataset: Data.
"""
task = getattr(self, "task_name", None)
reso = self.Rreso if dtype == "REF" else self.Vreso
ret = self.get_raw(tilt, drange, dtype)
rf_flag = (dtype in ["VEL", "SW"]) and ("RF" in self.data[tilt])
x, y, z, d, a = self.projection(reso)
shape = ret[0].shape[1] if rf_flag else ret.shape[1]
if rf_flag:
da = xr.DataArray(ret[0], coords=[a, d], dims=["azimuth", "distance"])
else:
da = xr.DataArray(ret, coords=[a, d], dims=["azimuth", "distance"])
ds = xr.Dataset(
{dtype: da},
attrs={
"elevation": self.elev,
"range": int(np.round(shape * reso)),
"scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"),
"site_code": self.code,
"site_name": self.name,
"site_longitude": self.stationlon,
"site_latitude": self.stationlat,
"tangential_reso": reso,
"nyquist_vel": self.nyquist_v[tilt],
"task": task,
},
)
ds["longitude"] = (["azimuth", "distance"], x)
ds["latitude"] = (["azimuth", "distance"], y)
ds["height"] = (["azimuth", "distance"], z)
if rf_flag:
ds["RF"] = (["azimuth", "distance"], ret[1])
return ds
[docs] def projection(self, reso: float, h_offset: bool = False) -> tuple:
r"""Calculate the geographic coordinates of the requested data range."""
theta = self.get_azimuth_angles(self.tilt)
r = self.get_range(self.drange, reso)
lonx, latx = get_coordinate(
r, theta, self.elev, self.stationlon, self.stationlat, h_offset=h_offset
)
hght = (
height(r, self.elev, self.radarheight)
* np.ones(theta.shape[0])[:, np.newaxis]
)
return lonx, latx, hght, r, theta
def iter_tilt(self, drange: Number_T, dtype: str) -> Generator:
if dtype == "REF":
tlist = self.angleindex_r
elif dtype in ["VEL", "SW"]:
tlist = self.angleindex_v
for i in tlist:
yield self.get_data(i, drange, dtype)
[docs]class StandardData(RadarBase):
r"""
Class reading data in standard format.
Args:
file (str, IO): Path points to the file or a file object.
"""
# fmt: off
dtype_corr = {1:'TREF', 2:'REF', 3:'VEL', 4:'SW', 5:'SQI', 6:'CPA', 7:'ZDR', 8:'LDR',
9:'RHO', 10:'PHI', 11:'KDP', 12:'CP', 14:'HCL', 15:'CF', 16:'SNRH',
17:'SNRV', 32:'Zc', 33:'Vc', 34:'Wc', 35:'ZDRc'}
# fmt: on
def __init__(self, file: Any):
with prepare_file(file) as self.f:
self._parse()
self._update_radar_info()
# In standard data, station information stored in file
# has higher priority, so we override some information.
self.stationlat = self.geo["lat"]
self.stationlon = self.geo["lon"]
self.radarheight = self.geo["height"]
if self.name == "None":
# Last resort to find info
self.name = self.geo["name"]
self.angleindex_r = self.available_tilt("REF") # API consistency
del self.geo
def _parse(self):
header = np.frombuffer(self.f.read(32), SDD_header)
if header["magic_number"] != 0x4D545352:
raise RadarDecodeError("Invalid standard data")
site_config = np.frombuffer(self.f.read(128), SDD_site)
self.code = (
site_config["site_code"][0]
.decode("ascii", errors="ignore")
.replace("\x00", "")
)
freq = site_config["frequency"][0]
self.wavelength = 3e8 / freq / 10000
self.geo = geo = dict()
geo["lat"] = site_config["Latitude"][0]
geo["lon"] = site_config["Longitude"][0]
geo["height"] = site_config["ground_height"][0]
geo["name"] = site_config["site_name"][0].decode("ascii", errors="ignore")
task = np.frombuffer(self.f.read(256), SDD_task)
self.task_name = (
task["task_name"][0].decode("ascii", errors="ignore").split("\x00")[0]
)
self.scantime = datetime.datetime(1970, 1, 1) + datetime.timedelta(
seconds=int(task["scan_start_time"])
)
cut_num = task["cut_number"][0]
scan_config = np.frombuffer(self.f.read(256 * cut_num), SDD_cut)
self.scan_config = list()
for i in scan_config:
_config = ScanConfig(*i)
if _config.dop_reso > 32768: # fine detection scan
true_reso = np.round_((_config.dop_reso - 32768) / 100, 1)
_config_element = list(_config)
_config_element[11] = true_reso
_config_element[12] = true_reso
_config = ScanConfig(*_config_element)
self.scan_config.append(_config)
# TODO: improve repr
data = dict()
# `aux` stores some auxiliary information, including azimuth angles, elevation angles,
# and scale and offset of data.
aux = dict()
if task["scan_type"] == 2: # Single-layer RHI
self.scan_type = "RHI"
else:
# There are actually some other scan types, however, they are not currently supported.
self.scan_type = "PPI"
# Some attributes that are used only for converting to pyart.core.Radar instances
self._time_radial = list()
self._sweep_start_ray_index = list()
self._sweep_end_ray_index = list()
# Time for each radial
radial_count = 0
while 1:
try:
header_bytes = self.f.read(64)
if not header_bytes:
# Fix for single-tilt file
break
radial_header = np.frombuffer(header_bytes, SDD_rad_header)
if radial_header["zip_type"][0] == 1: # LZO compression
raise NotImplementedError("LZO compressed file is not supported")
self._time_radial.append(
radial_header["seconds"][0] + radial_header["microseconds"][0] / 1e6
)
el_num = radial_header["elevation_number"][0] - 1
if el_num not in data.keys():
data[el_num] = defaultdict(list)
aux[el_num] = defaultdict(list)
aux[el_num]["azimuth"].append(radial_header["azimuth"][0])
aux[el_num]["elevation"].append(radial_header["elevation"][0])
for _ in range(radial_header["moment_number"][0]):
moment_header = np.frombuffer(self.f.read(32), SDD_mom_header)
if moment_header["block_length"][0] == 0:
# Some ill-formed files
continue
dtype_code = moment_header["data_type"][0]
dtype = self.dtype_corr.get(dtype_code, None)
data_body = np.frombuffer(
self.f.read(moment_header["block_length"][0]),
"u{}".format(moment_header["bin_length"][0]),
)
if not dtype:
warnings.warn(
"Data type {} not understood, skipping".format(dtype_code),
RuntimeWarning,
)
continue
if dtype not in aux[el_num].keys():
scale = moment_header["scale"][0]
offset = moment_header["offset"][0]
aux[el_num][dtype] = (scale, offset)
# In `StandardData`, the `data` dictionary stores raw data instead of data
# calibrated by scale and offset.
# The calibration process is moved to `get_raw` part.
data[el_num][dtype].append(data_body)
radial_state = radial_header["radial_state"][0]
if radial_state in [0, 3]:
# Start of tilt or volume scan
self._sweep_start_ray_index.append(radial_count)
elif radial_state in [2, 4]:
self._sweep_end_ray_index.append(radial_count)
radial_count += 1
if radial_state in [4, 6]: # End scan
break
except EOFError:
# Decode broken files as much as possible
warnings.warn("Broken compressed file detected.", RuntimeWarning)
break
self.data = data
self.aux = aux
self.el = [i.elev for i in self.scan_config]
[docs] @classmethod
def merge(cls, files: List[str], output: str):
r"""
Merge single-tilt standard data into a volumetric scan
Args:
files (List[str]): List of path of data to be merged
output (str): The file path to store the merged data
"""
with prepare_file(files[0]) as first_file:
first_file.seek(160)
task = np.frombuffer(first_file.read(256), SDD_task)
cut_num = task["cut_number"][0]
total_seek_bytes = first_file.tell() + 256 * cut_num
all_tilt_data = [b""] * cut_num
first_file.seek(0)
header_bytes = first_file.read(total_seek_bytes)
rad = np.frombuffer(first_file.read(64), SDD_rad_header)
el_num = rad["elevation_number"][0] - 1
first_file.seek(total_seek_bytes)
all_tilt_data[el_num] = first_file.read()
for f in files[1:]:
with prepare_file(f) as buf:
buf.seek(total_seek_bytes)
rad = np.frombuffer(buf.read(64), SDD_rad_header)
buf.seek(total_seek_bytes)
el_num = rad["elevation_number"][0] - 1
all_tilt_data[el_num] = buf.read()
with open(output, "wb") as out:
out.write(header_bytes)
out.write(b"".join(all_tilt_data))
[docs] def get_raw(
self, tilt: int, drange: Number_T, dtype: str
) -> Union[np.ndarray, tuple]:
r"""
Get radar raw data
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
numpy.ndarray or tuple of numpy.ndarray: Raw data
"""
# The scan number is set to zero in RHI mode.
self.tilt = tilt if self.scan_type == "PPI" else 0
if self.scan_type == "RHI":
max_range = self.scan_config[0].max_range1 / 1000
if drange > max_range:
drange = max_range
self.drange = drange
self.elev = self.el[tilt]
if dtype in ["VEL", "SW"]:
reso = self.scan_config[tilt].dop_reso / 1000
else:
reso = self.scan_config[tilt].log_reso / 1000
try:
raw = np.array(self.data[tilt][dtype])
except KeyError:
raise RadarDecodeError("Invalid product name")
ngates = int(drange // reso)
if raw.size == 0:
warnings.warn("Empty data", RuntimeWarning)
# Calculate size equivalent
nrays = len(self.aux[tilt]["azimuth"])
out = np.zeros((nrays, ngates)) * np.ma.masked
return out
# Data below 5 are used as reserved codes, which are used to indicate other
# information instead of real data, so they should be masked.
data = np.ma.masked_less(raw, 5)
cut = data[:, :ngates]
shape_diff = ngates - cut.shape[1]
append = np.zeros((cut.shape[0], int(shape_diff))) * np.ma.masked
if dtype in ["VEL", "SW"]:
# The reserved code 1 indicates folded velocity.
# These region will be shaded by color of `RF`.
rf = np.ma.masked_not_equal(cut.data, 1)
rf = np.ma.hstack([rf, append])
cut = np.ma.hstack([cut, append])
scale, offset = self.aux[tilt][dtype]
r = (cut - offset) / scale
if dtype in ["VEL", "SW"]:
ret = (r, rf)
# RF data is separately packed into the data.
else:
ret = r
return ret
[docs] def get_data(self, tilt: int, drange: Number_T, dtype: str) -> xr.Dataset:
r"""
Get radar data with extra information
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
xarray.Dataset: Data.
"""
if dtype in ["VEL", "SW"]:
reso = self.scan_config[tilt].dop_reso / 1000
else:
reso = self.scan_config[tilt].log_reso / 1000
ret = self.get_raw(tilt, drange, dtype)
shape = ret[0].shape[1] if isinstance(ret, tuple) else ret.shape[1]
if self.scan_type == "PPI":
x, y, z, d, a = self.projection(reso)
if dtype in ["VEL", "SW"]:
da = xr.DataArray(ret[0], coords=[a, d], dims=["azimuth", "distance"])
else:
da = xr.DataArray(ret, coords=[a, d], dims=["azimuth", "distance"])
ds = xr.Dataset(
{dtype: da},
attrs={
"elevation": self.elev,
"range": int(np.round(shape * reso)),
"scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"),
"site_code": self.code,
"site_name": self.name,
"site_longitude": self.stationlon,
"site_latitude": self.stationlat,
"tangential_reso": reso,
"nyquist_vel": self.scan_config[tilt].nyquist_spd,
"task": self.task_name,
},
)
ds["longitude"] = (["azimuth", "distance"], x)
ds["latitude"] = (["azimuth", "distance"], y)
ds["height"] = (["azimuth", "distance"], z)
if dtype in ["VEL", "SW"]:
ds["RF"] = (["azimuth", "distance"], ret[1])
else:
# Manual projection
gate_num = ret[0].shape[1] if dtype in ["VEL", "SW"] else ret.shape[1]
dist = np.linspace(reso, self.drange, gate_num)
azimuth = self.aux[tilt]["azimuth"][0]
elev = self.aux[tilt]["elevation"]
d, e = np.meshgrid(dist, elev)
h = height(d, e, self.radarheight)
if dtype in ["VEL", "SW"]:
da = xr.DataArray(
ret[0], coords=[elev, dist], dims=["tilt", "distance"]
)
else:
da = xr.DataArray(ret, coords=[elev, dist], dims=["tilt", "distance"])
# Calculate the "start" and "end" of RHI scan
# to facilitate tick labeling
start_lon = self.stationlon
start_lat = self.stationlat
end_lon, end_lat = get_coordinate(
self.drange, azimuth * deg2rad, 0, self.stationlon, self.stationlat
)
ds = xr.Dataset(
{dtype: da},
attrs={
"range": self.drange,
"scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"),
"site_code": self.code,
"site_name": self.name,
"site_longitude": self.stationlon,
"site_latitude": self.stationlat,
"tangential_reso": reso,
"azimuth": azimuth,
"start_lon": start_lon,
"start_lat": start_lat,
"end_lon": end_lon,
"end_lat": end_lat,
"nyquist_vel": self.scan_config[tilt].nyquist_spd
},
)
ds["x_cor"] = (["tilt", "distance"], d)
ds["y_cor"] = (["tilt", "distance"], h)
return ds
def projection(self, reso: float) -> tuple:
r = self.get_range(self.drange, reso)
theta = np.array(self.aux[self.tilt]["azimuth"]) * deg2rad
lonx, latx = get_coordinate(
r, theta, self.elev, self.stationlon, self.stationlat
)
hght = (
height(r, self.elev, self.radarheight)
* np.ones(theta.shape[0])[:, np.newaxis]
)
return lonx, latx, hght, r, theta
[docs] def available_tilt(self, product: str) -> List[int]:
r"""Get all available tilts for given product"""
tilt = list()
for i in list(self.data.keys()):
if product in self.data[i].keys():
tilt.append(i)
return tilt
def iter_tilt(self, drange: Number_T, dtype: str) -> Generator:
for i in self.available_tilt(dtype):
yield self.get_data(i, drange, dtype)
[docs]class PhasedArrayData(RadarBase):
def __init__(self, file):
f = prepare_file(file)
filename = Path(file).name if isinstance(file, str) else ""
try:
self.code = self.name = filename.split("_")[3]
except IndexError:
self.code = self.name = "None"
self._d = data = np.frombuffer(f.read(), dtype=PA_radial)
self.stationlon = data["header"]["longitude"][0] * 360 / 65535
self.stationlat = data["header"]["latitude"][0] * 360 / 65535
self.radarheight = data["header"]["height"][0] * 1000 / 65535
self.scantime = datetime.datetime.utcfromtimestamp(
data["data"]["radial_time"][0]
)
self.reso = np.round(data["data"]["gate_length"][0] * 1000 / 65535) / 1000
self.first_gate_dist = (
np.round(data["data"]["first_gate_dist"][0] * 1000 / 65535) / 1000
)
el_num = data["data"]["el_num"][0]
az_num = data["data"]["az_num"][0]
radial_num = 2000
el = data["data"]["elevation"].astype(int) * 360 / 65535
self.el = el.reshape(az_num, el_num).max(axis=0)
self.data = dict()
self.aux = dict()
# fmt:off
tref = (
np.ma.masked_equal(
data["data"]["tref"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 106 / 255 - 20
)
ref = (
np.ma.masked_equal(
data["data"]["ref"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 106 / 255 - 20
)
vel = (
np.ma.masked_equal(
data["data"]["vel"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 101 / 255 - 50
)
sw = (
np.ma.masked_equal(
data["data"]["sw"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 101 / 255 - 50
)
zdr = (
np.ma.masked_equal(
data["data"]["zdr"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 16 / 255 - 5
)
phi = (
np.ma.masked_equal(
data["data"]["phi"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 181 / 255
)
kdp = (
np.ma.masked_equal(
data["data"]["kdp"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 31 / 255 - 10
)
rho = (
np.ma.masked_equal(
data["data"]["rho"].reshape(az_num, el_num, radial_num), 255
).astype(int) * 1.1 / 255
)
# fmt:on
az = data["data"]["azimuth"].astype(int).reshape(az_num, el_num) * 360 / 65535
for el_idx in range(el_num):
self.data[el_idx] = dict()
self.data[el_idx]["TREF"] = tref[:, el_idx, :]
self.data[el_idx]["REF"] = ref[:, el_idx, :]
self.data[el_idx]["VEL"] = vel[:, el_idx, :]
self.data[el_idx]["SW"] = sw[:, el_idx, :]
self.data[el_idx]["ZDR"] = zdr[:, el_idx, :]
self.data[el_idx]["PHI"] = phi[:, el_idx, :]
self.data[el_idx]["KDP"] = kdp[:, el_idx, :]
self.data[el_idx]["RHO"] = rho[:, el_idx, :]
self.aux[el_idx] = dict()
self.aux[el_idx]["azimuth"] = az[:, el_idx]
[docs] def get_raw(
self, tilt: int, drange: Number_T, dtype: str
) -> Union[np.ndarray, tuple]:
r"""
Get radar raw data
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
numpy.ndarray or tuple of numpy.ndarray: Raw data
"""
rf_flag = False
self.tilt = tilt
dmax = np.round(self.data[tilt][dtype][0].shape[0] * self.reso)
if dmax < drange:
warnings.warn("Requested data range exceed max range in this tilt")
self.drange = drange
self.elev = self.el[tilt]
try:
data = np.ma.array(self.data[tilt][dtype])
# The structure of `out_data`:
# The key of `out_data` is the number of scan counting from zero (int).
# The value of `out_data` is also dictionary, the key of it are the abbreviations of
# product name, such as `REF`, `VEL`.
# The value of this sub-dictionary is the data stored in np.ma.MaskedArray.
except KeyError:
raise RadarDecodeError("Invalid product name")
ngates = int(drange // self.reso)
cut = data.T[:ngates]
shape_diff = ngates - cut.shape[0]
append = np.zeros((int(np.round(shape_diff)), cut.shape[1])) * np.ma.masked
if dtype in ["VEL", "SW"]:
try:
rf = self.data[tilt]["RF"]
except KeyError:
pass
else:
rf_flag = True
rf = rf.T[:ngates]
rf = np.ma.vstack([rf, append])
r = np.ma.vstack([cut, append])
if rf_flag:
r.mask = np.logical_or(r.mask, ~rf.mask)
ret = (r.T, rf.T)
else:
ret = r.T
return ret
[docs] def get_data(self, tilt: int, drange: Number_T, dtype: str) -> xr.Dataset:
r"""
Get radar data with extra information
Args:
tilt (int): Index of elevation angle starting from zero.
drange (float): Radius of data.
dtype (str): Type of product (REF, VEL, etc.)
Returns:
xarray.Dataset: Data.
"""
# task = getattr(self, "task_name", None)
task = ""
ret = self.get_raw(tilt, drange, dtype)
rf_flag = (dtype in ["VEL", "SW"]) and ("RF" in self.data[tilt])
x, y, z, d, a = self.projection(self.reso)
shape = ret[0].shape[1] if rf_flag else ret.shape[1]
if rf_flag:
da = xr.DataArray(ret[0], coords=[a, d], dims=["azimuth", "distance"])
else:
da = xr.DataArray(ret, coords=[a, d], dims=["azimuth", "distance"])
ds = xr.Dataset(
{dtype: da},
attrs={
"elevation": self.elev,
"range": int(np.round(shape * self.reso)),
"scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"),
"site_code": self.code,
"site_name": self.name,
"site_longitude": self.stationlon,
"site_latitude": self.stationlat,
"tangential_reso": self.reso,
# "nyquist_vel": self.nyquist_v[tilt],
"task": task,
},
)
ds["longitude"] = (["azimuth", "distance"], x)
ds["latitude"] = (["azimuth", "distance"], y)
ds["height"] = (["azimuth", "distance"], z)
if rf_flag:
ds["RF"] = (["azimuth", "distance"], ret[1])
return ds
def projection(self, reso: float) -> tuple:
r = self.get_range(self.drange, reso)
theta = np.array(self.aux[self.tilt]["azimuth"]) * deg2rad
lonx, latx = get_coordinate(
r, theta, self.elev, self.stationlon, self.stationlat
)
hght = (
height(r, self.elev, self.radarheight)
* np.ones(theta.shape[0])[:, np.newaxis]
)
return lonx, latx, hght, r, theta