# Source code for metadata_crawler

"""Metadata Crawler API high level functions."""

import asyncio
from pathlib import Path
from types import ModuleType
from typing import Any, Dict, List, Literal, Optional, Union, cast, overload

from tomlkit import TOMLDocument

# uvloop is an optional dependency: record whether it could be imported so
# the event-loop setup further down can fall back to the standard library.
try:
    import uvloop

    use_uvloop = True
except ImportError:

    use_uvloop = False  # pragma: no cover


from ._version import __version__
from .api.config import ConfigMerger, DRSConfig
from .api.metadata_stores import CatalogueBackendType, IndexName
from .data_collector import DataCollector
from .logger import logger
from .run import async_add, async_delete, async_index

# Module providing the ``run`` entry point that the synchronous wrappers in
# this file call (``async_model.run(...)``).
async_model: ModuleType

if use_uvloop:
    # uvloop could be imported: run coroutines through it and register its
    # event loop policy with asyncio. Note that this happens at import time.
    async_model = uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
else:
    # uvloop is unavailable: fall back to the standard library.
    async_model = asyncio  # pragma: no cover

# Public names exported on a star-import of this module: the synchronous
# wrappers defined below plus the objects re-exported from the sub-modules.
__all__ = [
    "logger",
    "__version__",
    "DataCollector",
    "index",
    "add",
    "delete",
    "get_config",
    "async_index",
    "async_delete",
    "async_add",
]


@overload
def get_config(
    *config: Union[Path, str], preserve_comments: Literal[True] = ...
) -> ConfigMerger[TOMLDocument]: ...  # noqa


@overload
def get_config(
    *config: Union[Path, str], preserve_comments: Literal[False]
) -> ConfigMerger[Dict[str, Any]]: ...  # noqa


@overload
def get_config(
    *config: Union[Path, str], preserve_comments: bool
) -> ConfigMerger[Any]: ...  # noqa


def get_config(
    *config: Union[Path, str], preserve_comments: bool = True
) -> ConfigMerger[Any]:
    """Get a drs config file merged with the default config.

    The method is helpful to inspect all possible configurations and their
    default values.

    Parameters
    ^^^^^^^^^^

    config:
        Path to a user defined config file that is going to be merged with
        the default config.
    preserve_comments:
        Preserve the comments in a config file.

    Returns
    ^^^^^^^

    ConfigMerger:
        The merger object created from ``config``; its ``merged_doc``
        attribute holds the merged configuration.
    """
    cfg = ConfigMerger(*config, preserve_comments=preserve_comments)
    doc = cast(Dict[str, Any], cfg.merged_doc)
    # Every top-level entry except "drs_settings" is treated as a dataset.
    datasets = {k: v for k, v in doc.items() if k != "drs_settings"}
    # The instance is discarded on purpose; presumably building it validates
    # the merged document -- confirm against DRSConfig.
    _ = DRSConfig(datasets=datasets, **doc["drs_settings"])
    return cfg


