import logging
from typing import List, Tuple

import numpy as np
import pandas as pd
from timezonefinder import TimezoneFinder

from insitupy.io.dates import DateTimeManager
from insitupy.io.locations import LocationManager
from insitupy.io.metadata import MetaDataParser
from insitupy.io.yaml_codes import YamlCodes
from insitupy.profiles.base import MeasurementData
from insitupy.profiles.metadata import ProfileMetaData
from insitupy.variables import MeasurementDescription

from .point_metadata import PointSnowExMetadataParser
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class SnowExPointData(MeasurementData):
    """
    Point measurement data for a single variable read from a CSV file.

    On top of the base class this adds per-row location parsing
    (``latitude`` / ``longitude`` columns) and datetime parsing
    (``datetime`` column) when the dataframe is formatted.
    """

    # Timezone passed as ``out_timezone`` when adjusting parsed datetimes
    OUT_TIMEZONE = "UTC"
    # Metadata parser class for this data type
    # (not referenced in this class; presumably used by the base class)
    META_PARSER = PointSnowExMetadataParser

    def __init__(
        self, variable: MeasurementDescription = None,
        meta_parser: MetaDataParser = None,
        row_based_timezone=False,
        timezone=None,
        single_date=False,
        skip_format_df=False,
    ):
        """
        Args:
            See MeasurementData.__init__
            row_based_timezone: does each row have a unique timezone implied
            timezone: input timezone for the whole file
            single_date: Dataset is from a single day
            skip_format_df: Skip parsing shared columns
        """
        # These must be set before the base initializer runs
        # (kept in the original order: attributes first, then super)
        self._row_based_timezone = row_based_timezone
        self._in_timezone = timezone
        self._timezonefinder = None
        self._single_date = single_date
        self._skip_format_df = skip_format_df
        super().__init__(variable, meta_parser)

    @property
    def timezonefinder(self):
        """
        Lazily created ``TimezoneFinder`` instance, built on first access
        and reused afterwards.
        """
        if self._timezonefinder is None:
            self._timezonefinder = TimezoneFinder(in_memory=True)
        return self._timezonefinder

    @staticmethod
    def read_csv_dataframe(profile_filename, columns, header_position):
        """
        Read in a profile file. Managing the number of lines to skip and
        adjusting column names

        Args:
            profile_filename: Filename containing a manually measured
                profile
            columns: list of columns to use in dataframe
            header_position: skiprows for pd.read_csv
        Returns:
            df: pd.dataframe contain csv data with desired column names
        """
        # header=0 because docs say to if using skip rows and columns
        # NOTE: Using the 'c' engine won't automatically detect any delimiters
        # and won't parse any files that have a non comma separator
        df = pd.read_csv(
            profile_filename,
            header=0,
            skiprows=header_position,
            names=columns,
            encoding='latin',
            # treat all columns as strings to get weird date format
            dtype=str,
            engine='c',
        )
        if "flags" in df.columns:
            # Remove every space character from the flags column
            df["flags"] = df["flags"].str.replace(" ", "", regex=False)
        return df

    def _get_location(self, row):
        """
        Fill in the location info for a row

        Args:
            row: pandas row
        Returns:
            (lat, lon) parsed from the row, or taken from the header
            metadata when the row cannot be parsed
        Raises:
            RuntimeError: the row has no valid location and there is no
                metadata to fall back on
        """
        try:
            lat, lon, *_ = LocationManager.parse(row)
        except ValueError as err:
            if self.metadata is None:
                # Chain the parse failure so the root cause stays visible
                raise RuntimeError(
                    "No valid location found in row or metadata."
                ) from err
            LOG.warning(
                f"Row {row.name} does not have a valid location. "
                "Attempting to use header metadata."
            )
            lat, lon = self.metadata.latitude, self.metadata.longitude
        return lat, lon

    def _get_datetime(self, row):
        """
        Fill in the datetime info for a row

        Args:
            row: pandas row
        Returns:
            The row datetime adjusted from the input timezone to
            ``OUT_TIMEZONE``, or the metadata ``date_time`` when the row
            cannot be parsed
        Raises:
            ValueError: parsing failed and there is no metadata to fall
                back on
        """
        tz = self._in_timezone
        if self._row_based_timezone:
            # Look up the timezone for the location and apply that
            # e.g., 'America/Denver'
            tz = self.timezonefinder.timezone_at(
                lat=row["latitude"], lng=row["longitude"]
            )

        try:
            parsed = None
            raw_date_time = row.get(YamlCodes.DATE_TIME)
            # In case we found a date entry that has date and time.
            # Empty cells arrive as NaN (not None) when the file was read
            # with dtype=str, so check for both before calling str methods.
            if raw_date_time is not None and pd.notna(raw_date_time):
                parsed = pd.to_datetime(
                    str(raw_date_time.replace('T', '-'))
                )

            if parsed is None:
                parsed = DateTimeManager.handle_separate_datetime(row)

            result = DateTimeManager.adjust_timezone(
                parsed,
                in_timezone=tz,
                out_timezone=self.OUT_TIMEZONE
            )
        except ValueError:
            if self.metadata is None:
                raise
            # Fall back to the date from the file header
            result = self.metadata.date_time
        return result

    def _format_df(self):
        """
        Format the incoming df with the column headers and other info we want
        This will filter to a single measurement as well as the expected
        shared columns like depth
        """
        self._set_column_mappings()

        # If the variable is real (not -1), check columns
        if self.variable.code != "-1":
            # Verify the sample column exists and rename to variable
            self._check_sample_columns()

        if self._skip_format_df:
            return

        # Get the campaign name
        if "campaign" not in self._df.columns:
            self._df["campaign"] = self._df.get(YamlCodes.SITE_NAME)

        # TODO: How do we speed this up?
        #   campaign should be very quick with a df level logic
        #   but the other ones will take more logic
        # parse the location
        self._df[["latitude", "longitude"]] = self._df.apply(
            self._get_location, axis=1, result_type="expand"
        )

        # If the datetime isn't already parsed, parse it
        datetime_is_parsed = (
            "datetime" in self._df.columns.tolist()
            and pd.api.types.is_datetime64_any_dtype(self._df["datetime"])
        )
        if datetime_is_parsed:
            LOG.debug("not parsing date")
        elif self._single_date:
            # The date from the first row is applied to every row.
            # NOTE(review): unlike the per-row path, this value is not
            # passed through adjust_timezone - confirm this is intended.
            self._df["datetime"] = DateTimeManager.handle_separate_datetime(
                self._df.iloc[0]
            )
        else:
            self._df["datetime"] = self._df.apply(
                self._get_datetime, axis=1, result_type="expand"
            )

        # -9999 marks missing data; np.NaN was removed in NumPy 2.0,
        # np.nan works on every version
        self._df = self._df.replace(-9999, np.nan)
