# Source code for metadata_crawler.api.index
"""API for adding new cataloging systems."""
from __future__ import annotations
import abc
from pathlib import Path
from typing import (
Any,
AsyncIterator,
Dict,
List,
Optional,
Tuple,
Union,
cast,
)
from ..api.config import SchemaField
from ..api.metadata_stores import CatalogueReader, IndexStore
from ..logger import logger
# [docs]
class BaseIndex:
    """Base class to index metadata in the indexing system.

    Any data ingestion class that implements metadata ingestion into
    cataloguing systems should inherit from this class.

    This abstract class will setup consumer threads and a fifo queue that wait
    for new data to harvest metadata and add it to the cataloguing system.

    Only :py:func:`index` and :py:func:`delete` are abstract methods that need
    to be implemented for each cataloguing ingestion class. The rest is done
    by this base class.

    Parameters
    ^^^^^^^^^^

    catalogue_file:
        Path to the intake catalogue. If ``None`` (the default) no catalogue
        is opened and the instance has no backing store.
    batch_size:
        The amount of metadata that should be gathered `before` ingesting
        it into the catalogue.
    storage_options:
        Optional mapping that is forwarded unchanged to the
        ``CatalogueReader`` that opens ``catalogue_file``.
    **kwargs:
        Additional keyword arguments. They are accepted but not used by
        this constructor.
    """

    # NOTE(review): the class does not derive from ``abc.ABC``, so the
    # ``@abc.abstractmethod`` markers below are not enforced when a subclass
    # is instantiated. Left as is to keep existing behaviour unchanged.

    def __init__(
        self,
        catalogue_file: Optional[Union[str, Path]] = None,
        batch_size: int = 2500,
        storage_options: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        # Without a catalogue file there is no store; the properties below
        # then fall back to empty defaults.
        self._store: Optional[IndexStore] = None
        if catalogue_file is not None:
            reader = CatalogueReader(
                catalogue_file=catalogue_file,
                batch_size=batch_size,
                storage_options=storage_options,
            )
            self._store = reader.store
        # Give subclasses a chance to finish their own setup.
        self.__post_init__()

    def __post_init__(self) -> None:
        """Hook that is called at the end of ``__init__``.

        The base implementation does nothing; subclasses may override it.
        """

    @property
    def index_schema(self) -> Dict[str, SchemaField]:
        """Get the index schema.

        An empty dictionary is returned when the store is unset or has no
        ``schema`` attribute.
        """
        # A string type expression keeps ``cast`` free of runtime lookups.
        return cast(
            "Dict[str, SchemaField]", getattr(self._store, "schema", {})
        )

    @property
    def index_names(self) -> Tuple[str, str]:
        """Get the names of the indexes for latest and all data.

        A pair of empty strings is returned when the store is unset or has
        no ``index_names`` attribute.
        """
        return cast(
            "Tuple[str, str]", getattr(self._store, "index_names", ("", ""))
        )

    @abc.abstractmethod
    async def delete(self, **kwargs: Any) -> None:
        """Delete data from the cataloguing system.

        Parameters
        ^^^^^^^^^^

        flush:
            Boolean indicating whether or not the data should be flushed after
            amending the catalogue (if implemented).
        search_keys:
            key-value based query for data that should be deleted.
        """

    @abc.abstractmethod
    async def index(
        self,
        metadata: Optional[dict[str, Any]] = None,
        core: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Add metadata into the cataloguing system.

        Parameters
        ^^^^^^^^^^

        metadata:
            The meta data itself, saved in a dictionary.
        core:
            Name of the catalog the metadata is added to. This entry
            might have different meanings for different cataloguing systems.
            For example apache solr will receive the name of the ``core``.
        flush:
            Boolean indicating whether or not the data should be flushed after
            adding to the catalogue (if implemented). Passed as an additional
            keyword argument.
        """