Source code for satpy.readers.fci_l2_nc

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019 Satpy developers
#
# satpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# satpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with satpy.  If not, see <http://www.gnu.org/licenses/>.

"""Reader for the FCI L2 products in NetCDF4 format."""

import logging
from contextlib import suppress
from datetime import datetime, timedelta

import numpy as np
import xarray as xr

from satpy import CHUNK_SIZE
from satpy.readers._geos_area import get_area_definition, make_ext
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

PRODUCT_DATA_DURATION_MINUTES = 20

SSP_DEFAULT = 0.0


[docs]class FciL2CommonFunctions(object): """Shared operations for file handlers.""" @property def _start_time(self): try: start_time = datetime.strptime(self.nc.attrs['time_coverage_start'], '%Y%m%d%H%M%S') except (ValueError, KeyError): # TODO if the sensing_start_time_utc attribute is not valid, uses a hardcoded value logger.warning("Start time cannot be obtained from file content, using default value instead") start_time = datetime.strptime('20200101120000', '%Y%m%d%H%M%S') return start_time @property def _end_time(self): """Get observation end time.""" try: end_time = datetime.strptime(self.nc.attrs['time_coverage_end'], '%Y%m%d%H%M%S') except (ValueError, KeyError): # TODO if the sensing_end_time_utc attribute is not valid, adds 20 minutes to the start time end_time = self._start_time + timedelta(minutes=PRODUCT_DATA_DURATION_MINUTES) return end_time @property def _spacecraft_name(self): """Return spacecraft name.""" try: return self.nc.attrs['platform'] except KeyError: # TODO if the platform attribute is not valid, return a default value logger.warning("Spacecraft name cannot be obtained from file content, using default value instead") return 'DEFAULT_MTG' @property def _sensor_name(self): """Return instrument.""" try: return self.nc.attrs['data_source'] except KeyError: # TODO if the data_source attribute is not valid, return a default value logger.warning("Sensor cannot be obtained from file content, using default value instead") return 'fci' def _get_global_attributes(self): """Create a dictionary of global attributes to be added to all datasets. Returns: dict: A dictionary of global attributes. filename: name of the product file start_time: sensing start time from best available source end_time: sensing end time from best available source spacecraft_name: name of the spacecraft ssp_lon: longitude of subsatellite point sensor: name of sensor creation_time: creation time of the product platform_name: name of the platform """ attributes = { 'filename': self.filename, 'start_time': self._start_time, 'end_time': self._end_time, 'spacecraft_name': self._spacecraft_name, 'ssp_lon': self.ssp_lon, 'sensor': self._sensor_name, 'creation_time': self.filename_info['creation_time'], 'platform_name': self._spacecraft_name, } return attributes def __del__(self): """Close the NetCDF file that may still be open.""" with suppress(OSError): self.nc.close()
[docs]class FciL2NCFileHandler(BaseFileHandler, FciL2CommonFunctions): """Reader class for FCI L2 products in NetCDF4 format.""" def __init__(self, filename, filename_info, filetype_info): """Open the NetCDF file with xarray and prepare for dataset reading.""" super().__init__(filename, filename_info, filetype_info) # Use xarray's default netcdf4 engine to open the file self.nc = xr.open_dataset( self.filename, decode_cf=True, mask_and_scale=True, chunks={ 'number_of_columns': CHUNK_SIZE, 'number_of_rows': CHUNK_SIZE } ) # Read metadata which are common to all datasets self.nlines = self.nc['y'].size self.ncols = self.nc['x'].size self._projection = self.nc['mtg_geos_projection'] # Compute the area definition self._area_def = self._compute_area_def() @property def ssp_lon(self): """Return subsatellite point longitude.""" try: return float(self._projection.attrs['longitude_of_projection_origin']) except KeyError: logger.warning("ssp_lon cannot be obtained from file content, using default value instead") return SSP_DEFAULT
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using the file_key in dataset_info.""" var_key = dataset_info['file_key'] logger.debug('Reading in file to get dataset with key %s.', var_key) try: variable = self.nc[var_key] except KeyError: logger.warning("Could not find key %s in NetCDF file, no valid Dataset created", var_key) return None # TODO in some of the test files, invalid pixels contain the value defined as "fill_value" in the YAML file # instead of being masked directly in the netCDF variable. # therefore NaN is applied where such value is found or (0 if the array contains integer values) # the next 11 lines have to be removed once the product files are correctly configured try: mask_value = dataset_info['mask_value'] except KeyError: mask_value = np.NaN try: fill_value = dataset_info['fill_value'] except KeyError: fill_value = np.NaN if dataset_info['file_type'] == 'nc_fci_test_clm': data_values = variable.where(variable != fill_value, mask_value).astype('uint32', copy=False) else: data_values = variable.where(variable != fill_value, mask_value).astype('float32', copy=False) data_values.attrs = variable.attrs variable = data_values # If the variable has 3 dimensions, select the required layer if variable.ndim == 3: layer = dataset_info.get('layer', 0) logger.debug('Selecting the layer %d.', layer) variable = variable.sel(maximum_number_of_layers=layer) if dataset_info['file_type'] == 'nc_fci_test_clm' and var_key != 'cloud_mask_cmrt6_test_result': variable.values = (variable.values >> dataset_info['extract_byte'] << 31 >> 31) # Rename the dimensions as required by Satpy variable = variable.rename({"number_of_rows": 'y', "number_of_columns": 'x'}) # Manage the attributes of the dataset variable.attrs.setdefault('units', None) variable.attrs.update(dataset_info) variable.attrs.update(self._get_global_attributes()) return variable
[docs] def get_area_def(self, key): """Return the area definition (common to all data in product).""" return self._area_def
def _compute_area_def(self): """Compute the area definition. Returns: AreaDefinition: A pyresample AreaDefinition object containing the area definition. """ # Read the projection data from the mtg_geos_projection variable a = float(self._projection.attrs['semi_major_axis']) b = float(self._projection.attrs['semi_minor_axis']) h = float(self._projection.attrs['perspective_point_height']) # TODO sweep_angle_axis value not handled at the moment, therefore commented out # sweep_axis = self._projection.attrs['sweep_angle_axis'] # Coordinates of the pixel in radians x = self.nc['x'] y = self.nc['y'] # TODO conversion to radians: offset and scale factor are missing from some test NetCDF file # TODO the next two lines should be removed when the offset and scale factor are correctly configured if not hasattr(x, 'standard_name'): x = np.radians(x * 0.003202134 - 8.914740401) y = np.radians(y * 0.003202134 - 8.914740401) # Convert to degrees as required by the make_ext function x_deg = np.degrees(x) y_deg = np.degrees(y) # Select the extreme points of the extension area x_l, x_r = x_deg.values[0], x_deg.values[-1] y_l, y_u = y_deg.values[0], y_deg.values[-1] # Compute the extension area in meters area_extent = make_ext(x_l, x_r, y_l, y_u, h) # Assemble the projection definition dictionary p_dict = { 'nlines': self.nlines, 'ncols': self.ncols, 'ssp_lon': self.ssp_lon, 'a': a, 'b': b, 'h': h, 'a_name': 'FCI Area', # TODO to be confirmed 'a_desc': 'Area for FCI instrument', # TODO to be confirmed 'p_id': 'geos' } # Compute the area definition area_def = get_area_definition(p_dict, area_extent) return area_def
[docs]class FciL2NCSegmentFileHandler(BaseFileHandler, FciL2CommonFunctions): """Reader class for FCI L2 Segmented products in NetCDF4 format.""" def __init__(self, filename, filename_info, filetype_info): """Open the NetCDF file with xarray and prepare for dataset reading.""" super().__init__(filename, filename_info, filetype_info) # Use xarray's default netcdf4 engine to open the file self.nc = xr.open_dataset( self.filename, decode_cf=True, mask_and_scale=True, chunks={ 'number_of_FoR_cols': CHUNK_SIZE, 'number_of_FoR_rows': CHUNK_SIZE } ) # Read metadata which are common to all datasets self.nlines = self.nc['number_of_FoR_rows'].size self.ncols = self.nc['number_of_FoR_cols'].size self.ssp_lon = SSP_DEFAULT
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using the file_key in dataset_info.""" var_key = dataset_info['file_key'] logger.debug('Reading in file to get dataset with key %s.', var_key) try: variable = self.nc[var_key] except KeyError: logger.warning("Could not find key %s in NetCDF file, no valid Dataset created", var_key) return None # TODO in some of the test files, invalid pixels contain the value defined as "fill_value" in the YAML file # instead of being masked directly in the netCDF variable. # therefore NaN is applied where such value is found or (0 if the array contains integer values) # the next 11 lines have to be removed once the product files are correctly configured mask_value = dataset_info.get('mask_value', np.NaN) fill_value = dataset_info.get('fill_value', np.NaN) float_variable = variable.where(variable != fill_value, mask_value).astype('float32', copy=False) float_variable.attrs = variable.attrs variable = float_variable # Rename the dimensions as required by Satpy variable = variable.rename({"number_of_FoR_rows": 'y', "number_of_FoR_cols": 'x'}) # # Manage the attributes of the dataset variable.attrs.setdefault('units', None) variable.attrs.update(dataset_info) variable.attrs.update(self._get_global_attributes()) return variable