Source code for radar.utils.calculate.pattern

from abc import ABC, abstractmethod
import numpy as np
from typing import Tuple
import polars as pl

from radar.utils.typing import DataHeader, Angle
from radar.utils.calculate.convert import to_db
from radar.utils.typing.enums import PhaseUnit
from radar.utils.typing.validator import AngleBound


[docs] class Pattern(ABC): """Abstract base class defining the required interface for radar beam patterns."""
[docs] @abstractmethod def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Calculates the pattern's gain metrics and appends them to the DataFrame. Args: df (pl.DataFrame): The input spatial coordinate dataset. Returns: pl.DataFrame: The modified DataFrame including linear and dB gains. """ pass
[docs] class CustomPattern(Pattern): """A customizable data-driven antenna pattern built from empirical lookup data. Attributes: df (pl.DataFrame): The underlying reference pattern dataset. """ def __init__(self, df: pl.DataFrame): """Initializes the CustomPattern lookup table and calculates boundaries. Args: df (pl.DataFrame): Dataframe containing the reference pattern mapping. Must include azimuth, elevation, and linear gain columns. Raises: ValueError: If any required configuration headers are missing. """ required_init = [ DataHeader.AZIMUTH_DEG, DataHeader.ELEVATION_DEG, DataHeader.BEAM_GAIN_LINEAR, ] self._validate_presence(df, required_init) metrics = df.select( [ pl.col(DataHeader.AZIMUTH_DEG).min().alias("az_min"), pl.col(DataHeader.AZIMUTH_DEG).max().alias("az_max"), pl.col(DataHeader.ELEVATION_DEG).min().alias("el_min"), pl.col(DataHeader.ELEVATION_DEG).max().alias("el_max"), ] ) self._az_bound = AngleBound( ( Angle(metrics["az_min"][0], PhaseUnit.DEGREE), Angle(metrics["az_max"][0], PhaseUnit.DEGREE), ) ) self._el_bound = AngleBound( ( Angle(metrics["el_min"][0], PhaseUnit.DEGREE), Angle(metrics["el_max"][0], PhaseUnit.DEGREE), ) ) if DataHeader.BEAM_GAIN_DB not in df.columns: df = df.with_columns( pl.col(DataHeader.BEAM_GAIN_LINEAR) .map_batches(to_db) .alias(DataHeader.BEAM_GAIN_DB) ) self.df = df
[docs] def _validate_presence(self, df: pl.DataFrame, columns: list[str]) -> None: """Internal helper to ensure columns exist before executing operations. Args: df (pl.DataFrame): Target Polars DataFrame to inspect. columns (list[str]): List of expected column names. Raises: ValueError: If one or more columns are not present in the dataframe. """ missing = [col for col in columns if col not in df.columns] if missing: raise ValueError( f"Required columns missing from Polars DataFrame: {missing}" )
[docs] def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Maps input coordinates against the custom reference pattern. Performs strict shape, coordinate, and boundary matches before executing an inner join to assign calculated gains to incoming data rows. Args: df (pl.DataFrame): The spatial grid coordinates to compute patterns for. Returns: pl.DataFrame: Joined dataframe incorporating historical reference gains. Raises: ValueError: If required headers are missing, coordinate boundaries do not match, or coordinates are missing from the lookup reference. """ lookup_keys = [DataHeader.AZIMUTH_DEG, DataHeader.ELEVATION_DEG] self._validate_presence(df, lookup_keys) unique_az = df[DataHeader.AZIMUTH_DEG].unique() unique_el = df[DataHeader.ELEVATION_DEG].unique() actual_az_min, actual_az_max = unique_az.min(), unique_az.max() actual_el_min, actual_el_max = unique_el.min(), unique_el.max() # Validates that grid bounds align perfectly with the source data boundaries if ( actual_az_min != self._az_bound[0].deg or actual_az_max != self._az_bound[1].deg or actual_el_min != self._el_bound[0].deg or actual_el_max != self._el_bound[1].deg ): raise ValueError( f"Surface mismatch. Input corners must exactly match pattern corners: " f"Az ({self._az_bound[0].deg}, {self._az_bound[1].deg}), " f"El ({self._el_bound[0].deg}, {self._el_bound[1].deg})" ) expected_row_count = len(unique_az) * len(unique_el) if len(df) != expected_row_count: raise ValueError( f"Incomplete surface. Expected {expected_row_count} points " f"({len(unique_az)} az x {len(unique_el)} el), but got {len(df)}." ) result = df.join( self.df.select( [*lookup_keys, DataHeader.BEAM_GAIN_LINEAR, DataHeader.BEAM_GAIN_DB] ), on=lookup_keys, how="inner", ) if len(result) < len(df): missing_count = len(df) - len(result) raise ValueError( f"Surface mapping failed: {missing_count} coordinate pairs are " f"missing from the beam pattern lookup." ) return result
[docs] class Isotropic(Pattern): """An ideal isotropic antenna pattern radiating uniformly with 0 dB gain."""
[docs] def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Appends static isotropic gains (0 dB / 1.0 Linear) to the DataFrame. Args: df (pl.DataFrame): Input dataset. Returns: pl.DataFrame: Modified DataFrame with uniform gain entries. """ return df.with_columns( [ pl.lit(0).alias(DataHeader.BEAM_GAIN_DB), pl.lit(1).alias(DataHeader.BEAM_GAIN_LINEAR), ] )
[docs] class Cosine(Pattern): """A hemispherical cosine-power beam pattern model.""" def __init__(self, order: int = 1) -> None: """Initializes the Cosine model with a mathematical scaling power factor. Args: order (int, optional): The exponential factor modifying the cosine window. Higher values yield narrower main beams. Defaults to 1. """ self._order = order
[docs] def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Calculates directional cosine gains relative to boresight at (0,0). Args: df (pl.DataFrame): Input dataset containing `AZIMUTH_RAD` and `ELEVATION_RAD`. Returns: pl.DataFrame: Dataset containing appended cosine gain metrics. """ az = df.select(DataHeader.AZIMUTH_RAD).to_numpy() el = df.select(DataHeader.ELEVATION_RAD).to_numpy() # Typical implementation: cos(theta) where theta is the angle from boresight # Assuming boresight is at (0,0) cos_theta = np.cos(az) * np.cos(el) # Clip to 0 to ensure no back-lobes (hemispherical) mag_linear = np.maximum(0, cos_theta) ** self._order mag_db = to_db(mag_linear) return df.with_columns( [ pl.Series(DataHeader.BEAM_GAIN_DB, mag_db.ravel()), pl.Series(DataHeader.BEAM_GAIN_LINEAR, mag_linear.ravel()), ] )
[docs] class Gaussian(Pattern): """A mathematical Gaussian distribution beam pattern model.""" def __init__( self, beam_width: Tuple[Angle, Angle], ) -> None: """Initializes the Gaussian pattern with designated half-power beamwidths. Args: beam_width (Tuple[Angle, Angle]): Target sizing bounds configured as (Azimuth HPBW, Elevation HPBW). """ self._beam_width = beam_width
[docs] def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Calculates normal Gaussian scaling gains across spatial dimensions. Args: df (pl.DataFrame): Input dataset containing `AZIMUTH_RAD` and `ELEVATION_RAD`. Returns: pl.DataFrame: Dataset containing appended Gaussian gain metrics. """ bw_az, bw_el = self._beam_width[0].rad, self._beam_width[1].rad sigma_const = -4 * np.log(2) mag_linear = np.exp( sigma_const * ( (df.select(DataHeader.AZIMUTH_RAD).to_numpy() / bw_az) ** 2 + (df.select(DataHeader.ELEVATION_RAD).to_numpy() / bw_el) ** 2 ) ) mag_db = to_db(mag_linear) return df.with_columns( [ pl.Series(DataHeader.BEAM_GAIN_DB, mag_db.ravel()), pl.Series(DataHeader.BEAM_GAIN_LINEAR, mag_linear.ravel()), ] )
[docs] class Sinc(Pattern): """An analytical Sinc (cardinal sine) distribution beam pattern model.""" def __init__(self, beam_width: Tuple[Angle, Angle]) -> None: """Initializes the Sinc pattern with designated half-power beamwidths. Args: beam_width (Tuple[Angle, Angle]): Target sizing bounds configured as (Azimuth HPBW, Elevation HPBW). """ self._beam_width = beam_width
[docs] def calculate_pattern(self, df: pl.DataFrame) -> pl.DataFrame: """Calculates structural Sinc gains representing uniform aperture characteristics. Args: df (pl.DataFrame): Input dataset containing `AZIMUTH_RAD` and `ELEVATION_RAD`. Returns: pl.DataFrame: Dataset containing appended Sinc gain metrics. """ bw_az, bw_el = self._beam_width[0].rad, self._beam_width[1].rad az = df.select(DataHeader.AZIMUTH_RAD).to_numpy() el = df.select(DataHeader.ELEVATION_RAD).to_numpy() # Constant for Sinc Half-Power Beamwidth (HPBW) # 1.3915 is the value where sinc^2(x) = 0.5 k = 1.3915 * 2 # np.sinc in numpy is sin(pi*x)/(pi*x) arg_az = (k * az / bw_az) / np.pi arg_el = (k * el / bw_el) / np.pi mag_linear = np.abs(np.sinc(arg_az) * np.sinc(arg_el)) mag_db = to_db(mag_linear) return df.with_columns( [ pl.Series(DataHeader.BEAM_GAIN_DB, mag_db.ravel()), pl.Series(DataHeader.BEAM_GAIN_LINEAR, mag_linear.ravel()), ] )