Custom index backends#

An index backend stores the final, translated metadata records. Built‑in backends include a JSON Lines store (on disk, in memory, or on S3) and MongoDB via Motor. You can implement additional index backends to suit your needs.

Base classes and helpers#

The metadata_stores.py module defines two key abstractions:

  • IndexStore – An abstract base class representing an index backend. Concrete implementations must implement methods to add batches of records, read chunks from an index, and delete based on facet filters. A convenience close method cleans up resources.

  • StorageIndex – A simple data class grouping together the index name and any configuration needed by the backend.

SolrIndex#

SolrIndex indexes metadata into Apache Solr. When initialising it, you specify the Solr server and the names of the cores to create (latest, files, etc.). The schema is derived from the configuration. The store supports two modes:

MongoIndexStore#

MongoIndexStore stores records in MongoDB collections. Each index name corresponds to a collection. Records are upserted based on the file facet: if a document with the same file exists it will be replaced; otherwise it is inserted. Deletion uses $regex queries for glob patterns and $eq for exact values.

Provide the MongoDB connection URL and database name via the url and database parameters. You may specify additional options (e.g. TLS settings) in storage_options.
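The deletion rule described above (glob patterns become $regex, exact values become $eq) can be illustrated with a small, self-contained sketch. The helper names glob_to_regex and build_delete_query are hypothetical and are not part of MongoIndexStore; the glob translation handles only * and ? and is an assumption about how such patterns could be mapped.

```python
import re
from typing import Any, Dict, List, Tuple

GLOB_CHARS = set("*?")


def glob_to_regex(pattern: str) -> str:
    """Convert a simple glob (only * and ?) into an anchored regex."""
    escaped = re.escape(pattern)
    # re.escape turns "*" into "\*" and "?" into "\?"; map them back to wildcards.
    return "^" + escaped.replace(r"\*", ".*").replace(r"\?", ".") + "$"


def build_delete_query(facets: List[Tuple[str, str]]) -> Dict[str, Any]:
    """Translate (facet, value) pairs into a MongoDB-style filter document."""
    query: Dict[str, Any] = {}
    for key, value in facets:
        if GLOB_CHARS.intersection(value):
            query[key] = {"$regex": glob_to_regex(value)}
        else:
            query[key] = {"$eq": value}
    return query


query = build_delete_query([("file", "/data/*.nc"), ("project", "cmip6")])
```

A filter document of this shape is what a driver call such as delete_many would receive. The upsert behaviour corresponds to a replace_one call with upsert=True, keyed on the file facet.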

Recipe: Implementing a custom index#

To add a new index backend:

  1. Subclass IndexStore and implement the abstract methods index and delete to add and remove records.

  2. Register your implementation under the entry point metadata_crawler.index_backends so it can be discovered via the index_backend CLI option.

  3. The schema argument passed to your constructor contains SchemaField objects that describe the canonical facets (see Configuration). Use this information to construct tables or documents with appropriate types.
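As a rough illustration of step 3, the sketch below derives a CREATE TABLE statement from a schema mapping. FieldSpec is a hypothetical stand-in for SchemaField; its attribute names (key, type) and the type names in SQL_TYPES are assumptions, so check the real SchemaField definition before adapting this.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FieldSpec:
    """Hypothetical stand-in for SchemaField; attribute names are assumed."""

    key: str
    type: str = "string"


# Assumed mapping from facet types to SQL column types.
SQL_TYPES = {
    "string": "TEXT",
    "integer": "BIGINT",
    "float": "DOUBLE",
    "timestamp": "DATETIME",
}


def create_table_statement(table: str, schema: Dict[str, FieldSpec]) -> str:
    """Build a CREATE TABLE statement with one column per facet."""
    columns = ", ".join(
        f"`{name}` {SQL_TYPES.get(field.type, 'TEXT')}"
        for name, field in schema.items()
    )
    return f"CREATE TABLE IF NOT EXISTS `{table}` ({columns})"


schema = {
    "file": FieldSpec("file"),
    "version": FieldSpec("version", "integer"),
}
statement = create_table_statement("latest", schema)
```

Unknown types fall back to TEXT here, which keeps the sketch tolerant of facet types it does not list.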

Example skeleton#

import os
from getpass import getpass
from typing import List, Optional, Tuple

from metadata_crawler.metadata_stores import IndexStore


class MySQLIndex(IndexStore):
    def __post_init__(self):
        """Any additional attributes can be set in this method."""

        self.password = os.getenv("MYSQL_PASSWD") or ""

    async def index(
        self, server: Optional[str] = None, user: Optional[str] = None, pw: bool = True
    ) -> None:
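        """Insert or upsert records."""
        if pw and not self.password:
            self.password = getpass("Give DB password: ")
        # db_connection is a placeholder for your own connection helper.
        with self.db_connection(server, user, self.password) as con:
            for table in self.index_names:
                # Read each index in batches and hand every batch to the database.
                async for chunk in self.get_metadata(table):
                    con.add(chunk)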

    async def delete(
        self,
        facets: Optional[List[Tuple[str, str]]] = None,
        server: Optional[str] = None,
        user: Optional[str] = None,
        pw: bool = True,
    ) -> None:
        """Remove matching records."""
        if pw and not self.password:
            self.password = getpass("Give DB password: ")
        with self.db_connection(server, user, self.password) as con:
            for table in self.index_names:
                # Turn the (facet, value) pairs into keyword arguments.
                con.delete(table, **dict(facets or []))

pyproject.toml

# register in pyproject.toml
[project.entry-points."metadata_crawler.index_backends"]
mysql = "my_package.my_index:MySQLIndex"
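Once the package is installed, the registration can be checked with the standard library. The sketch below lists everything registered under the entry point group. Whether metadata-crawler resolves backends in exactly this way internally is an assumption, and discover_backends is a hypothetical helper name.

```python
from importlib.metadata import entry_points
from typing import Any, Dict


def discover_backends(group: str = "metadata_crawler.index_backends") -> Dict[str, Any]:
    """Return a mapping of backend name to entry point for the given group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10 and later: entry points are selectable by group.
        selected = eps.select(group=group)
    else:
        # Older Pythons return a plain dict keyed by group name.
        selected = eps.get(group, [])
    return {ep.name: ep for ep in selected}


backends = discover_backends()
missing = discover_backends("no.such.group.for.this.example")
```

Calling load() on one of the returned entry points imports the registered class, for example backends["mysql"].load() after installing the package from the pyproject.toml above.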

Extending the CLI#

The CLI entry point metadata-crawler registers its commands in cli.py. You can extend the CLI by defining new commands or options and registering them. This registration is inspired by the Typer library.

CLI API#

cli.py defines the @cli_function decorator and the cli_parameter helper to annotate functions with help messages and parameter metadata. The actual CLI commands are defined in your custom index backends via the @cli_function decorator. To add a new command:

  1. Decorate the index and delete methods of your custom index backend with the @cli_function decorator to register them.

  2. Annotate the function parameters with Annotated and cli_parameter to supply CLI options (see SolrIndex for examples).

  3. Once the methods are decorated, registration happens automatically.

Example: adding a cli for the MySQL Index#

The MySQL index backend from above can be exposed on the command line as follows:

from typing import Optional

from typing_extensions import Annotated

from metadata_crawler.api.cli import cli_function, cli_parameter
from .metadata_stores import IndexStore


class MySQLIndex(IndexStore):
    @cli_function(help="Index data in MySQL")
    async def index(
        self,
        server: Annotated[str, cli_parameter("--server", help="Server name")],
        user: Annotated[Optional[str], cli_parameter("--user", help="User name")] = None,
        db: Annotated[str, cli_parameter("--database", help="Database name")] = "foo",
        pw: Annotated[
            bool,
            cli_parameter("--password", "-p", action="store_true", help="Ask for password"),
        ] = False,
    ) -> None:
        """Your index implementation here."""

Note

The arguments and keyword arguments of the cli_parameter method follow the logic of argparse.ArgumentParser.add_argument.

When you run metadata-crawler mysql --server localhost -p, the function executes your custom logic.
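Because cli_parameter follows add_argument, the annotations in the example correspond roughly to the plain argparse setup below. This is a sketch of an equivalent parser, not the code that metadata-crawler generates. The subcommand name mysql is taken from the invocation above, and build_parser is a hypothetical helper.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser that mirrors the cli_parameter annotations."""
    parser = argparse.ArgumentParser(prog="metadata-crawler")
    subparsers = parser.add_subparsers(dest="command")
    mysql = subparsers.add_parser("mysql", help="Index data in MySQL")
    mysql.add_argument("--server", help="Server name")
    mysql.add_argument("--user", default=None, help="User name")
    mysql.add_argument("--database", default="foo", help="Database name")
    # store_true makes the flag a boolean that defaults to False.
    mysql.add_argument(
        "--password", "-p", action="store_true", help="Ask for password"
    )
    return parser


args = build_parser().parse_args(["mysql", "--server", "localhost", "-p"])
```

The parsed namespace carries the values under the long option names (server, user, database, password), which is how argparse derives destinations by default.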

API for adding commands to the cli.

metadata_crawler.api.cli.cli_function(help: str = '') -> Callable[[Callable[[...], Any]], Callable[[...], Any]]#

Wrap command line arguments around a method.

These arguments are the ones you would normally use to create an argparse subcommand.

Parameters:

help – Help string for this sub command.

metadata_crawler.api.cli.cli_parameter(*args: str, **kwargs: Any) -> Dict[str, Any]#

Construct an argparse.Namespace.

Parameters:
  • *args – Any arguments passed to argparse.ArgumentParser().add_argument

  • **kwargs – Any keyword arguments passed to argparse.ArgumentParser().add_argument

API Reference#

class metadata_crawler.api.index.BaseIndex(catalogue_file: str | Path | None = None, batch_size: int = 2500, storage_options: Dict[str, Any] | None = None, progress: IndexProgress | None = None, **kwargs: Any)#

Bases: object

Base class to index metadata in the indexing system.

Any data ingestion class that implements metadata ingestion into cataloguing systems should inherit from this class.

This abstract class sets up consumer threads and a FIFO queue that wait for new data, harvest its metadata, and add it to the cataloguing system. Only index() and delete() are abstract methods that need to be implemented for each cataloguing ingestion class. The rest is done by this base class.

Parameters:
  • catalogue_file – Path to the intake catalogue

  • batch_size – The number of metadata records to gather before ingesting them into the catalogue.

  • progress – Optional rich progress object that should display the progress of the tasks.

__init__(catalogue_file: str | Path | None = None, batch_size: int = 2500, storage_options: Dict[str, Any] | None = None, progress: IndexProgress | None = None, **kwargs: Any) -> None#

abstractmethod async delete(**kwargs: Any) -> None#

Delete data from the cataloguing system.

Parameters:
  • flush – Boolean indicating whether or not the data should be flushed after amending the catalogue (if implemented).

  • search_keys – key-value based query for data that should be deleted.

async get_metadata(index_name: str) -> AsyncIterator[List[Dict[str, Any]]]#

Get the metadata of an index in batches.

Parameters:

index_name – Name of the index that should be read.
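Since get_metadata returns an async iterator of batches, it is consumed with async for. The sketch below imitates that pattern with a hypothetical stand-in generator, iter_batches, because the real method needs a configured backend. The record layout and batch size are illustrative only.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def iter_batches(
    records: List[Dict[str, Any]], batch_size: int
) -> AsyncIterator[List[Dict[str, Any]]]:
    """Hypothetical stand-in for get_metadata: yield records in batches."""
    for start in range(0, len(records), batch_size):
        yield records[start : start + batch_size]
        # Give other tasks a chance to run between batches.
        await asyncio.sleep(0)


async def count_per_batch(
    records: List[Dict[str, Any]], batch_size: int
) -> List[int]:
    """Consume the async iterator the way an index() method would."""
    sizes: List[int] = []
    async for batch in iter_batches(records, batch_size):
        sizes.append(len(batch))
    return sizes


records = [{"file": f"f{i}.nc"} for i in range(5)]
sizes = asyncio.run(count_per_batch(records, batch_size=2))
```

In a real backend the loop body would write each batch to the database instead of recording its length.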

abstractmethod async index(metadata: dict[str, Any] | None = None, core: str | None = None, **kwags: Any) -> None#

Add metadata into the cataloguing system.

Parameters:
  • metadata_batch – Batch of metadata stored in a two-valued tuple. The first entry of the tuple is the name of the catalogue. This entry may have different meanings for different cataloguing systems; for example, Apache Solr receives the name of the core. The second entry is the metadata itself, saved in a dictionary.

  • flush – Boolean indicating whether or not the data should be flushed after adding to the catalogue (if implemented)

property index_names: Tuple[str, str]#

Get the names of the indexes for the latest and all data.

property index_schema: Dict[str, SchemaField]#

Get the index schema.