"""
Module for intentional interpretation of data/scenarios. These are often
decisions being made about situations that are perhaps not universal but useful
in the context of snowex data and creating the database.
"""
import datetime
import re
import warnings

import numpy as np
import pandas as pd
import pytz

from .string_management import parse_none
from .utilities import get_logger
[docs]
def is_point_data(columns):
    """
    Decide from the csv column names whether a data set is point data.

    Point data stores its location in the data columns rather than in the
    header, so a latitude or an easting column marks the data as point data.

    Args:
        columns: List of dataframe columns, may be None
    Return:
        result: Boolean, True when 'latitude' or 'easting' is among the
                columns and False otherwise (including when columns is None)
    """
    if columns is None:
        return False

    # Location columns live in the data itself, not the header
    return 'latitude' in columns or 'easting' in columns
[docs]
# A plain signed decimal number, e.g. 10, -5 or 12.5
_DEGREE_NUMBER = r'[+-]?\d+(?:\.\d+)?'
_DEGREE_NUMBER_RE = re.compile(_DEGREE_NUMBER)
# Two plain numbers separated by a hyphen, e.g. 10-20 or -10--5
_DEGREE_RANGE_RE = re.compile(
    r'\s*(' + _DEGREE_NUMBER + r')\s*-\s*(' + _DEGREE_NUMBER + r')\s*')


def manage_degree_values(v):
    """
    Handle parsing of degree strings that may have special characters

    Degree symbols are removed, the key word flat is read as 0 and anything
    that ends up being a plain number is returned as a float.

    Args:
        v: value theoretically a degree. Non-string values (including None)
           are returned untouched.
    Returns:
        v: float when the cleaned string is a plain (optionally signed and/or
           decimal) number, otherwise the cleaned string
    """
    if not isinstance(v, str):
        return v

    # Remove any degrees symbols (and the stray character left behind when
    # the symbol was decoded with the wrong encoding)
    v = v.replace('\u00b0', '').replace('Â', '')

    numeric_range = _DEGREE_RANGE_RE.fullmatch(v)
    if numeric_range:
        # Sometimes a range is used for the slope. Always pick the
        # larger value regardless of the order it was written in
        v = max(numeric_range.groups(), key=float)

    elif '-' in v and not _DEGREE_NUMBER_RE.fullmatch(v.strip()):
        # Non numeric composites (e.g. N-NE) keep the last entry. A lone
        # negative number is skipped here so its sign is not thrown away
        v = v.split('-')[-1]

    if v.lower() == 'flat':
        v = '0'

    candidate = v.strip()
    if _DEGREE_NUMBER_RE.fullmatch(candidate):
        v = float(candidate)

    return v
[docs]
def manage_degrees_keys(info):
    """
    Cleans up the dictionary entries that hold values in degrees.

    Every one of the keys aspect, slope_angle and air_temp that is present
    is passed through manage_degree_values and the result is stored back
    under the same key. Keys that are absent are not added.

    Args:
        info: Dictionary containing potential degrees entries
    Returns:
        info: The same dictionary, modified in place, with the present
              degree entries replaced by their managed values
    """
    degree_keys = ('aspect', 'slope_angle', 'air_temp')

    for key in degree_keys:
        if key not in info.keys():
            continue
        info[key] = manage_degree_values(info[key])

    return info
[docs]
def manage_aspect(info):
    """
    Manages when aspect is recorded in cardinal directions and converts it to
    a degrees from North float.

    Only string aspects containing at least one non numeric character are
    converted. A missing aspect, None, a non string value, an empty string
    or a string made purely of numeric characters is left as it is.

    Args:
        info: Dictionary potentially containing key aspect. The key site_id
              is only used for the log message and may be absent.
    Returns:
        info: The same dictionary with a cardinal aspect converted to a float
              of degrees from north
    Raises:
        ValueError: Raised by convert_cardinal_to_degree when the aspect is
                    not a recognized direction
    """
    aspect = info.get('aspect')

    # Nothing to do for missing, None or already numeric (non string) aspects
    if not isinstance(aspect, str):
        return info

    # Strings made purely of numeric characters are not cardinal directions.
    # This also holds for the empty string.
    if all(c.isnumeric() for c in aspect):
        return info

    log = get_logger(__name__)
    # Use .get so a missing site_id cannot turn a log message into a KeyError
    log.warning('Aspect recorded for site {} is in cardinal '
                'directions, converting to degrees...'
                ''.format(info.get('site_id')))
    info['aspect'] = convert_cardinal_to_degree(aspect)

    return info
[docs]
def is_number(s):
    """
    Checks whether a value can be converted to a float

    Args:
        s: Value to check, typically a string
    Returns:
        result: Boolean, True when float(s) succeeds and False when it
                raises a ValueError or a TypeError
    """
    try:
        float(s)  # Try to convert the value to a float
    except (TypeError, ValueError):
        # TypeError covers None and other objects float() does not accept
        return False
    return True
