"""API for adding new storage backends via :py:class:`BasePath`."""
import abc
import dataclasses
import os
import pathlib
import threading
from getpass import getuser
from typing import (
    Any,
    AsyncIterator,
    ClassVar,
    Dict,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
    cast,
)
from urllib.parse import urlsplit

import fsspec
import xarray as xr
from anyio import Path
from jinja2 import Environment, Undefined
from pydantic import BaseModel, Field

from ..backends.lookup_tables import cmor_lookup
class Metadata(BaseModel):
    """Describe one path found while crawling a storage backend.

    Attributes
    ^^^^^^^^^^
    path:
        Location of the discovered object.
    metadata:
        Additional key/value information attached to ``path``. Every
        instance gets its own fresh, empty dict when nothing is supplied.
    """

    # Location of the discovered object.
    path: str
    # Per-instance dict (default_factory avoids a shared mutable default).
    metadata: Dict[str, Any] = Field(default_factory=dict)
class TemplateMixin:
    """Apply the Jinja2 templating engine to (nested) configuration data."""

    storage_options: Optional[Dict[str, Any]] = None

    @staticmethod
    def render_templates(
        data: Any,
        context: Mapping[str, Any],
        *,
        max_passes: int = 2,
    ) -> Any:
        """Recursively render Jinja2 templates found in strings within data.

        This function traverses common container types (``dict``, ``list``,
        ``tuple``, ``set``, ``frozenset``), dataclasses, namedtuples, and
        ``pathlib`` path objects. Every string encountered is treated as a
        Jinja2 template and rendered with the provided ``context``. Rendering
        is repeated, up to ``max_passes`` times, until the output stops
        changing. This resolves templates that produce further templates on
        the first pass.

        Besides the variables in ``context``, templates can read the process
        environment via ``{{ env("NAME", "default") }}``,
        ``{{ ENV["NAME"] }}`` or the ``{{ "NAME" | getenv }}`` filter.

        Parameters
        ^^^^^^^^^^
        data:
            Arbitrary Python data structure. Supported containers are ``dict``
            (keys and values), ``list``, ``tuple`` (including namedtuples),
            ``set``, ``frozenset``, dataclasses (fields), and ``pathlib``
            paths. Scalars (e.g., ``int``, ``float``, ``bool``, ``None``) and
            any other objects are returned unchanged. Strings are rendered as
            Jinja2 templates.
        context:
            Mapping of template variables available to Jinja2 during rendering.
        max_passes:
            Maximum number of rendering passes to perform on each string,
            by default ``2``. Increase this if templates generate further
            templates that need resolution.

        Returns
        ^^^^^^^
        Any:
            A structure of the same shape with all strings rendered.
            Namedtuples, dataclasses and ``pathlib`` paths keep their exact
            type. Other containers are rebuilt as the matching built-in type
            (``dict``, ``list``, ``tuple``, ``set``, ``frozenset``).

        Raises
        ^^^^^^
        jinja2.TemplateError
            If a string is not a valid template or fails to render.
        ValueError
            Raised by ``dataclasses.replace`` for a dataclass that declares
            an ``InitVar`` without a default.

        Notes
        ^^^^^
        * Dictionary keys are also rendered if they are strings (or hashable
          containers/paths holding strings). If rendering causes key
          collisions, the **last** rendered key wins.
        * For dataclasses, all fields accepted by ``__init__`` are rendered
          and a new instance is returned using ``dataclasses.replace``.
          Frozen dataclasses are supported.
        * Namedtuples are detected via the ``_fields`` attribute and
          reconstructed with the same type.

        Examples
        ^^^^^^^^
        .. code-block:: python

            data = {
                "greeting": "Hello, {{ name }}!",
                "items": ["{{ count }} item(s)", 42],
                "path": {"root": "/home/{{ user }}", "cfg": "{{ root }}/cfg"},
            }
            ctx = {"name": "Ada", "count": 3, "user": "ada", "root": "/opt/app"}
            TemplateMixin.render_templates(data, ctx)
            # {'greeting': 'Hello, Ada!',
            #  'items': ['3 item(s)', 42],
            #  'path': {'root': '/home/ada', 'cfg': '/opt/app/cfg'}}
        """
        env = Environment(undefined=Undefined, autoescape=False)
        # Snapshot of the process environment exposed to the templates.
        env_map = dict(os.environ)

        def _env_get(name: str, default: Any = None) -> Any:
            # Used both as the ``env`` global and as the ``getenv`` filter.
            return env_map.get(name, default)

        env.globals.setdefault("env", _env_get)
        env.globals.setdefault("ENV", env_map)
        env.filters.setdefault("getenv", _env_get)

        def _render_str(text: str) -> str:
            out = text
            for _ in range(max_passes):
                new = env.from_string(out).render(context)
                if new == out:
                    break  # fixed point: nothing left to render
                out = new
            return out

        def _walk(obj: Any) -> Any:
            if isinstance(obj, str):
                return _render_str(obj)
            if isinstance(obj, dict):
                # Keys are walked as well; on collisions the last one wins.
                return {_walk(k): _walk(v) for k, v in obj.items()}
            if isinstance(obj, tuple):
                items = [_walk(x) for x in obj]
                if hasattr(obj, "_fields"):  # namedtuple: keep its type
                    return type(obj)(*items)
                return tuple(items)
            if isinstance(obj, list):
                return [_walk(x) for x in obj]
            if isinstance(obj, set):
                return {_walk(x) for x in obj}
            if isinstance(obj, frozenset):
                return frozenset(_walk(x) for x in obj)
            if isinstance(obj, pathlib.PurePath):
                return type(obj)(_render_str(str(obj)))
            if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
                # Only init fields can be passed to ``replace``.
                changes = {
                    field.name: _walk(getattr(obj, field.name))
                    for field in dataclasses.fields(obj)
                    if field.init
                }
                return dataclasses.replace(obj, **changes)
            return obj

        return _walk(data)
