Source code for smos.interface

# -*- coding: utf-8 -*-
# The MIT License (MIT)
#
# Copyright (c) 2021, TU Wien
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os
import warnings
import numpy as np
import pandas as pd

from pynetcf.time_series import GriddedNcOrthoMultiTs
from pygeogrids.netcdf import load_grid

from pygeobase.io_base import ImageBase, MultiTemporalImageBase
from pygeobase.object_base import Image
from datetime import timedelta, datetime
from netCDF4 import Dataset, date2num, num2date
from smos.grid import EASE25CellGrid
from smos import dist_name as subdist_name
from smos import dist_name, __version__
from collections import OrderedDict


[docs] class SMOSTs(GriddedNcOrthoMultiTs): _t0_vars = {'sec': 'UTC_Seconds', 'days': 'Days'} _t0_unit = 'days since 2000-01-01' def __init__(self, ts_path, grid_path=None, index_add_time=False, drop_missing=True, **kwargs): """ Class for reading SMOS time series after reshuffling images. Missing images are represented in time series as lines where all variables are NaN. Parameters ---------- ts_path : str Directory where the netcdf time series files are stored grid_path : str, optional (default: None) Path to grid file, that is used to organize the location of time series to read. If None is passed, grid.nc is searched for in the ts_path. index_add_time : bool, optional (default: False) Add overpass time stamps to the data frame index. This needs the 'Days' and 'UTC_Seconds' variable available in the time series files. drop_missing : bool, optional (default: True) Drop Lines in TS where ALL variables are missing. Optional keyword arguments that are passed to the Gridded Base: ------------------------------------------------------------------------ parameters : list, optional (default: None) Specific variable names to read, if None are selected, all are read. offsets : dict, optional (default:None) Offsets (values) that are added to the parameters (keys) scale_factors : dict, optional (default:None) Offset (value) that the parameters (key) is multiplied with ioclass_kws: dict Optional keyword arguments to pass to OrthoMultiTs class: ---------------------------------------------------------------- read_bulk : boolean, optional (default:False) if set to True the data of all locations is read into memory, and subsequent calls to read_ts read from the cache and not from disk this makes reading complete files faster# read_dates : boolean, optional (default:False) if false dates will not be read automatically but only on specific request useable for bulk reading because currently the netCDF num2date routine is very slow for big datasets autofill : bool, (default: True) Fill missing values with nans """ if grid_path is None: grid_path = os.path.join(ts_path, "grid.nc") self.drop_missing = drop_missing grid = load_grid(grid_path) super(SMOSTs, self).__init__(ts_path, grid, **kwargs) self.index_add_time = index_add_time if (self.parameters is not None) and self.index_add_time: for v in self._t0_vars.values(): self.parameters.append(v) def _to_datetime(self, df:pd.DataFrame) -> pd.DataFrame: # convert Days and UTC_Seconds to actual datetimes units = self._t0_unit df['_date'] = df.index.values if (self._t0_vars['days'] not in df.columns) or \ (self._t0_vars['sec'] not in df.columns): raise KeyError(f"Could not find {self._t0_vars['days']} or {self._t0_vars['sec']} " f"in reshuffled data.") # days + (seconds / seconds per day) num = df[self._t0_vars['days']].dropna() + (df[self._t0_vars['sec']].dropna() / 86400.) if len(num) == 0: df.loc[num.index, '_datetime_UTC'] = [] else: df.loc[num.index, '_datetime_UTC'] = \ pd.DatetimeIndex(num2date(num.values, units=units, calendar='standard', only_use_cftime_datetimes=False)) df = df.set_index('_datetime_UTC') df = df[df.index.notnull()] return df
[docs] def read(self, *args, **kwargs): """ Read time series by grid point index, or by lonlat. Convert columns to ints if possible (if there are no Nans in it). Parameters ---------- lon: float Location longitude lat : float Location latitude .. OR gpi : int Grid point Index Returns ------- df : pd.DataFrame Time Series data at the selected location """ ts = super(SMOSTs, self).read(*args, **kwargs) if self.drop_missing: ts = ts.dropna(how='all') for col in ts: # convert to ints, if possible if (not np.isnan(ts[col]).any()) and \ (np.all(np.mod(ts[col].values, 1) == 0.)): ts[col] = ts[col].astype(int) if self.index_add_time: ts = self._to_datetime(ts) return ts
[docs] class SMOSImg(ImageBase): """ Class for reading one SMOS netcdf image file. Parameters ---------- filename: str filename of the SMOS nc image file mode: str, optional (default: 'r') mode of opening the file, only 'r' is implemented at the moment parameters : str or list, optional (default: None) one or list of parameters to read. We add 'Quality_Flags' if 'read_flags' is not None. All parameters are described in docs/varnames.rst. If None are passed, all parameters are read. flatten: bool, optional (default: False) If set then the data is read into 1D arrays. This is used to e.g reshuffle the data. grid : pygeogrids.CellGrid, optional (default: EASE25CellGrid) Grid that the image data is organised on, by default the global EASE25 grid is used. read_flags : tuple or None, optional (default: (0, 1)) Filter values to read based on the selected QUALITY_FLAGS. Values for locations that are not assigned any of the here passed flags are replaces with NaN (by default only the missing-data, i.e. flag=2, locations are filtered out). If None is passed, no flags are considered. float_fillval : float or None, optional (default: np.nan) Fill Value for masked pixels, this is only applied to float variables. Therefore e.g. mask variables are never filled but use the fill value as in the data. """ def __init__(self, filename, mode='r', parameters=None, flatten=False, grid=EASE25CellGrid(bbox=None), read_flags=(0, 1), float_fillval=np.nan): super(SMOSImg, self).__init__(filename, mode=mode) if parameters is None: parameters = [] if type(parameters) != list: parameters = [parameters] self.read_flags = read_flags self.parameters = parameters self.flatten = flatten self.grid = grid self.image_missing = False self.img = None # to be loaded self.glob_attrs = None self.float_fillval = float_fillval
[docs] def get_global_attrs(self, exclude=('history', 'NCO', 'netcdf_version_id', 'contact', 'institution', 'creation_time')): return {k: v for k, v in self.glob_attrs.items() if k not in exclude}
def _read_empty(self) -> (dict, dict): """ Create an empty image for filling missing dates, this is necessary for reshuffling as img2ts cannot handle missing days. Returns ------- empty_img : dict Empty arrays of image size for each object parameter """ self.image_missing = True return_img = {} return_metadata = {} try: rows, cols = self.grid.subset_shape except AttributeError: rows, cols = self.grid.shape except ValueError: # Flat grid case rows = self.grid.subset_shape cols = None # Create shape based on grid type shape = rows if cols is None else (rows, cols) for param in self.parameters: data = np.full(shape, np.nan) return_img[param] = data.flatten() return_metadata[param] = {'image_missing': 1} return return_img, return_metadata def _read_img(self) -> (dict, dict): raise NotImplementedError
[docs] def read(self, timestamp): """ Read a single SMOS image, if it exists, otherwise fill an empty image Parameters -------- timestamp : datetime Time stamp for the image to read. """ if timestamp is None: raise ValueError("No time stamp passed") try: return_img, return_metadata = self._read_img() except IOError: warnings.warn('Error loading image for {}, ' 'generating empty image instead'.format(timestamp.date())) return_img, return_metadata = self._read_empty() if self.flatten: self.img = Image(self.grid.activearrlon, self.grid.activearrlat, return_img, return_metadata, timestamp) else: try: shape = self.grid.subset_shape except AttributeError: shape = self.grid.shape rows, cols = shape for key in return_img: return_img[key] = np.flipud(return_img[key].reshape(rows, cols)) self.img = Image(self.grid.activearrlon.reshape(rows, cols), np.flipud(self.grid.activearrlat.reshape(rows, cols)), return_img, return_metadata, timestamp) return self.img
[docs] def write(self, image, **kwargs): """ Write the image to a separate output path. E.g. after reading only a subset of the parameters, or when reading a spatial subset (with a subgrid). If there is already a file, the new image is appended along the time dimension. Parameters ---------- image : str Path to netcdf file to create. kwargs Additional kwargs are given to netcdf4.Dataset """ if self.img is None: raise IOError("No data found for current image, load data first") if self.img.timestamp is None: raise IOError("No time stamp found for current image.") lons = np.unique(self.img.lon.flatten()) lats = np.flipud(np.unique(self.img.lat.flatten())) mode = 'w' if not os.path.isfile(image) else 'a' ds = Dataset(image, mode=mode, **kwargs) ds.set_auto_scale(True) ds.set_auto_mask(True) units = 'Days since 2000-01-01 00:00:00' if mode == 'w': ds.createDimension('timestamp', None) # stack dim ds.createDimension('lat', len(lats)) ds.createDimension('lon', len(lons)) # this is not the obs time, but an image time stamp ds.createVariable('timestamp', datatype=np.double, dimensions=('timestamp',), zlib=True, chunksizes=None) ds.createVariable('lat', datatype='float64', dimensions=('lat',), zlib=True) ds.createVariable('lon', datatype='float64', dimensions=('lon',), zlib=True) ds.variables['timestamp'].setncatts({'long_name': 'timestamp', 'units': units}) ds.variables['lat'].setncatts({'long_name': 'latitude', 'units': 'Degrees_North', 'valid_range': (-90, 90)}) ds.variables['lon'].setncatts({'long_name': 'longitude', 'units': 'Degrees_East', 'valid_range': (-180, 180)}) ds.variables['lon'][:] = lons ds.variables['lat'][:] = lats ds.variables['timestamp'][:] = np.array([]) this_global_attrs = \ OrderedDict([('subset_img_creation_time', str(datetime.now())), ('subset_img_bbox_corners_latlon', str(self.grid.bbox)), ('subset_software', f"{dist_name} | {subdist_name} | {__version__}")]) glob_attrs = self.glob_attrs for k in ['ease_global', 'history', 'creation_time', 'NCO']: try: glob_attrs.pop(k) except KeyError: continue glob_attrs.update(this_global_attrs) ds.setncatts(glob_attrs) idx = ds.variables['timestamp'].shape[0] ds.variables['timestamp'][idx] = date2num(self.img.timestamp, units=units) for var, vardata in self.img.data.items(): if var not in ds.variables.keys(): ds.createVariable(var, vardata.dtype, dimensions=('timestamp', 'lat', 'lon'), zlib=True, complevel=6) ds.variables[var].setncatts(self.img.metadata[var]) ds.variables[var][-1] = vardata ds.close()
[docs] def read_masked_data(self, **kwargs): raise NotImplementedError
[docs] def flush(self): pass
[docs] def close(self): pass
[docs] class SMOSDs(MultiTemporalImageBase): """ Class for reading SMOS images in nc format. Images are orgnaised in subdirs for each year. Parameters ---------- data_path : str Path to the nc files parameter : str or list, optional (default: None) one or list of parameters to read, see SMOS documentation for more information (default: 'Soil_Moisture'). flatten: bool, optional (default: False) If set then the data is read into 1D arrays. This is used to e.g reshuffle the data. grid : pygeogrids.CellGrid, optional (default: EASE25CellGrid) Grid that the image data is organised on, by default the global EASE25 grid is used. read_flags : tuple or None, optional (default: (0, 1)) Filter values to read based on the selected QUALITY_FLAGS. Values for locations that are not assigned any of the here passed flags are replaces with NaN (by default only the missing-data, i.e. flag=2, locations are filtered out). float_fillval : float or None, optional (default: np.nan) Fill Value for masked pixels, this is only applied to float variables. Therefore e.g. mask variables are never filled but use the fill value as in the data. """ def __init__(self, data_path, ioclass, parameters=None, flatten=False, grid=EASE25CellGrid(bbox=None), filename_templ=None, read_flags=(0, 1), float_fillval=np.nan, additional_kws=None): ioclass_kws = {'parameters': parameters, 'flatten': flatten, 'grid': grid, 'read_flags': read_flags, 'float_fillval': float_fillval} if additional_kws: ioclass_kws.update(additional_kws) sub_path = ['%Y'] super().__init__(data_path, ioclass=ioclass, fname_templ=filename_templ, datetime_format="%Y%m%d", subpath_templ=sub_path, exact_templ=False, ioclass_kws=ioclass_kws) def _assemble_img(self, timestamp, mask=False, **kwargs): img = None try: filepath = self._build_filename(timestamp) except IOError: filepath = None if self._open(filepath): kwargs['timestamp'] = timestamp if mask is False: img = self.fid.read(**kwargs) else: img = self.fid.read_masked_data(**kwargs) return img
[docs] def read(self, timestamp, **kwargs): return self._assemble_img(timestamp, **kwargs)
[docs] def write_multiple(self, root_path, start_date, end_date, stackfile='stack.nc', **kwargs): """ Create multiple netcdf files or a netcdf stack in the passed directoy for a range of time stamps. Note that stacking gets slower when the stack gets larger. Empty images (if no original data can be loaded) are excluded here as well. Parameters ---------- root : str Directory where the files / the stack are/is stored start_date : datetime Start date of images to write down end_date Last date of images to write down stackfile : str, optional (default: 'stack.nc') Name of the stack file to create in root_path. If no name is passed we create single images instead of a stack with the same name as the original images (faster). kwargs: kwargs that are passed to the image reading function """ timestamps = self.tstamps_for_daterange(start_date, end_date) for t in timestamps: self.read(t, **kwargs) if self.fid.image_missing: continue if stackfile is None: subdir = os.path.join(root_path, str(t.year)) if not os.path.exists(subdir): os.makedirs(subdir) filepath = os.path.join(subdir, os.path.basename(self.fid.filename)) else: filepath = os.path.join(root_path, stackfile) print(f"{'Write' if not stackfile else 'Stack'} image for {str(t)}...") self.fid.write(filepath)
[docs] def tstamps_for_daterange(self, start_date, end_date): """ return timestamps for daterange, Parameters ---------- start_date: datetime.datetime start of date range end_date: datetime.datetime end of date range Returns ------- timestamps : list list of datetime objects of each available image between start_date and end_date """ img_offsets = np.array([timedelta(hours=0)]) timestamps = [] diff = end_date - start_date for i in range(diff.days + 1): daily_dates = start_date + timedelta(days=i) + img_offsets timestamps.extend(daily_dates.tolist()) return timestamps