[docs]
def convert_cardinal_to_degree(cardinal) -> float:
    """
    Turns a cardinal direction into degrees from north.

    Any / or - used to say between two cardinal directions is dropped first
    e.g. S/SW turns into SSW which is interpreted as halfway between those
    two directions allowing for 22.5 degree increments. A value that is
    already a number is simply returned as a float. Anything longer than 3
    letters is assumed to be a spelled out direction and is reduced to its
    first letter with a warning.

    Args:
        cardinal: Letters representing cardinal direction
    Returns:
        degrees: Float representing cardinal direction in degrees from north
    Raises:
        ValueError: When the direction is not one of the 16 compass points
    """
    # 16 point compass listed clockwise starting at north
    compass = (
        'N', 'NNE', 'NE', 'ENE',
        'E', 'ESE', 'SE', 'SSE',
        'S', 'SSW', 'SW', 'WSW',
        'W', 'WNW', 'NW', 'NNW',
    )

    # Drop the separators of composite directions and upper case the rest
    token = ''.join(ch.upper() for ch in cardinal if ch not in '/-')

    # Numeric input is already in degrees
    try:
        return float(token)
    except ValueError:
        pass

    # Assume West, East, South, Or North
    if len(token) > 3:
        token = token[0]
        warnings.warn("Assuming {} is {}".format(cardinal, token))

    if token not in compass:
        raise ValueError('Invalid cardinal direction {}!'.format(cardinal))

    return compass.index(token) * (360. / len(compass))
[docs]
def manage_utm_zone(info):
    """
    Manage the nuance of having a utm zone string sometimes and
    then not being in the keys at all.

    * If a utm_zone is given it is converted to an integer and the epsg is
      rebuilt from it as 269 followed by the zero padded zone number.
    * Otherwise, if an epsg is given, the utm_zone is taken from its last two
      digits.
    * Otherwise both utm_zone and epsg are assigned None.

    A utm_zone or epsg whose value is None is treated the same as a missing
    key, so the returned dictionary always carries the key utm_zone.

    Args:
        info: Dictionary potentially carrying utm_zone and/or epsg. The
              utm_zone may be a string with extra characters (e.g. 12N) or
              already a number.
    Returns:
        info: The same dictionary, modified in place, containing utm_zone
              and epsg
    Raises:
        ValueError: When a utm_zone string holds no numeric characters
    """
    zone = info.get('utm_zone')
    epsg = info.get('epsg')

    if zone is not None:
        if isinstance(zone, str):
            # Keep only the zone number, dropping e.g. the hemisphere letter
            digits = ''.join(c for c in zone if c.isnumeric())
            if not digits:
                raise ValueError(
                    f'Unable to find a zone number in utm_zone {zone!r}')
            zone = int(digits)
        else:
            # Already numeric, no characters to strip out
            zone = int(zone)

        info['utm_zone'] = zone
        # NOTE(review): 269XX presumably targets the NAD83 / UTM zone XXN
        # family of EPSG codes - confirm against the data being ingested
        info['epsg'] = int(f"269{zone:02}")

    elif epsg is not None:
        info['utm_zone'] = int(str(epsg)[-2:])

    else:
        info['utm_zone'] = None
        info['epsg'] = None

    return info
