Source code for snowex_db.upload.cog_handler

import os
from subprocess import STDOUT, check_output
from pathlib import Path
from os.path import exists, join
from os import makedirs, remove
import boto3
import logging

LOG = logging.getLogger(__name__)


[docs] class COGHandler: """ Class to convert TIFs to COGs, persist them, and generate the command to insert them into the db """ AWS_REGION = os.getenv("AWS_DEFAULT_REGION", "us-west-2") def __init__(self, tif_file, s3_bucket="m3w-snowex", s3_prefix="cogs", cog_dir="./snowex_cog_storage", use_s3=True): """ Args: tif_file: local or abs bath to file that will be persisted s3_bucket: optional s3 bucket name s3_prefix: optional s3 bucket prefix cog_dir: option local directory for storing cog files use_s3: boolean whether or not we persist files in S3 """ self.tif_file = Path(tif_file) self.s3_bucket = s3_bucket self.s3_prefix = s3_prefix self.tmp_dir = Path(cog_dir).expanduser().absolute() self.use_s3 = use_s3 if not self.tmp_dir.exists(): LOG.info(f"Making directory {self.tmp_dir}") makedirs(self.tmp_dir) # state variables self._cog_path = None self._cog_uri = None self._key_name = None self._sql_path = None
[docs] def create_cog(self, nodata=None): """ Create a cloud optimized geotif from tif Args: nodata: no data value Returns: path to COG """ cmd = [ "gdal_translate", "-co", "COMPRESS=DEFLATE", "-co", "ZLEVEL=9", # Use highest compression "-co", "PREDICTOR=2", # Compression predictor "-co", "TILED=YES", # Apply default (256x256) tiling ] if nodata is not None: cmd += ["-a_nodata", f"{nodata}"] output_file = Path(self.tmp_dir)\ .joinpath(self.tif_file.name)\ .with_suffix(".tif") cmd += [ str(self.tif_file), # Input file str(output_file) # Output file ] LOG.info('Executing: {}'.format(' '.join(cmd))) check_output(cmd, stderr=STDOUT).decode('utf-8') self._cog_path = output_file return output_file
def _remove_cog(self): """ Delete COG file. This should be used of the files are persisted in S3 """ if self._cog_path.exists(): remove(str(self._cog_path)) else: raise RuntimeError( f"Cannot remove the COG {self._cog_path}" f" because it does not exist" )
[docs] def persist_cog(self): """ persist COG either locally or in S3 Returns: S3 or local path """ if exists(self._cog_path): if self.use_s3: self._key_name = join(self.s3_prefix, self._cog_path.name) LOG.info(f'Uploading {self._cog_path} to {self.s3_bucket}/{self._key_name}') s3 = boto3.resource('s3', region_name=self.AWS_REGION) s3.meta.client.upload_file( str(self._cog_path), # local file self.s3_bucket, # bucket name self._key_name # key name ) result = Path(self.s3_bucket).joinpath(self._key_name) # delete cog since it is stored in S3 self._remove_cog() else: # COG is already stored locally result = self._cog_path else: raise RuntimeError( f"Cannot upload COG {self._cog_path}" f" because it does not exist" ) self._sql_path = result return result
[docs] def to_sql_command(self, epsg, no_data=None): """ Generate command to insert into database Args: epsg: string EPSG no_data: optional nodata value Returns: list of raster2pgsql command """ # This produces a PSQL command with auto tiling if self.use_s3: cog_path = Path("/vsis3").joinpath(self._sql_path) else: cog_path = self._sql_path cmd = [ 'raster2pgsql', '-s', str(epsg), # '-I', '-t', '256x256', '-R', str(cog_path), ] # If nodata applied: if no_data is not None: cmd.append('-N') cmd.append(str(no_data)) return cmd