class PointDataCollection:
    """
    A collection of point data series (one per measured variable) read
    from a single file, together with the metadata parsed from that file.
    """

    # Class used to hold the data of each variable
    DATA_CLASS = SnowExPointData

    # Keys of the primary variables whose columns are shared between all
    # measurements instead of being measurements themselves
    # TODO: could we make this a 'shared' option in the definition
    _SHARED_VARIABLE_KEYS = (
        "CAMPAIGN",
        "COMMENTS",
        "DATE",
        "DATETIME",
        "EASTING",
        "ELEVATION",
        "FLAGS",
        "FREQUENCY",
        "INSTRUMENT",
        "INSTRUMENT_MODEL",
        "LATITUDE",
        "LONGITUDE",
        "NORTHING",
        "PIT_ID",
        "TIME",
        "UTCDOY",
        "UTCTOD",
        "UTCYEAR",
        "UTM_ZONE",
    )

    def __init__(self, series: List[SnowExPointData], metadata: ProfileMetaData):
        """
        Args:
            series: the point data objects, one per variable
            metadata: metadata shared by all series
        """
        self._series = series
        self._metadata = metadata

    @property
    def metadata(self) -> ProfileMetaData:
        """Metadata shared by all series in the collection."""
        return self._metadata

    @property
    def series(self) -> List[SnowExPointData]:
        """The point data objects held by the collection."""
        return self._series

    @classmethod
    def _read_csv(
        cls,
        fname,
        meta_parser: PointSnowExMetadataParser,
        timezone=None,
        row_based_timezone=False,
        single_date=False,
    ) -> Tuple[List[SnowExPointData], ProfileMetaData]:
        """
        Read the file once and split it into one data object per
        measured variable.

        Args:
            fname: path to csv
            meta_parser: parser for the metadata
            timezone: input timezone
            row_based_timezone: is the timezone row based?
            single_date: All observations are from single date

        Returns:
            tuple of (list of DATA_CLASS objects, metadata of the file)
        """
        # parse the file for metadata before parsing the individual
        # variables
        all_file = cls.DATA_CLASS(
            variable=None,  # we do not have a variable yet
            meta_parser=meta_parser,
            timezone=timezone,
            row_based_timezone=row_based_timezone,
            single_date=single_date,
        )
        all_file.from_csv(fname)

        entries = meta_parser.primary_variables.entries
        shared_column_options = [
            entries[key] for key in cls._SHARED_VARIABLE_KEYS
        ]

        columns_map = all_file.meta_columns_map
        shared_columns = {
            column for column, variable in columns_map.items()
            if variable in shared_column_options
        }
        # Every non-shared column is a measurement, except the ones
        # mapped to the 'ignore' variable
        variable_columns = [
            column for column in columns_map.keys()
            if column not in shared_columns
            and columns_map[column].code != "ignore"
        ]

        # Create an object for each measurement
        result = []
        for column in variable_columns:
            points = cls.DATA_CLASS(
                variable=columns_map[column],
                meta_parser=meta_parser,
                timezone=timezone,
                row_based_timezone=row_based_timezone,
                # The shared columns were already parsed for the whole file
                skip_format_df=True,
            )
            # Keep the shared columns and this measurement only
            drop_columns = [
                other for other in variable_columns if other != column
            ]
            # Assign the full copy first and drop afterwards, keeping
            # the statement order of the original implementation
            points.df = all_file.df.copy()
            points.df.drop(columns=drop_columns, inplace=True)
            result.append(points)

        return result, all_file.metadata

    @classmethod
    def from_csv(
        cls,
        fname,
        timezone="US/Mountain",
        header_sep=",",
        site_id=None,
        campaign_name=None,
        allow_map_failure=False,
        units_map=None,
        row_based_timezone=False,
        metadata_variable_file=None,
        primary_variable_file=None,
        single_date=False,
    ):
        """
        Find all variables in a single csv file

        Args:
            fname: path to file
            timezone: expected timezone in file
            header_sep: header sep in the file
            site_id: Site id override for the metadata
            campaign_name: Campaign.name override for the metadata
            allow_map_failure: allow metadata and column unknowns
            units_map: units map for the metadata
            row_based_timezone: is the timezone row based
            metadata_variable_file: list of files to override the metadata
                variables
            primary_variable_file: list of files to override the
                primary variables
            single_date: This dataset collection is from a single date

        Returns:
            This class with a collection of profiles and metadata
        """
        meta_parser = PointSnowExMetadataParser(
            timezone, primary_variable_file, metadata_variable_file,
            header_sep=header_sep, _id=site_id,
            campaign_name=campaign_name, allow_map_failures=allow_map_failure,
            units_map=units_map,
        )

        # read in the actual data
        profiles, metadata = cls._read_csv(
            fname,
            meta_parser,
            timezone=timezone,
            row_based_timezone=row_based_timezone,
            single_date=single_date,
        )

        # ignore profiles with the name 'ignore'
        # Keep the profile if it is None because we need the metadata
        profiles = [
            p for p in profiles
            if p.variable is None or p.variable.code != "ignore"
        ]

        return cls(profiles, metadata)