[docs]
def add_date_time_keys(data, in_timezone=None, out_timezone='UTC'):
    """
    Convert string info from date/time keys in a dictionary to date and time
    objects and assign them back to the dictionary as date and time

    Three layouts are recognized, checked in this order:
        1. A single key whose name contains both date and time
        2. A date key, optionally paired with a time key
        3. The keys utcyear, utcdoy and utctod. These are always read as UTC
           and are the only layout that ignores in_timezone

    Args:
        data: dictionary containing one of the layouts described above
        in_timezone: String representing Pytz valid timezone of the data
                     coming in. Required, even for the UTC keyed layout.
        out_timezone: String representing Pytz valid timezone of the data
                      being returned
    Returns:
        data: The same dictionary, modified in place, with date assigned a
              date object and time assigned a timezone aware time object,
              both expressed in out_timezone. A time that was parsed to None
              is left as None.
    Raises:
        ValueError: When in_timezone is None or when none of the recognized
                    date/time keys are present
    """
    keys = [k.lower() for k in data.keys()]
    out_tz = pytz.timezone(out_timezone)

    # The incoming timezone is never assumed, it has to be provided
    if in_timezone is None:
        raise ValueError("We did not recieve a valid in_timezone")
    in_tz = pytz.timezone(in_timezone)

    d = None
    # Whether d still has to be localized to in_tz before converting it
    needs_localizing = True

    # Look for a single header entry containing date and time.
    # This would handle key of 'datetime'
    for k in data.keys():
        kl = k.lower()
        if 'date' in kl and 'time' in kl:
            # Cast to a string first so non string values do not break on
            # the replace, then swap the T separator for a dash
            str_date = str(data[k]).replace('T', '-')
            d = pd.to_datetime(str_date)
            break

    # If we didn't find date/time combined.
    if d is None:
        # Handle data dates and times
        if 'date' in keys and 'time' in keys:
            # Assume MMDDYY format
            if len(data['date']) == 6:
                dt = data['date']
                # Put into YY-MM-DD
                data['date'] = f'20{dt[-2:]}-{dt[0:2]}-{dt[2:4]}'

            # Allow for nan time
            # NOTE(review): parse_none presumably turns nan like values into
            # None so they get skipped below - confirm in string_management
            data['time'] = parse_none(data['time'])

            dstr = ' '.join([str(data[k]) for k in ['date', 'time']
                             if data[k] is not None])
            d = pd.to_datetime(dstr)

        elif 'date' in keys:
            d = pd.to_datetime(data['date'])

        # Handle gpr data dates
        elif 'utcyear' in keys and 'utcdoy' in keys and 'utctod' in keys:
            base = pd.to_datetime(
                '{:d}-01-01 00:00:00 '.format(int(data['utcyear'])), utc=True)

            # Number of days since january 1
            days = int(data['utcdoy']) - 1

            # Zulu time (time without colons) in the form HHMMSS.fraction
            whole, _, fraction = str(data['utctod']).partition('.')
            # Numeric values lose their leading zeros, restore them so the
            # slices below line up with hours, minutes and seconds
            whole = whole.zfill(6)
            hr = int(whole[0:2])  # hours
            mm = int(whole[2:4])  # minutes
            ss = int(whole[4:6])  # seconds
            # milliseconds, zero when there is no fractional part
            ms = int(float('0.' + fraction) * 1000) if fraction else 0

            delta = datetime.timedelta(
                days=days,
                hours=hr,
                minutes=mm,
                seconds=ss,
                milliseconds=ms)

            # This is the only key set that ignores in_timezone
            d = base.astimezone(pytz.timezone('UTC')) + delta
            d = d.astimezone(out_tz)
            # Already timezone aware and in out_tz, skip the localizing
            needs_localizing = False

        else:
            raise ValueError(
                'Data is missing date/time info!\n{}'.format(data))

    if needs_localizing:
        d = d.tz_localize(in_tz)
        d = d.astimezone(out_tz)

    data['date'] = d.date()

    # Dont add time to a time that was nan or none
    if 'time' not in data.keys() or data['time'] is not None:
        data['time'] = d.timetz()

    return data
[docs]
def standardize_depth(depths, desired_format='snow_height', is_smp=False):
    """
    Data that is a function of depth comes in 2 formats. Sometimes 0 is
    the snow surface, sometimes 0 is the ground. This function standardizes it
    for each profile. desired_format can be:

        snow_height: Zero at the bottom of the data.
        surface_datum: Zero at the top of the data and uses negative depths
                       (easier for plotting)

    The incoming data is taken to be in surface datum when its last entry is
    negative. Data already in the desired format is returned as a copy.

    Args:
        depths: Pandas series of depths in either format
        desired_format: string indicating which format the data should be
                        returned in
        is_smp: Boolean indicating which data this is, if smp then the data is
                surface_datum but with positive depths
    Returns:
        new: Pandas series of the depths in the desired format
    Raises:
        ValueError: When desired_format is not snow_height or surface_datum
    """
    valid_formats = ['snow_height', 'surface_datum']

    # Validate before doing any work so a typo fails fast and clearly
    if desired_format not in valid_formats:
        raise ValueError('{} is an invalid depth format! Options are: {}'
                         ''.format(desired_format, ', '.join(valid_formats)))

    log = get_logger(__name__)

    max_depth = depths.max()
    min_depth = depths.min()
    new = depths.copy()

    # Is the data in surface_datum already
    bottom_is_negative = depths.iloc[-1] < 0

    if desired_format == 'snow_height':
        if is_smp:
            log.info('Converting SMP depths to snow height format.')
            new = (depths - max_depth).abs()

        elif bottom_is_negative:
            log.info('Converting depths in surface datum to snow height format.')
            new = (depths + abs(min_depth))

    else:
        if is_smp:
            log.info('Converting SMP depths to surface datum format.')
            new = depths.mul(-1)

        elif not bottom_is_negative:
            log.info('Converting depths in snow height to surface datum format.')
            new = depths - max_depth

    return new
[docs]
def avg_from_multi_sample(layer, value_type):
    """
    Our database entries sometimes have multiple values. We want to extract
    those, cast them, average them and return the value to be used as the
    main value in the database

    e.g.
        layer = {density_a: 180, density_b: 200, density_c: nan}
        result = 190

    Every entry whose key contains value_type is considered a sample.
    Samples that are None, nan or blank (including the strings nan and none
    in any case, with or without surrounding whitespace) are skipped.

    Args:
        layer: layer dictionary (a single entry from a vertical profile)
        value_type: string labeling type of data were looking for (density,
                    dielectric constant..)
    Returns:
        result: Mean of the usable values found, nan when there are none
    Raises:
        ValueError: When a sample is neither missing nor castable to a float
    """
    values = []

    for key, value in layer.items():
        if value_type not in key:
            continue

        # None cannot be cast to a float, treat it like any other missing value
        if value is None:
            continue

        text = str(value).strip().lower()
        if not text or text in ('nan', 'none'):
            continue

        values.append(float(value))

    if not values:
        return np.nan

    return np.mean(np.array(values))