Source code for cinrad.io.level3

# -*- coding: utf-8 -*-
# Author: Puyuan Du

from collections import OrderedDict, defaultdict
from typing import Optional, Union, Any
import datetime
from io import BytesIO

import numpy as np
from xarray import Dataset, DataArray

from cinrad.projection import get_coordinate, height
from cinrad.constants import deg2rad
from cinrad._typing import Boardcast_T
from cinrad.io.base import RadarBase, prepare_file, _get_radar_info
from cinrad.io._dtype import *
from cinrad.error import RadarDecodeError


def xy2polar(x: Boardcast_T, y: Boardcast_T) -> tuple:
    return np.sqrt(x ** 2 + y ** 2), np.arctan2(x, y) * 180 / np.pi


# As metpy use a different table to map the data
# The color table constructed from PUP software is used to replace metpy
# fmt: off
velocity_tbl = np.array([np.nan, -27.5, -20.5, -15.5, -10.5, -5.5, -1.5,
                         -0.5, 0.5, 4.5, 9.5, 14.5, 19.5, 26.5, 27.5, 30])
# fmt: on


[docs]class PUP(RadarBase): r""" Class handling PUP data (Nexrad Level III data) Args: file (str, IO): Path points to the file or a file object. """ def __init__(self, file: Any): from metpy.io.nexrad import Level3File f = Level3File(file) # Because metpy interface doesn't provide station codes, # it's necessary to reopen it and read the code. with open(file, "rb") as buf: buf.seek(12) code = np.frombuffer(buf.read(2), ">i2")[0] cds = str(code).zfill(3) self.code = "Z9" + cds self._update_radar_info() product_code = f.prod_desc.prod_code self.dtype = self._det_product_type(product_code) self.radial_flag = self._is_radial(product_code) data_block = f.sym_block[0][0] data = np.array(data_block["data"], dtype=int) if self.dtype == "VEL": mapped_data = np.ma.masked_invalid(velocity_tbl[data]) rf = np.ma.masked_not_equal(mapped_data, 30) data = np.ma.masked_equal(mapped_data, 30) self.data = (data, rf) else: data[data == 0] = np.ma.masked self.data = np.ma.masked_invalid(f.map_data(data)) if self.dtype == "ET": # convert kft to km self.data *= 0.30478 elif self.dtype == "OHP": # convert in to mm self.data *= 25.4 station_info = _get_radar_info(self.code) self.radar_type = station_info[3] self.max_range = int(f.max_range) # Hard coding to adjust max range for different types of radar if f.max_range >= 230: if self.radar_type in ["SC", "CC"]: self.max_range = 150 elif self.radar_type in ["CA", "CB"]: self.max_range = 200 elif self.radar_type == "CD": self.max_range = 125 if self.radial_flag: start_az = data_block["start_az"][0] az = np.linspace(0, 360, data.shape[0]) az += start_az az[az > 360] -= 360 self.az = az * deg2rad self.reso = self.max_range / data.shape[1] self.rng = np.arange(self.reso, self.max_range + self.reso, self.reso) else: xdim, ydim = data.shape x = np.linspace(self.max_range * -1, self.max_range, xdim) / 111 + f.lon y = np.linspace(self.max_range, self.max_range * -1, ydim) / 111 + f.lat self.lon, self.lat = np.meshgrid(x, y) self.reso = self.max_range / data.shape[0] * 2 self.stationlat = f.lat self.stationlon = f.lon self.el = np.round_(f.metadata.get("el_angle", 0), 1) self.scantime = f.metadata["vol_time"]
[docs] def get_data(self) -> Dataset: r""" Get radar data with extra information. Returns: xarray.Dataset: Data. """ if self.radial_flag: lon, lat = self.projection() if self.dtype in ["VEL", "SW"]: da = DataArray( self.data[0], coords=[self.az, self.rng], dims=["azimuth", "distance"], ) else: da = DataArray( self.data, coords=[self.az, self.rng], dims=["azimuth", "distance"] ) ds = Dataset( {self.dtype: da}, attrs={ "elevation": self.el, "range": self.max_range, "scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"), "site_code": self.code, "site_name": self.name, "site_longitude": self.stationlon, "site_latitude": self.stationlat, "tangential_reso": self.reso, }, ) ds["longitude"] = (["azimuth", "distance"], lon) ds["latitude"] = (["azimuth", "distance"], lat) if self.dtype in ["VEL", "SW"]: ds["RF"] = (["azimuth", "distance"], self.data[1]) else: da = DataArray( self.data, coords=[self.lat[:, 0], self.lon[0]], dims=["latitude", "longitude"], ) ds = Dataset( {self.dtype: da}, attrs={ "elevation": 0, "range": self.max_range, "scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"), "site_code": self.code, "site_name": self.name, "site_longitude": self.stationlon, "site_latitude": self.stationlat, "tangential_reso": self.reso, }, ) return ds
@staticmethod def _is_radial(code: int) -> bool: return code in range(16, 31) or code == 78 def projection(self) -> tuple: return get_coordinate( self.rng, self.az, self.el, self.stationlon, self.stationlat, h_offset=False ) @staticmethod def _det_product_type(spec: int) -> str: if spec in range(16, 22): return "REF" elif spec in range(22, 28): return "VEL" elif spec in range(28, 31): return "SW" elif spec in range(37, 39): return "CR" elif spec == 41: return "ET" elif spec == 57: return "VIL" elif spec == 78: return "OHP" else: raise RadarDecodeError("Unsupported product type {}".format(spec))
[docs]class SWAN(object): r""" Class reading SWAN grid data. Args: file (str, IO): Path points to the file or a file object. """ dtype_conv = {0: "B", 1: "b", 2: "u2", 3: "i2", 4: "u2"} size_conv = {0: 1, 1: 1, 2: 2, 3: 2, 4: 2} def __init__(self, file: Any, product: Optional[str] = None): f = prepare_file(file) header = np.frombuffer(f.read(1024), SWAN_dtype) xdim, ydim, zdim = ( header["x_grid_num"][0], header["y_grid_num"][0], header["z_grid_num"][0], ) dtype = header["m_data_type"][0] data_size = int(xdim) * int(ydim) * int(zdim) * self.size_conv[dtype] bittype = self.dtype_conv[dtype] data_body = np.frombuffer(f.read(data_size), bittype).astype(int) # Convert data to i4 to avoid overflow in later calculation if zdim == 1: # 2D data out = data_body.reshape(ydim, xdim) else: # 3D data out = data_body.reshape(zdim, ydim, xdim) self.data_time = datetime.datetime( header["year"][0], header["month"][0], header["day"][0], header["hour"][0], header["minute"][0], ) # TODO: Recognize correct product name product_name = ( (b"".join(header["data_name"]).decode("gbk", "ignore").replace("\x00", "")) if not product else product ) self.product_name = product_name for pname in ["CR", "3DREF"]: if product_name.startswith(pname): self.product_name = pname start_lon = header["start_lon"][0] start_lat = header["start_lat"][0] center_lon = header["center_lon"][0] center_lat = header["center_lat"][0] end_lon = center_lon * 2 - start_lon end_lat = center_lat * 2 - start_lat # x_reso = header['x_reso'][0] # y_reso = header['y_reso'][0] self.lon = np.linspace(start_lon, end_lon, xdim) # For shape compatibility self.lat = np.linspace(start_lat, end_lat, ydim) if self.product_name in ["CR", "3DREF"]: self.data = (np.ma.masked_equal(out, 0) - 66) / 2 else: # Leave data unchanged because the scale and offset are unclear self.data = np.ma.masked_equal(out, 0)
[docs] def get_data(self, level: int = 0) -> Dataset: r""" Get radar data with extra information Args: level (int): The level of reflectivity data. Only used in `3DREF` data. Returns: xarray.Dataset: Data. """ dtype = self.product_name if self.data.ndim == 2: ret = self.data else: ret = self.data[level] if self.product_name == "3DREF": dtype = "CR" da = DataArray(ret, coords=[self.lat, self.lon], dims=["latitude", "longitude"]) ds = Dataset( {dtype: da}, attrs={ "scan_time": self.data_time.strftime("%Y-%m-%d %H:%M:%S"), "site_code": "SWAN", "site_name": "SWAN", "tangential_reso": np.nan, "range": np.nan, "elevation": 0, }, ) return ds
class StormTrackInfo(object): def __init__(self, filepath: str): from metpy.io.nexrad import Level3File self.handler = Level3File(filepath) self.info = self.get_all_sti() self.storm_list = self.get_all_id() def get_all_sti(self) -> OrderedDict: f = self.handler if not hasattr(f, "sym_block"): return OrderedDict() else: data_block = f.sym_block[0] sti_data = OrderedDict() data_dict = [i for i in data_block if isinstance(i, defaultdict)] for i in data_dict: if i["type"] == "Storm ID": sti_data[i["id"]] = defaultdict() sti_data[i["id"]]["current storm position"] = tuple( [i["x"], i["y"]] ) else: stid = list(sti_data.keys())[-1] if "markers" in i.keys(): pos = i["markers"] if isinstance(pos, dict): pos = [pos] name = list(pos[0].keys())[0] sti_data[stid][name] = list() sti_data[stid][name] += i.get("track") elif "STI Circle" in i.keys(): circle_dict = i["STI Circle"] sti_data[stid]["radius"] = circle_dict["radius"] sti_data[stid]["current storm position"] = tuple( [circle_dict["x"], circle_dict["y"]] ) return sti_data def get_all_id(self) -> list: return list(self.info.keys()) def current(self, storm_id: str) -> tuple: curpos = self.info[storm_id]["current storm position"] dist, az = xy2polar(*curpos) lonlat = get_coordinate( dist, az * deg2rad, 0, self.handler.lon, self.handler.lat, h_offset=False ) return lonlat def track(self, storm_id: str, tracktype: str) -> Optional[tuple]: if tracktype == "forecast": key = "forecast storm position" elif tracktype == "past": key = "past storm position" else: raise KeyError("Key {} does not exist".format(key)) if key not in self.info[storm_id].keys(): return None forpos = self.info[storm_id][key] if forpos == None: return x_pos = np.array([i[0] for i in forpos]) y_pos = np.array([i[1] for i in forpos]) pol_pos = xy2polar(x_pos, y_pos) lon = list() lat = list() for dis, azi in zip(pol_pos[0], pol_pos[1]): pos_tup = get_coordinate( dis, azi * deg2rad, 0, self.handler.lon, self.handler.lat, h_offset=False, ) lon.append(pos_tup[0]) lat.append(pos_tup[1]) return np.array(lon), np.array(lat) class HailIndex(object): def __init__(self, filepath: str): from metpy.io.nexrad import Level3File self.handler = Level3File(filepath) self.info = self.get_all_hi() self.storm_list = self.get_all_id() def get_all_hi(self) -> OrderedDict: f = self.handler if not hasattr(f, "sym_block"): return OrderedDict() else: data_block = f.sym_block[0] sti_data = OrderedDict() data_dict = [i for i in data_block if isinstance(i, defaultdict)] storm_id = [i for i in data_dict if i["type"] == "Storm ID"] info = [i for i in data_dict if i["type"] == "HDA"] for sid, inf in zip(storm_id, info): stid = sid["id"] sti_data[stid] = defaultdict() sti_data[stid]["current storm position"] = tuple([sid["x"], sid["y"]]) sti_data[stid]["POH"] = inf["POH"] sti_data[stid]["POSH"] = inf["POSH"] sti_data[stid]["MESH"] = inf["Max Size"] return sti_data def get_all_id(self) -> list: return list(self.info.keys()) def get_hail_param(self, storm_id: str) -> dict: out = dict(self.info[storm_id]) xy = out.get("current storm position") dist, az = xy2polar(*xy) lonlat = get_coordinate( dist, az * deg2rad, 0, self.handler.lon, self.handler.lat, h_offset=False ) out["position"] = tuple(lonlat) return out class _ProductParams(object): def __init__(self, ptype: int, param_bytes: bytes): self.buf = BytesIO(param_bytes) self.params = dict() map_func = {1: self._ppi, 2: self._rhi, 51: self._ppi} map_func[ptype]() self.buf.close() def _ppi(self): elev = np.frombuffer(self.buf.read(4), "f4")[0] self.params["elevation"] = elev def _rhi(self): azi = np.frombuffer(self.buf.read(4), "f4")[0] top = np.frombuffer(self.buf.read(4), "f4")[0] bot = np.frombuffer(self.buf.read(4), "f4")[0] self.params["azimuth"] = azi self.params["top"] = top self.params["bottom"] = bot def get_product_param(ptype: int, param_bytes: bytes) -> dict: return _ProductParams(ptype, param_bytes).params
[docs]class StandardPUP(RadarBase): # fmt: off dtype_corr = {1:'TREF', 2:'REF', 3:'VEL', 4:'SW', 5:'SQI', 6:'CPA', 7:'ZDR', 8:'LDR', 9:'RHO', 10:'PHI', 11:'KDP', 12:'CP', 14:'HCL', 15:'CF', 16:'SNRH', 17:'SNRV', 32:'Zc', 33:'Vc', 34:'Wc', 35:'ZDRc', 71:'RR', 72:'HGT', 73:'VIL', 74:'SHR', 75:'RAIN', 76:'RMS', 77:'CTR'} # fmt: on def __init__(self, file): self.f = prepare_file(file) self._parse() self._update_radar_info() self.stationlat = self.geo["lat"][0] self.stationlon = self.geo["lon"][0] self.radarheight = self.geo["height"][0] if self.name == "None": self.name = self.code del self.geo self.f.close() def _parse(self): header = np.frombuffer(self.f.read(32), SDD_header) if header["magic_number"] != 0x4D545352: raise RadarDecodeError("Invalid standard data") site_config = np.frombuffer(self.f.read(128), SDD_site) self.code = site_config["site_code"][0].decode().replace("\x00", "") self.geo = geo = dict() geo["lat"] = site_config["Latitude"] geo["lon"] = site_config["Longitude"] geo["height"] = site_config["ground_height"] task = np.frombuffer(self.f.read(256), SDD_task) self.task_name = task["task_name"][0].decode().replace("\x00", "") cut_num = task["cut_number"][0] self.scan_config = np.frombuffer(self.f.read(256 * cut_num), SDD_cut) ph = np.frombuffer(self.f.read(128), SDD_pheader) ptype = ph["product_type"][0] self.scantime = datetime.datetime.utcfromtimestamp(ph["scan_start_time"][0]) self.dtype = self.dtype_corr[ph["dtype_1"][0]] params = get_product_param(ptype, self.f.read(64)) radial_header = np.frombuffer(self.f.read(64), L3_radial) bin_length = radial_header["bin_length"][0] scale = radial_header["scale"][0] offset = radial_header["offset"][0] self.reso = radial_header["reso"][0] / 1000 self.start_range = radial_header["start_range"][0] / 1000 self.end_range = radial_header["max_range"][0] / 1000 data = list() azi = list() while True: buf = self.f.read(32) if not buf: break data_block = np.frombuffer(buf, L3_rblock) start_a = data_block["start_az"][0] nbins = data_block["nbins"][0] raw = np.frombuffer( self.f.read(bin_length * nbins), "u{}".format(bin_length) ) data.append(raw) azi.append(start_a) raw = np.vstack(data).astype(int) self.data_rf = np.ma.masked_not_equal(raw, 1) raw = np.ma.masked_less(raw, 5) self.data = (raw - offset) / scale self.el = params["elevation"] az = np.linspace(0, 360, raw.shape[0]) az += azi[0] az[az > 360] -= 360 self.azi = az * deg2rad # self.azi = np.deg2rad(azi) def get_data(self): dist = np.arange( self.start_range + self.reso, self.end_range + self.reso, self.reso ) lon, lat = get_coordinate( dist, self.azi, self.el, self.stationlon, self.stationlat ) hgt = ( height(dist, self.el, self.radarheight) * np.ones(self.azi.shape[0])[:, np.newaxis] ) da = DataArray(self.data, coords=[self.azi, dist], dims=["azimuth", "distance"]) ds = Dataset( {self.dtype: da}, attrs={ "elevation": self.el, "range": self.end_range, "scan_time": self.scantime.strftime("%Y-%m-%d %H:%M:%S"), "site_code": self.code, "site_name": self.name, "site_longitude": self.stationlon, "site_latitude": self.stationlat, "tangential_reso": self.reso, "task": self.task_name, }, ) ds["longitude"] = (["azimuth", "distance"], lon) ds["latitude"] = (["azimuth", "distance"], lat) ds["height"] = (["azimuth", "distance"], hgt) if self.dtype in ["VEL", "SW"]: ds["RF"] = (["azimuth", "distance"], self.data_rf) return ds