Source code for snowex_db.upload.layers

"""
Module for classes that upload single files to the database.
"""

import logging
from pathlib import Path
from typing import List, Union

import geopandas as gpd
import pandas as pd
from geoalchemy2 import WKTElement

from insitupy.io.strings import StringManager
from insitupy.campaigns.snowex import SnowExProfileData
from snowexsql.tables import (
    Campaign,
    DOI,
    Instrument,
    LayerData,
    MeasurementType,
    Observer,
    Site,
)
from .base import BaseUpload
from .batch import BatchBase
from ..metadata import SnowExProfileMetadata
from ..profile_data import ExtendedSnowExProfileDataCollection
from ..utilities import get_logger

# Module-level logger with a fixed, explicit name (independent of how the
# module is imported), used by UploadProfileData for read/debug messages.
LOG = logging.getLogger(name="snowex_db.upload.layers")


class DataValidationError(ValueError):
    """
    ``ValueError`` subclass for signalling invalid upload data.

    NOTE(review): not raised anywhere in the visible part of this module;
    callers can catch it as a plain ``ValueError``.
    """
class UploadProfileData(BaseUpload):
    """
    Class for submitting a single profile file.

    Since layers are uploaded layer by layer, this allows for submitting
    them one file at a time. Every profile found in the file is converted
    into ``LayerData`` rows and bulk-inserted together with its site,
    campaign, observer, instrument and measurement-type records.
    """

    # Public (non-underscore) attribute names of the LayerData table.
    expected_attributes = [c for c in dir(LayerData) if c[0] != "_"]
    TABLE_CLASS = LayerData
    # Number of row mappings sent per bulk insert (and per commit).
    INSERT_BATCH_SIZE = 10_000

    def __init__(
        self,
        session,
        filename: Union[str, Path],
        timezone: str = "US/Mountain",
        **kwargs,
    ):
        """
        Arguments:
            session: The DB session object.
            filename (Union[str, Path]): The path to the profile file.
            timezone (str): The timezone used, default is "US/Mountain".
            kwargs: Additional optional keyword arguments related to the
                profile.
                doi (str): Digital Object Identifier.
                instrument (str): Name of the instrument used in the
                    collection.
                header_sep (str): Delimiter for separating values in the
                    header. Default is ','.
                id (str): Identifier for the profile data file (passed to
                    the reader as ``site_id``).
                campaign_name (str): The name of the campaign.
                derived (bool): Indicates if the file contains derived
                    measurements. Default False.
                instrument_model (str): Instrument model.
                comments (str): Additional comments, appended to the
                    comments found in the file metadata.

        The file is read immediately; parse failures raise ``RuntimeError``.
        """
        super().__init__()
        self.log = get_logger(__name__)
        self.filename = filename
        self._session = session
        self._timezone = timezone

        # Optional information
        self._doi = kwargs.get("doi")
        self._header_sep = kwargs.get("header_sep", ",")

        # Is this file for derived measurements
        self._derived = kwargs.get("derived", False)

        # Metadata overwrites
        # TODO - Rename this to site_id
        self._id = kwargs.get("id")
        self._campaign_name = kwargs.get("campaign_name")
        self._instrument = kwargs.get("instrument")
        self._instrument_model = kwargs.get("instrument_model")
        self._comments = kwargs.get("comments", "")

        # Read in data
        self.data = self._read()

    def _read(self) -> ExtendedSnowExProfileDataCollection:
        """
        Read in a profile file.

        Parsing is delegated to ``ExtendedSnowExProfileDataCollection.from_csv``
        together with the package's metadata / primary-variable override
        YAML files.

        Returns:
            The profile collection parsed from ``self.filename``.

        Raises:
            RuntimeError: If pandas fails to parse the file; the original
                ``pd.errors.ParserError`` is chained as the cause.
        """
        try:
            return ExtendedSnowExProfileDataCollection.from_csv(
                filename=self.filename,
                timezone=self._timezone,
                header_sep=self._header_sep,
                site_id=self._id,
                campaign_name=self._campaign_name,
                metadata_variable_file=Path(__file__).parent.joinpath(
                    "../metadata_variable_overrides.yaml"
                ),
                primary_variable_file=Path(__file__).parent.joinpath(
                    "../profile_primary_variable_overrides.yaml"
                ),
            )
        except pd.errors.ParserError as e:
            LOG.error(e)
            # Chain the parser error so the root cause stays in the traceback
            raise RuntimeError(f"Failed reading {self.filename}") from e

    def build_data(self, profile: SnowExProfileData) -> gpd.GeoDataFrame:
        """
        Build out the original dataframe with the metadata.

        This is done up front to avoid doing it during the submission loop.
        It adds ``type``, ``value`` (and ``units`` if absent) columns, with
        ``value`` holding the profile's primary variable.

        Args:
            profile: The object of a single profile.

        Returns:
            df: Dataframe ready for submission, or an empty GeoDataFrame
            when the profile has no rows.
        """
        if profile.df is None or len(profile.df) == 0:
            LOG.debug("df is empty, returning")
            return gpd.GeoDataFrame()

        metadata = profile.metadata
        variable = profile.variable
        df = profile.df.copy()

        # The type of measurement
        df["type"] = [variable.code] * len(df)

        # Manage nans and nones: stringify the measured column into "value",
        # then normalize none-like entries in every column via parse_none.
        df["value"] = df[variable.code].astype(str)
        for c in df.columns:
            df[c] = df[c].apply(StringManager.parse_none)

        if "units" not in df.columns:
            unit_str = profile.units_map.get(variable.code)
            df["units"] = [unit_str] * len(df)

        columns = df.columns.values

        # Clean up comments a bit: trim surrounding spaces from string comments
        if "comments" in columns:
            df["comments"] = df["comments"].apply(
                lambda x: x.strip(" ") if isinstance(x, str) else x
            )

        # Add flags to the comments.
        flag_string = metadata.flags
        if flag_string:
            flag_string = " Flags: " + flag_string
            if "comments" in columns:
                df["comments"] += flag_string
            else:
                df["comments"] = flag_string

        return df

    def submit(self):
        """
        Submit values to the DB.

        Can handle multiple profiles and uses information supplied in the
        constructor. For each profile, metadata records are added, then one
        ``LayerData`` mapping per row with a non-null ``value`` is inserted
        in batches of ``INSERT_BATCH_SIZE``, committing after each batch.
        Profiles without data rows only get their metadata inserted/updated.
        """
        for profile in self.data.profiles:
            # Construct a dataframe with all metadata
            df = self.build_data(profile)

            if df.empty:
                # procedure to still upload metadata (sites, etc)
                self.log.warning(
                    "File contains header but no data which is sometimes"
                    " expected. Skipping row submissions, and only inserting"
                    " metadata."
                )
                self._add_metadata(profile.metadata, update=True)
                continue

            # Metadata for all layers
            campaign, observer_list, site = self._add_metadata(profile.metadata)

            # A per-row "instrument" column takes precedence (see _add_entry),
            # so only resolve the file-level instrument when it is absent.
            instrument = None
            if "instrument" not in df.columns.values:
                instrument = self._add_instrument(profile.metadata)

            # Skip empty records
            df_filtered = df[df["value"].notna()]

            # for the cases when all rows are None after filtering
            if df_filtered.empty:
                self.log.warning(
                    "File contains data rows but no valid data after"
                    " filtering. Skipping row submissions."
                )
                continue

            # Grab each row, convert it to dict and join it with site info
            all_records_map = [
                self._add_entry(row, campaign, observer_list, site, instrument)
                for row in df_filtered.to_dict(orient="records")
            ]

            # Process records in batches; range() never yields an empty slice
            for start in range(0, len(all_records_map), self.INSERT_BATCH_SIZE):
                batch = all_records_map[start : start + self.INSERT_BATCH_SIZE]
                self._session.bulk_insert_mappings(self.TABLE_CLASS, batch)
                self._session.commit()

            # Mark all cached objects as expired
            self._session.expunge_all()

    def _add_metadata(self, metadata: SnowExProfileMetadata, update=False):
        """
        Add the metadata entry and return objects to associate with each row.

        Args:
            metadata: ProfileMetadata information.
            update: Update the metadata if it exists.

        Returns:
            Tuple of (campaign, observer_list, site) records that were
            inserted or found.
        """
        # Campaign record
        campaign = self._check_or_add_object(
            self._session, Campaign, dict(name=metadata.campaign_name)
        )

        # List of observers records
        observer_list = [
            self._check_or_add_object(self._session, Observer, dict(name=obs_name))
            for obs_name in (metadata.observers or [])
        ]

        # DOI record
        if self._doi is not None:
            doi = self._check_or_add_object(self._session, DOI, dict(doi=self._doi))
        else:
            doi = None

        # Datetime from metadata
        dt = metadata.date_time

        # Geometry from metadata
        geom = WKTElement(
            f"Point ({metadata.longitude} {metadata.latitude})", srid=4326
        )

        # Combine found comments and passed in comments to this class.
        # Empty strings are skipped too (self._comments defaults to ""),
        # otherwise the result would carry a dangling "; " separator.
        comments = "; ".join(
            comment for comment in (metadata.comments, self._comments) if comment
        )

        # Site record
        site_id = metadata.site_name
        site = self._check_or_add_object(
            self._session,
            Site,
            dict(name=site_id),
            object_kwargs=dict(
                air_temp=metadata.air_temp,
                aspect=metadata.aspect,
                campaign=campaign,
                comments=comments,
                datetime=dt,
                doi=doi,
                geom=geom,
                ground_condition=metadata.ground_condition,
                ground_roughness=metadata.ground_roughness,
                ground_vegetation=metadata.ground_vegetation,
                name=site_id,
                observers=observer_list,
                precip=metadata.precip,
                sky_cover=metadata.sky_cover,
                slope_angle=metadata.slope,
                total_depth=metadata.total_depth,
                tree_canopy=metadata.tree_canopy,
                vegetation_height=metadata.vegetation_height,
                weather_description=metadata.weather_description,
                wind=metadata.wind,
            ),
            update=update,
        )

        return campaign, observer_list, site

    def _add_instrument(self, metadata: SnowExProfileMetadata):
        """
        Add or lookup an instrument in the DB.

        Name and model passed to the constructor take priority over the
        values found in the file metadata.

        Args:
            metadata: SnowExProfileMetadata object.

        Returns:
            Instrument DB record.
        """
        # Give priority to passed information from kwargs
        instrument_name = self._instrument or metadata.instrument
        instrument_model = self._instrument_model or metadata.instrument_model

        return self._check_or_add_object(
            self._session,
            Instrument,
            dict(name=instrument_name, model=instrument_model),
        )

    def _add_entry(
        self,
        row: dict,
        campaign: Campaign,
        observer_list: List[Observer],
        site: Site,
        instrument: Instrument,
    ):
        """
        Build the bulk-insert mapping for one data row.

        Args:
            row: dataframe row of data to add.
            campaign: Campaign object inserted into db.
            observer_list: List of Observers inserted into db.
            site: the Site inserted into db.
            instrument: Instrument found in metadata, or None when the data
                carries a per-row "instrument" column.

        Returns:
            dict of ``LayerData`` column values for ``bulk_insert_mappings``.
            ``instrument_id`` is None when neither the row nor the caller
            supplies an instrument.
        """
        # An instrument associated with a row has precedence over the
        # given via arguments. The model column is optional.
        if row.get("instrument") is not None:
            instrument = self._check_or_add_object(
                self._session,
                Instrument,
                dict(name=row["instrument"], model=row.get("instrument_model")),
            )

        # Add measurement type
        measurement_type = row["type"]
        measurement_obj = self._check_or_add_object(
            self._session,
            # Add units and 'derived' flag for the measurement
            MeasurementType,
            dict(name=measurement_type, units=row["units"], derived=self._derived),
        )

        # Create a dictionary for bulk insert
        return dict(
            depth=row["depth"],
            bottom_depth=row.get("bottom_depth"),
            value=row["value"],
            instrument_id=instrument.id if instrument is not None else None,
            measurement_type_id=measurement_obj.id,
            site_id=site.id,
        )
class UploadProfileBatch(BatchBase):
    """
    Upload many profile-type data files in one go.

    Only the per-file uploader is specialised here; everything else comes
    from ``BatchBase``.
    """

    # Uploader used for each individual file. NOTE(review): presumably
    # BatchBase instantiates this once per file; confirm in .batch.
    UploaderClass = UploadProfileData