class PathMixin:
    """Class that defines typical Path operations."""

    async def suffix(self, path: Union[str, Path, pathlib.Path]) -> str:
        """Get the suffix of a given input path.

        Parameters
        ^^^^^^^^^^
        path: str, anyio.Path, pathlib.Path
            Path of the object store

        Returns
        ^^^^^^^
        str: The file type extension of the path (e.g. ``".nc"``).
        """
        return Path(path).suffix

    def get_fs_and_path(self, uri: str) -> Tuple[fsspec.AbstractFileSystem, str]:
        """Return (fs, path) suitable for xarray.

        The protocol is taken from ``uri``; URIs without a protocol are
        treated as local files. The returned path is in the form the
        filesystem itself expects, as produced by fsspec's protocol
        stripping. For example, object stores get ``bucket/key``, HTTP keeps
        the full URL, and local paths keep characters such as ``?`` or ``#``
        and any drive letter.

        Parameters
        ^^^^^^^^^^
        uri:
            Path to the object store / file name

        Returns
        ^^^^^^^
        fsspec.AbstractFileSystem, str:
            The AbstractFileSystem class and the corresponding path to the
            data store.
        """
        # ``url_to_fs`` delegates path stripping to the target filesystem
        # instead of re-parsing the remainder as a URL (which truncated
        # local paths at ``?``/``#`` and dropped the scheme of HTTP URLs).
        fs, path = fsspec.core.url_to_fs(uri)
        return fs, path
class BasePath(abc.ABCMeta):
    """Metaclass that every storage backend class should use.

    It adds nothing on top of :py:class:`abc.ABCMeta`; it only marks a
    class as a storage backend.
    """