def index(
    index_system: str,
    *catalogue_files: Union[Path, str, List[str], List[Path]],
    batch_size: int = 2500,
    verbosity: int = 0,
    log_suffix: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Index metadata in the indexing system.

    This is the blocking counterpart of ``async_index``: the coroutine is
    executed to completion via ``async_model.run``.

    Parameters
    ^^^^^^^^^^

    index_system:
        The index server where the metadata is indexed.
    catalogue_files:
        Path to the file(s) where the metadata was stored.
    batch_size:
        If the index system supports batch-sizes, the size of the batches.
    verbosity:
        Set the verbosity level.
    log_suffix:
        Add a suffix to the log file output.

    Other Parameters
    ^^^^^^^^^^^^^^^^

    **kwargs:
        Additional keyword arguments forwarded unchanged to ``async_index``
        (for example the ``server`` address used in the example below).

    Examples
    ^^^^^^^^

    .. code-block:: python

        index(
            "solr",
            "/tmp/catalog-1.yml",
            "/tmp/catalog-2.yml",
            batch_size=50,
            server="localhost:8983",
        )
    """
    async_model.run(
        async_index(
            index_system,
            *catalogue_files,
            batch_size=batch_size,
            verbosity=verbosity,
            log_suffix=log_suffix,
            **kwargs,
        )
    )


def delete(
    index_system: str,
    batch_size: int = 2500,
    verbosity: int = 0,
    log_suffix: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Delete metadata from the indexing system.

    This is the blocking counterpart of ``async_delete``: the coroutine is
    executed to completion via ``async_model.run``.

    Parameters
    ^^^^^^^^^^

    index_system:
        The index server where the metadata is indexed.
    batch_size:
        If the index system supports batch-sizes, the size of the batches.
    verbosity:
        Set the verbosity of the system.
    log_suffix:
        Add a suffix to the log file output.

    Other Parameters
    ^^^^^^^^^^^^^^^^

    **kwargs:
        Keyword arguments used to delete data from the index.

    Examples
    ^^^^^^^^

    .. code-block:: python

        delete(
            "solr",
            server="localhost:8983",
            facets=[("project", "CMIP6"), ("institute", "MPI-M")],
        )
    """
    async_model.run(
        async_delete(
            index_system,
            batch_size=batch_size,
            # Forward the verbosity like ``index`` does; previously the
            # argument was accepted here but silently dropped.
            verbosity=verbosity,
            log_suffix=log_suffix,
            **kwargs,
        )
    )


def add(
    *config_files: Union[Path, str, Dict[str, Any], TOMLDocument],
    store: Optional[Union[str, Path]] = None,
    data_object: Optional[Union[str, List[str]]] = None,
    data_set: Optional[Union[str, List[str]]] = None,
    data_store_prefix: str = "metadata",
    catalogue_backend: CatalogueBackendType = "jsonlines",
    batch_size: int = 25_000,
    comp_level: int = 4,
    storage_options: Optional[Dict[str, Any]] = None,
    shadow: Optional[Union[str, List[str]]] = None,
    latest_version: str = IndexName().latest,
    all_versions: str = IndexName().all,
    n_procs: Optional[int] = None,
    verbosity: int = 0,
    log_suffix: Optional[str] = None,
    password: bool = False,
    fail_under: int = -1,
    **kwargs: Any,
) -> None:
    """Harvest metadata from storage systems and add them to an intake catalogue.

    This is the blocking counterpart of ``async_add``: the coroutine is
    executed to completion via ``async_model.run``.

    .. versionchanged:: 2511.0.0

        The catalogue argument has been rearranged and is now a keyword
        argument: ``add("data.yaml", "drs-config.toml")`` becomes
        ``add("drs-config.toml", store="data.yaml")``. If the ``store``
        keyword is omitted the output catalogue will be interpreted as
        config file.

    Parameters
    ^^^^^^^^^^

    config_files:
        Path to the drs-config file / loaded configuration.
    store:
        Path to the intake catalogue where the collected metadata will be
        stored.
    data_object:
        Instead of defining datasets that are to be crawled you can crawl
        data based on their directories. The directories must be root dirs
        given in the drs-config file. By default all root dirs are crawled.
    data_set:
        Datasets that should be crawled. The datasets need to be defined
        in the drs-config file. By default all datasets are crawled.
        Names can contain wildcards such as ``xces-*``.
    data_store_prefix:
        Absolute path or relative path to intake catalogue source
    batch_size:
        Batch size that is used to collect the meta data. This can affect
        performance.
    comp_level:
        Compression level used to write the meta data to csv.gz
    storage_options:
        Set additional storage options for adding metadata to the metadata
        store
    shadow:
        'Shadow' this storage options. This is useful to hide secrets in
        public data catalogues.
    catalogue_backend:
        Intake catalogue backend
    latest_version:
        Name of the core holding 'latest' metadata.
    all_versions:
        Name of the core holding 'all' metadata versions.
    password:
        Display a password prompt and set password before beginning.
    n_procs:
        Set the number of parallel processes for collecting.
    verbosity:
        Set the verbosity of the system.
    log_suffix:
        Add a suffix to the log file output.
    fail_under:
        Fail if less than X of the discovered files could be indexed.

    Other Parameters
    ^^^^^^^^^^^^^^^^

    **kwargs:
        Additional keyword arguments.

    Examples
    ^^^^^^^^

    .. code-block:: python

        add(
            "~/data/drs-config.toml",
            store="my-data.yaml",
            data_set=["cmip6", "cordex"],
        )
    """
    async_model.run(
        async_add(
            *config_files,
            store=store,
            data_object=data_object,
            data_set=data_set,
            batch_size=batch_size,
            comp_level=comp_level,
            password=password,
            catalogue_backend=catalogue_backend,
            data_store_prefix=data_store_prefix,
            shadow=shadow,
            latest_version=latest_version,
            all_versions=all_versions,
            n_procs=n_procs,
            storage_options=storage_options,
            verbosity=verbosity,
            log_suffix=log_suffix,
            fail_under=fail_under,
            **kwargs,
        )
    )