Source code for snowex_db.upload.points

"""
Module for classes that upload single files to the database.
"""
from pathlib import Path
import logging
import geopandas as gpd
import pandas as pd
from geoalchemy2 import WKTElement

from snowexsql.tables import (
    Campaign, DOI, Instrument, MeasurementType, Observer, PointData,
    PointObservation
)

from insitupy.io.strings import StringManager

from .base import BaseUpload
from ..point_data import PointDataCollection, SnowExPointData


LOG = logging.getLogger("snowex_db.upload.points")


[docs] class DataValidationError(ValueError): pass
[docs] class PointDataCSV(BaseUpload): """ Class for submitting whole csv files of point data """ expected_attributes = [c for c in dir(PointData) if c[0] != '_'] TABLE_CLASS = PointData # Remapping for special keywords for snowdepth measurements MEASUREMENT_NAMES = {'mp': 'magnaprobe', 'm2': 'mesa', 'pr': 'pit ruler'} # Units to apply UNITS_MAP = { 'depth': 'cm', 'two_way_travel': 'ns', 'swe': 'mm', 'density': 'kg/m^3' } def __init__(self, session, profile_filename, timezone="US/Mountain", **kwargs): """ Args: session: SQLAlchemy session to use for the upload profile_filename: Path to the csv file to upload timezone: Timezone to assume for the data, defaults to "US/Mountain" **kwargs: doi instrument header_sep id campaign_name derived instrument_model comments observer name row_based_timezone instrument_map single_date """ super().__init__() self.filename = profile_filename self._session = session self._timezone = timezone self._doi = kwargs.get("doi") self._instrument = kwargs.get("instrument") # a map of measurement type to instrument name self._instrument_map = kwargs.get("instrument_map", {}) if self._instrument_map and self._instrument: raise ValueError( "Cannot provide both 'instrument' and 'instrument_map'. " "Please choose one." ) self._header_sep = kwargs.get("header_sep", ",") # Site ID/Name self._id = kwargs.get("id") self._campaign_name = kwargs.get("campaign_name") # Is this file for derived measurements self._derived = kwargs.get("derived", False) # SMP passed in self._instrument_model = kwargs.get("instrument_model") self._comments = kwargs.get("comments") # Observer name for the whole file self._observer = kwargs.get("observer") # assign name to each measurement if given self._name = kwargs.get("name") # Assign if details are row-based (generally for the SWE files) self._row_based_tz = kwargs.get("row_based_timezone", False) # TODO: what do we do here? if self._row_based_tz: in_timezone = None else: in_timezone = timezone # All observations are from the same date self._single_date = kwargs.get("single_date", False) # Read in data self.data = self._read(in_timezone=in_timezone) def _read(self, in_timezone=None): """ Read in the csv """ try: # TODO: row based crs, tz options data = PointDataCollection.from_csv( self.filename, timezone=self._timezone, header_sep=self._header_sep, site_id=self._id, campaign_name=self._campaign_name, units_map=self.UNITS_MAP, row_based_timezone=self._row_based_tz, primary_variable_file=Path(__file__).parent.joinpath( "../point_primary_variable_overrides.yaml" ), single_date=self._single_date, ) except pd.errors.ParserError as e: LOG.error(e) raise RuntimeError(f"Failed reading {self.filename}") return data
[docs] def build_data(self, series: SnowExPointData) -> gpd.GeoDataFrame: """ Build out the original dataframe with the metadata to avoid doing it during the submission loop. Removes all other main profile columns and assigns data_name as the value column Args: series: The object of a variable of point data Returns: df: Dataframe ready for submission """ # TODO: DRY up? df = series.df.copy() if df.empty: LOG.debug("df is empty, returning") return df variable = series.variable # The type of measurement df['type'] = [variable.code] * len(df) # Manage nans and nones for c in df.columns: df[c] = df[c].apply(lambda x: StringManager.parse_none(x)) df['value'] = df[variable.code].astype(float) if 'units' not in df.columns: unit_str = series.units_map.get(variable.code) df['units'] = [unit_str] * len(df) columns = df.columns.values # Clean up comments a bit if 'comments' in columns: df['comments'] = df['comments'].apply( lambda x: x.strip(' ') if isinstance(x, str) else x) # In case of SMP, pass comments in if self._comments is not None: df["comments"] = [self._comments] * len(df) # Fill in more optional overrides for column_name, param in [ ('instrument', self._instrument), ('doi', self._doi), ('instrument_model', self._instrument_model), ('observer', self._observer), ('name', self._name) ]: if column_name not in columns: df[column_name] = [param] * len(df) # Anywhere the instrument is None, use the instrument map # based on the measurement name if self._instrument_map and 'instrument' in df.columns: df['instrument'] = df['instrument'].fillna( df['type'].map(self._instrument_map) ) # Map the measurement names or default to original df["instrument"] = df['instrument'].map( lambda x: self.MEASUREMENT_NAMES.get(x.lower(), x) ) return df
[docs] def submit(self): """ Submit values to the db from dictionary. Manage how some profiles have multiple values and get submitted individual """ # Construct a dataframe with all metadata for series in self.data.series: df = self.build_data(series) # Grab each row, convert it to dict and join it with site info if not df.empty: c_observations = self._add_campaign_observation(df) measurement_types = self._add_measurement_types(df) all_records_map = [] for row in df.to_dict(orient="records"): row["geometry"] = WKTElement( str(f"POINT ({row['longitude']} {row['latitude']})"), srid=4326, ) all_records_map.append( self._add_entry(row, c_observations, measurement_types) ) self._session.bulk_insert_mappings(self.TABLE_CLASS, all_records_map) self._session.commit() # Mark all cached objects as expired self._session.expunge_all() else: # procedure to still upload metadata (sites, etc) LOG.warning( f'Point data file {self.filename} is empty.' )
def _observation_name_from_row(self, row): name = row.get("name") or row.get("pit_id") value = f"{name} {row['instrument']}" if row.get('instrument_model'): value = f"{value} {row['instrument_model']}" return value def _get_first_check_unique(self, df, key): """ Get the first entry for a given key if present in the dataframe and check if it is unique. If not, raise a DataValidationError """ unique_values = df.get(key, None) if unique_values is None: return None unique_values = df[key].unique() if len(unique_values) > 1: raise DataValidationError( f"Multiple values for {key} found: {unique_values}" ) return unique_values[0] def _add_campaign_observation(self, df) -> dict: """ Processes a DataFrame and adds unique entries of instruments, measurement types, campaigns, and observer. Parameters: df : pandas.DataFrame DataFrame containing relevant point data metadata information. Returns: dict A nested dictionary with primary keys of measurement names, and the secondary keys are dates of observations. Each value corresponds to an observation object or entry created in the session. """ c_observations = {} df["date"] = pd.to_datetime(df["datetime"]).dt.date # Group by our observation keys to add records uniquely into the database base_groups = ['instrument', 'instrument_model', 'name', 'date'] if 'pit_id' in df.columns: base_groups.append('pit_id') # Process each unique combination of keys (key) and its corresponding group (grouped_df) for keys, grouped_df in df.groupby(base_groups, dropna=False): # Add instrument instrument = self._check_or_add_object( self._session, Instrument, dict( name=self._get_first_check_unique(grouped_df, 'instrument'), model=self._get_first_check_unique(grouped_df, 'instrument_model'), ) ) # Check name is unique because we are adding ONE # campaign observation here self._get_first_check_unique(grouped_df, "name") if 'pit_id' in grouped_df.columns: self._get_first_check_unique(grouped_df, "pit_id") # Get the measurement name measurement_name = self._observation_name_from_row(grouped_df.iloc[0]) # Add doi doi_string = self._get_first_check_unique(grouped_df, "doi") if doi_string is not None: doi = self._check_or_add_object( self._session, DOI, dict(doi=doi_string) ) else: doi = None # Add campaign campaign_name = self._get_first_check_unique(grouped_df, "campaign") \ or self._campaign_name if campaign_name is None: raise DataValidationError("Campaign cannot be None") campaign = self._check_or_add_object( self._session, Campaign, dict(name=campaign_name) ) # Add observer observer_name = self._get_first_check_unique( grouped_df, "observer" ) or self._observer observer_name = observer_name or "unknown" observer = self._check_or_add_object( self._session, Observer, dict(name=observer_name) ) # Construct description string description = None if ["comments"] in grouped_df.columns.values: description = (description or "") + self._get_first_check_unique( grouped_df, "comments" ) if ["flags"] in grouped_df.columns.values: description = (description or "") + self._get_first_check_unique( grouped_df, "flags" ) date_obj = self._get_first_check_unique(grouped_df, "date") check_args = dict( date=date_obj, name=measurement_name, doi_id=doi.id, instrument_id=instrument.id, ) observation = self._check_or_add_object( self._session, PointObservation, check_args, object_kwargs=dict( **check_args, description=description, campaign_id=campaign.id, observers_id=observer.id, ) ) if measurement_name not in c_observations: c_observations[measurement_name] = {} c_observations[measurement_name][date_obj] = observation return c_observations def _add_measurement_types(self, df) -> dict: """ Adds unique measurement types from the points data to the database. Parameters: df: DataFrame Parsed CSV dataframe Returns: dict Dictionary with DB records mapped to measurement names. """ types = {} for name, grouped_df in df.groupby('type'): units = grouped_df.units.unique() if len(units) > 1: raise DataValidationError( f"Multiple units found for measurement type {name}: {units}" ) measurement_args = dict( name=name, units=units[0], derived=self._derived ) types[name] = self._check_or_add_object( self._session, MeasurementType, measurement_args, measurement_args ) return types def _add_entry(self, row: dict, observations: dict, measurement_types: dict) -> dict: """ Add a single point entry and map with the metadata. Args: row: (DataFrame) Row data to add observations: (dict) PointObservation created previously as metadata measurement_types: (dict) Measurement types created previously as metadata Returns: Added point data record object """ observation_name = self._observation_name_from_row(row) try: observation = observations[observation_name][row["date"]] except KeyError: raise RuntimeError( f"No corresponding PointObservation for {observation_name} and " f"date: {row['date']}" ) # Now that the other objects exist, create the entry new_entry = dict( datetime=row["datetime"], elevation=row.get('elevation', None), geom=row['geometry'], measurement_type_id=measurement_types[row["type"]].id, observation_id=observation.id, value=row["value"], ) return new_entry