Custom index backends#
An index backend stores the final, translated metadata records. Built‑in backends include a DuckDB database (either on disk, in memory or on S3) and MongoDB via Motor. You can implement additional index backends to suit your needs.
Base classes and helpers#
The metadata_stores.py module defines two key abstractions:
- IndexStore – An abstract base class representing an index backend. Concrete implementations must implement methods to add batches of records, read chunks from an index, and delete based on facet filters. A convenience close method cleans up resources.
- StorageIndex – A simple data class grouping together the index name and any configuration needed by the backend.
SolrIndex#
SolrIndex indexes metadata into Apache Solr. When
initialised, you specify the Solr server and the names of the cores to
create (latest, files, etc.). The schema is
derived from the configuration. The store supports two modes:
MongoIndexStore#
MongoIndexStore stores records in MongoDB collections. Each
index name corresponds to a collection. Records are upserted based
on the file facet: if a document with the same file exists
it will be replaced; otherwise it is inserted. Deletion uses
$regex queries for glob patterns and $eq for exact values.
Provide the MongoDB connection URL and database name via the
url and database parameters. You may specify additional
options (e.g. TLS settings) in storage_options.
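The deletion rule described above (regex for glob patterns, equality for exact values) can be sketched as a small filter builder. This is a minimal illustration, not the MongoIndexStore implementation: the function name facets_to_query and the glob detection are assumptions, and whether the pattern produced by fnmatch.translate is accepted by your MongoDB version should be verified.

```python
import fnmatch
from typing import Any, Dict, List, Tuple

# Characters that mark a value as a shell-style glob pattern.
GLOB_CHARS = set("*?[")


def facets_to_query(facets: List[Tuple[str, str]]) -> Dict[str, Any]:
    """Translate (facet, value) pairs into a MongoDB filter document.

    Hypothetical helper for illustration only.
    """
    query: Dict[str, Any] = {}
    for key, value in facets:
        if GLOB_CHARS.intersection(value):
            # fnmatch.translate turns a glob into a regular expression string.
            query[key] = {"$regex": fnmatch.translate(value)}
        else:
            query[key] = {"$eq": value}
    return query


query = facets_to_query([("file", "*.nc"), ("project", "cmip6")])
```

The resulting dictionary could then be passed as the filter of a delete call on the collection that belongs to the index name.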
Recipe: Implementing a custom index#
To add a new index backend:
- Subclass IndexStore and implement the abstract methods index (to add records) and delete (to remove records).
- Register your implementation under the entry point metadata_crawler.index_backends so it can be discovered via the index_backend CLI option.
- The schema argument passed to your constructor contains SchemaField objects that describe the canonical facets (see Configuration). Use this information to construct tables or documents with appropriate types.
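As a rough illustration of the last point, the sketch below derives a CREATE TABLE statement from a schema mapping. FacetField is a hypothetical stand-in for SchemaField, and its attributes (key, type, multi_valued) as well as the type names in SQL_TYPES are assumptions made for this example; check the real SchemaField definition before relying on them.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FacetField:
    """Hypothetical stand-in for SchemaField; the real attributes may differ."""

    key: str
    type: str = "string"
    multi_valued: bool = False


# Assumed mapping from facet types to MySQL column types.
SQL_TYPES = {
    "string": "TEXT",
    "integer": "BIGINT",
    "float": "DOUBLE",
    "timestamp": "DATETIME",
}


def create_table_statement(table: str, schema: Dict[str, FacetField]) -> str:
    """Build a CREATE TABLE statement with one column per facet."""
    columns = []
    for name, field in schema.items():
        # Multi-valued facets are stored as JSON; unknown types fall back to TEXT.
        sql_type = "JSON" if field.multi_valued else SQL_TYPES.get(field.type, "TEXT")
        columns.append(f"`{name}` {sql_type}")
    return f"CREATE TABLE IF NOT EXISTS `{table}` ({', '.join(columns)})"


schema = {
    "file": FacetField("file"),
    "variable": FacetField("variable", multi_valued=True),
    "version": FacetField("version", type="integer"),
}
statement = create_table_statement("latest", schema)
```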
Example skeleton#
import os
from getpass import getpass
from typing import List, Optional, Tuple

from metadata_crawler.metadata_stores import IndexStore


class MySQLIndex(IndexStore):
    def __post_init__(self):
        """Any additional attributes can be set in this method."""
        self.password = os.getenv("MYSQL_PASSWD") or ""

    async def index(
        self, server: Optional[str] = None, user: Optional[str] = None, pw: bool = True
    ) -> None:
        """Insert or upsert records."""
        if pw and not self.password:
            self.password = getpass("Give DB password: ")
        # db_connection is a placeholder for your own connection helper.
        with self.db_connection(server, user, self.password) as con:
            for table in self.index_names:
                # Read the translated metadata of each index in batches.
                async for chunk in self.get_metadata(table):
                    con.add(chunk)

    async def delete(
        self,
        facets: Optional[List[Tuple[str, str]]] = None,
        server: Optional[str] = None,
        user: Optional[str] = None,
        pw: bool = True,
    ) -> None:
        """Remove matching records."""
        if pw and not self.password:
            self.password = getpass("Give DB password: ")
        with self.db_connection(server, user, self.password) as con:
            for table in self.index_names:
                # Turn the (facet, value) pairs into keyword arguments.
                con.delete(**dict(facets or []))
pyproject.toml
# register in pyproject.toml
[project.entry-points."metadata_crawler.index_backends"]
mysql = "my_package.my_index:MySQLIndex"
Extending the CLI#
The CLI entry point metadata-crawler registers its commands in cli.py.
You can extend the CLI by defining new commands or options and registering
them. This registration is inspired by the Typer
library.
CLI API#
cli.py defines the @cli_function decorator and the cli_parameter function
to annotate functions with help messages and parameter metadata. The
actual CLI commands are defined in your custom index backends via the
@cli_function decorator. To add a new command:
- Decorate the index and delete functions of your custom index backend with the @cli_function decorator to register them.
- Annotate the function parameters with Annotated and cli_parameter to supply CLI options (see SolrIndex for examples).
- Registration happens automatically once the functions are decorated.
Example: adding a cli for the MySQL Index#
The MySQL index backend from above can be exposed on the CLI as follows:
from typing import Optional

from typing_extensions import Annotated

from metadata_crawler.api.cli import cli_function, cli_parameter
from metadata_crawler.metadata_stores import IndexStore


class MySQLIndex(IndexStore):
    @cli_function(help="Index data in MySQL")
    async def index(
        self,
        server: Annotated[str, cli_parameter("--server", help="Server name")],
        user: Annotated[Optional[str], cli_parameter("--user", help="User name")] = None,
        db: Annotated[str, cli_parameter("--database", help="Database name")] = "foo",
        pw: Annotated[
            bool,
            cli_parameter("--password", "-p", action="store_true", help="Ask for password"),
        ] = False,
    ) -> None:
        """Your index implementation here."""
Note
The arguments and keyword arguments of the cli_parameter method follow the logic of argparse.ArgumentParser.add_argument.
When you run metadata-crawler mysql --server localhost -p
the function executes your custom logic.
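To make the relation between Annotated parameters and argparse more concrete, the sketch below shows one way such annotations could be turned into parser arguments. It is not the code in cli.py: cli_parameter_sketch and build_parser are hypothetical stand-ins written for this example, and the real decorators may work differently.

```python
import argparse
import inspect
from typing import Annotated, Any, Callable, Dict, Optional


def cli_parameter_sketch(*args: str, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical stand-in: bundle the add_argument inputs into a dict."""
    return {"args": args, "kwargs": kwargs}


def build_parser(func: Callable[..., Any]) -> argparse.ArgumentParser:
    """Create a parser from the Annotated metadata of a function signature."""
    parser = argparse.ArgumentParser(prog=func.__name__)
    for name, param in inspect.signature(func).parameters.items():
        # Annotated[...] objects expose their extra metadata as __metadata__.
        for meta in getattr(param.annotation, "__metadata__", ()):
            if not (isinstance(meta, dict) and "args" in meta):
                continue
            kwargs = dict(meta["kwargs"], dest=name)
            if param.default is inspect.Parameter.empty:
                kwargs["required"] = True
            else:
                kwargs["default"] = param.default
            parser.add_argument(*meta["args"], **kwargs)
    return parser


def index(
    server: Annotated[str, cli_parameter_sketch("--server", help="Server name")],
    user: Annotated[Optional[str], cli_parameter_sketch("--user", help="User name")] = None,
    pw: Annotated[
        bool,
        cli_parameter_sketch("--password", "-p", action="store_true", help="Ask for password"),
    ] = False,
) -> None:
    """Placeholder command body."""


args = build_parser(index).parse_args(["--server", "localhost", "-p"])
```

Parameters without a default become required options, and every option is stored under the name of the function parameter, so the parsed namespace can be passed back to the function as keyword arguments.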
API for adding commands to the cli.
- metadata_crawler.api.cli.cli_function(help: str = '') -> Callable[[Callable[[...], Any]], Callable[[...], Any]]
  Wrap command line arguments around a method.
  Those arguments represent the arguments you would normally use to create an argparse subcommand.
  - Parameters:
    help – Help string for this sub command.
- metadata_crawler.api.cli.cli_parameter(*args: str, **kwargs: Any) -> Dict[str, Any]
  Construct an argparse.Namespace.
  - Parameters:
    *args – Any arguments passed to argparse.ArgumentParser().add_argument
    **kwargs – Any keyword arguments passed to argparse.ArgumentParser().add_argument
API Reference:
- class metadata_crawler.api.index.BaseIndex(catalogue_file: str | Path | None = None, batch_size: int = 2500, storage_options: Dict[str, Any] | None = None, **kwargs: Any)
  Bases: object
  Base class to index metadata in the indexing system.
  Any data ingestion class that implements metadata ingestion into cataloguing systems should inherit from this class.
  This abstract class will set up consumer threads and a FIFO queue that wait for new data to harvest metadata and add it to the cataloguing system. Only add() and delete() are abstract methods that need to be implemented for each cataloguing ingestion class. The rest is done by this base class.
  - Parameters:
    catalogue_file – Path to the intake catalogue
    batch_size – The amount of metadata that should be gathered before ingesting it into the catalogue.
  - __init__(catalogue_file: str | Path | None = None, batch_size: int = 2500, storage_options: Dict[str, Any] | None = None, **kwargs: Any) -> None
  - abstractmethod async delete(**kwargs: Any) -> None
    Delete data from the cataloguing system.
    - Parameters:
      flush – Boolean indicating whether or not the data should be flushed after amending the catalogue (if implemented).
      search_keys – key-value based query for data that should be deleted.
  - async get_metadata(index_name: str) -> AsyncIterator[List[Dict[str, Any]]]
    Get the metadata of an index in batches.
    - Parameters:
      index_name – Name of the index that should be read.
  - abstractmethod async index(metadata: dict[str, Any] | None = None, core: str | None = None, **kwags: Any) -> None
    Add metadata into the cataloguing system.
    - Parameters:
      metadata_batch – Batch of metadata stored in a two-valued tuple. The first entry of the tuple represents the name of the catalogue. This entry might have different meanings for different cataloguing systems. For example, Apache Solr will receive the name of the core. The second entry is the metadata itself, saved in a dictionary.
      flush – Boolean indicating whether or not the data should be flushed after adding to the catalogue (if implemented).
  - property index_names: Tuple[str, str]
    Get the names of the indexes for latest and all data.
  - property index_schema: Dict[str, SchemaField]
    Get the index schema.
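The get_metadata signature listed above returns an asynchronous iterator of record batches. The sketch below shows how such an iterator is typically consumed with async for. get_metadata_sketch, the RECORDS list and the batch size are hypothetical stand-ins for illustration; the real method reads from the catalogue instead.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

# Made-up records used only for this illustration.
RECORDS = [{"file": f"/data/file_{i}.nc"} for i in range(5)]


async def get_metadata_sketch(
    index_name: str, batch_size: int = 2
) -> AsyncIterator[List[Dict[str, Any]]]:
    """Hypothetical stand-in that yields the records in batches."""
    for start in range(0, len(RECORDS), batch_size):
        batch = RECORDS[start : start + batch_size]
        yield [dict(record, index=index_name) for record in batch]


async def collect_batch_sizes(index_name: str) -> List[int]:
    """Consume the batches one by one, as an index() method would."""
    sizes = []
    async for batch in get_metadata_sketch(index_name):
        sizes.append(len(batch))
    return sizes


sizes = asyncio.run(collect_batch_sizes("latest"))
```

Processing one batch at a time keeps memory use bounded, which is presumably the reason the base class exposes the metadata in batches rather than as a single list.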