class PathTemplate(abc.ABC, PathMixin, TemplateMixin, metaclass=BasePath):
    """Base class for interacting with different storage systems.

    This class defines fundamental methods that should be implemented
    to retrieve information across different storage systems.

    Parameters
    ^^^^^^^^^^
    suffixes: List[str], default: [".nc", ".grib", ".zarr", ".tar", ".hdf5"]
        A list of available file suffixes.

    Other Parameters
    ^^^^^^^^^^^^^^^^
    storage_options: Any
        Information needed to interact with the storage system. The options
        are passed through ``render_templates`` with an empty context before
        they are stored.

    Attributes
    ^^^^^^^^^^
    _user : str
        Value of the ``DRS_STORAGE_USER`` env variable (defaults to current user)
    _pw : str
        A password passed by the ``DRS_STORAGE_PASSWD`` env variable
        (empty string if unset).
    suffixes: List[str]
        A list of available file suffixes.
    storage_options: Dict[str, Any]
        A dict with information needed to interact with the storage system.
    """

    # File system type reported by ``fs_type``; implementations should set
    # it. Defaulting to ``None`` makes ``fs_type`` return ``""`` instead of
    # raising ``AttributeError`` when an implementation does not.
    _fs_type: ClassVar[Optional[str]] = None

    # Re-entrant lock defined on the class, hence shared by all instances;
    # ``lookup`` holds it while reading and filling ``cmor_lookup``.
    _lock = threading.RLock()

    def __init__(
        self, suffixes: Optional[List[str]] = None, **storage_options: Any
    ) -> None:
        self._user: str = os.environ.get("DRS_STORAGE_USER") or getuser()
        self._pw: str = os.environ.get("DRS_STORAGE_PASSWD") or ""
        # ".grib" is the GRIB suffix (also used for engine detection below).
        self.suffixes = suffixes or [".nc", ".grib", ".zarr", ".tar", ".hdf5"]
        self.storage_options = cast(
            Dict[str, Any], self.render_templates(storage_options or {}, {})
        )
        self.__post_init__()

    def __post_init__(self) -> None:
        """Hook that runs at the end of ``__init__``.

        Override it in subclasses to set up additional attributes; the
        default implementation does nothing.
        """

    async def close(self) -> None:
        """Close any open sessions (does nothing by default)."""

    def open_dataset(self, path: str, **read_kws: Any) -> xr.Dataset:
        """Open a dataset with xarray.

        Unless ``engine`` is passed in ``read_kws`` it is derived from the
        path suffix. ``.grb``/``.grib``/``.gb`` use ``cfgrib``.
        ``.nc``/``.nc4``/``.netcdf``/``.cdf``/``.hdf5``/``.h5`` use
        ``h5netcdf``, and ``.zarr``/``.zar`` use ``zarr``. For any other
        suffix the engine is left to xarray.

        Parameters
        ^^^^^^^^^^
        path:
            Path to the object store / file name
        **read_kws:
            Keyword arguments passed to open the datasets. They are not
            forwarded for zarr stores.

        Returns
        ^^^^^^^
        xarray.Dataset:
            The xarray dataset.
        """
        fs, path = self.get_fs_and_path(path)
        engines = {
            "cfgrib": (".grb", ".grib", ".gb"),
            "h5netcdf": (".nc", ".nc4", ".netcdf", ".cdf", ".hdf5", ".h5"),
            "zarr": (".zarr", ".zar"),
        }
        guessed = next(
            (eng for eng, sfx in engines.items() if path.endswith(sfx)), None
        )
        kwargs = dict(read_kws)
        engine = kwargs.setdefault("engine", guessed)
        if engine == "zarr":
            # NOTE(review): ``read_kws`` are not forwarded to ``open_zarr`` -
            # confirm this is intended.
            dset: xr.Dataset = xr.open_zarr(fs.get_mapper(path))
            return dset
        # ``fs.protocol`` is either a single str or a tuple of str.
        protocols = (fs.protocol,) if isinstance(fs.protocol, str) else fs.protocol
        if "file" in protocols:
            return xr.open_mfdataset(path, **kwargs)
        # xarray reads data lazily through the file object, so it must stay
        # open after returning; it is only closed here if opening fails.
        stream = fs.open(path, "rb")
        try:
            return xr.open_dataset(stream, **kwargs)
        except Exception:
            stream.close()
            raise

    def lookup(
        self, path: str, attribute: str, *tree: str, **read_kws: Any
    ) -> Any:
        """Get metadata from a lookup table.

        The value is looked up in the shared ``cmor_lookup`` table under the
        keys ``tree`` followed by ``attribute``. If it is missing, it is read
        from the object store via ``read_attr`` and stored in the table, so
        later calls with the same keys do not read the data again.

        Parameters
        ^^^^^^^^^^
        path:
            Path to the object store / file name
        attribute:
            The attribute that is retrieved from the data.
            Variable attributes can be defined by a ``.``.
            For example: ``tas.long_name`` would get attribute ``long_name``
            from variable ``tas``.
        *tree:
            A tuple representing nested attributes. Attributes are nested for
            more efficient lookup. ('atmos', '1hr', 'tas') will translate into
            a tree of ['atmos']['1hr']['tas']
        **read_kws:
            Keyword arguments passed to open the datasets.

        Returns
        ^^^^^^^
        Any:
            The cached (or freshly read) attribute value.
        """
        node: Dict[str, Any] = cmor_lookup
        # The table is shared between instances, so walk and fill it while
        # holding the lock.
        with self._lock:
            for key in tree:
                node = node.setdefault(key, {})
            if attribute not in node:
                node[attribute] = self.read_attr(attribute, path, **read_kws)
            return node[attribute]

    def read_attr(
        self, attribute: str, path: Union[str, pathlib.Path], **read_kws: Any
    ) -> Any:
        """Get a metadata attribute from a datastore object.

        Parameters
        ^^^^^^^^^^
        attribute:
            The attribute that is queried. It is either ``<attribute>`` for a
            global attribute or ``<variable>.<attribute>`` for a variable
            attribute.
        path:
            Path to the object store / file path
        **read_kws:
            Keyword arguments for opening the datasets.

        Returns
        ^^^^^^^
        Any:
            The attribute value as stored in the data.

        Raises
        ^^^^^^
        KeyError
            If the requested attribute does not exist.
        """
        with self.open_dataset(str(path), **read_kws) as dset:
            # Work on a copy so the dataset's own ``attrs`` stay untouched.
            attrs: Dict[str, Any] = dict(dset.attrs)
            for var_name, variable in dset.variables.items():
                for name, value in variable.attrs.items():
                    attrs[f"{var_name}.{name}"] = value
        return attrs[attribute]

    @abc.abstractmethod
    async def is_dir(self, path: Union[str, Path, pathlib.Path]) -> bool:
        """Check if a given path is a directory object on the storage system.

        Parameters
        ^^^^^^^^^^
        path : str, anyio.Path, pathlib.Path
            Path of the object store

        Returns
        ^^^^^^^
        bool: True if path is dir object, False if otherwise or doesn't exist
        """
        ...  # pragma: no cover

    @abc.abstractmethod
    async def is_file(self, path: Union[str, Path, pathlib.Path]) -> bool:
        """Check if a given path is a file object on the storage system.

        Parameters
        ^^^^^^^^^^
        path:
            Path of the object store

        Returns
        ^^^^^^^
        bool:
            True if path is file object, False if otherwise or doesn't exist
        """
        ...  # pragma: no cover

    @abc.abstractmethod
    async def iterdir(
        self,
        path: Union[str, Path, pathlib.Path],
    ) -> AsyncIterator[str]:
        """Get all sub directories from a given path.

        Parameters
        ^^^^^^^^^^
        path:
            Path of the object store

        Yields
        ^^^^^^
        str:
            1st level sub directory
        """
        yield ""  # pragma: no cover

    @abc.abstractmethod
    async def rglob(
        self, path: Union[str, Path, pathlib.Path], glob_pattern: str = "*"
    ) -> AsyncIterator[Metadata]:
        """Search recursively for paths matching a given glob pattern.

        Parameters
        ^^^^^^^^^^
        path:
            Path of the object store
        glob_pattern: str
            Pattern that the target files must match

        Yields
        ^^^^^^
        Metadata: Object whose ``path`` matches the glob pattern.
        """
        yield Metadata(path="")  # pragma: no cover

    def fs_type(self, path: Union[str, Path, pathlib.Path]) -> str:
        """Define the file system type.

        Returns the class-level ``_fs_type`` or ``""`` if it is unset;
        ``path`` is not used by this default implementation.
        """
        return self._fs_type or ""

    @abc.abstractmethod
    def path(self, path: Union[str, Path, pathlib.Path]) -> str:
        """Get the full path (including any schemas/netlocs).

        Parameters
        ^^^^^^^^^^
        path:
            Path of the object store

        Returns
        ^^^^^^^
        str:
            URI of the object store
        """
        ...  # pragma: no cover

    @abc.abstractmethod
    def uri(self, path: Union[str, Path, pathlib.Path]) -> str:
        """Get the uri of the object store.

        Parameters
        ^^^^^^^^^^
        path:
            Path of the object store

        Returns
        ^^^^^^^
        str:
            URI of the object store
        """
        ...  # pragma: no cover