Skip to content

odmantic.engine

odmantic.engine.AIOEngine

The AIOEngine object is responsible for handling database operations with MongoDB in an asynchronous way using motor.

__init__(self, motor_client=None, database='test') special

Engine constructor.

Parameters:

Name Type Description Default
motor_client AsyncIOMotorClient

instance of an AsyncIO motor client. If None, a default one will be created

None
database str

name of the database to use

'test'

Source code in odmantic/engine.py
def __init__(self, motor_client: AsyncIOMotorClient = None, database: str = "test"):
    """Engine constructor.

    Args:
        motor_client: instance of an AsyncIO motor client. If None, a default one
                will be created
        database: name of the database to use

    <!---
    #noqa: DAR401 ValueError
    -->
    """
    # https://docs.mongodb.com/manual/reference/limits/#naming-restrictions
    forbidden_characters = _FORBIDDEN_DATABASE_CHARACTERS.intersection(
        set(database)
    )
    if len(forbidden_characters) > 0:
        raise ValueError(
            f"database name cannot contain: {' '.join(forbidden_characters)}"
        )
    if motor_client is None:
        motor_client = AsyncIOMotorClient()
    self.client = motor_client
    self.database_name = database
    self.database = motor_client[self.database_name]

count(self, model, *queries) async

Get the count of documents matching a query

Parameters:

Name Type Description Default
model Type[~ModelType]

model to perform the operation on

required
queries Union[odmantic.query.QueryExpression, Dict, bool]

query filters to apply

()

Returns:

Type Description
int

number of document matching the query


Source code in odmantic/engine.py
async def count(
    self, model: Type[ModelType], *queries: Union[QueryExpression, Dict, bool]
) -> int:
    """Get the count of documents matching a query

    Args:
        model: model to perform the operation on
        queries: query filters to apply

    Returns:
        number of document matching the query

    <!---
    #noqa: DAR401 TypeError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call count with a Model class")
    query = AIOEngine._build_query(*queries)
    collection = self.database[model.__collection__]
    count = await collection.count_documents(query)
    return int(count)

delete(self, instance) async

Delete an instance from the database

Parameters:

Name Type Description Default
instance ~ModelType

the instance to delete

required

Exceptions:

Type Description
DocumentNotFoundError

the instance has not been persisted to the database

Source code in odmantic/engine.py
async def delete(self, instance: ModelType) -> None:
    """Delete an instance from the database

    Args:
        instance: the instance to delete

    Raises:
        DocumentNotFoundError: the instance has not been persisted to the database

    """
    # TODO handle cascade deletion
    collection = self.database[instance.__collection__]
    pk_name = instance.__primary_field__
    result = await collection.delete_many({"_id": getattr(instance, pk_name)})
    count = int(result.deleted_count)
    if count == 0:
        raise DocumentNotFoundError(instance)

find(self, model, *queries, *, sort=None, skip=0, limit=None)

Search for Model instances matching the query filter provided

Parameters:

Name Type Description Default
model Type[~ModelType]

model to perform the operation on

required
queries Union[odmantic.query.QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None
skip int

number of document to skip

0
limit Optional[int]

maximum number of instance fetched

None

Exceptions:

Type Description
DocumentParsingError

unable to parse one of the resulting documents

Returns:

Type Description
odmantic.engine.AIOCursor[~ModelType]

odmantic.engine.AIOCursor of the query


Source code in odmantic/engine.py
def find(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators with mypy
    sort: Optional[Any] = None,
    skip: int = 0,
    limit: Optional[int] = None,
) -> AIOCursor[ModelType]:
    """Search for Model instances matching the query filter provided

    Args:
        model: model to perform the operation on
        queries: query filter to apply
        sort: sort expression
        skip: number of document to skip
        limit: maximum number of instance fetched

    Raises:
        DocumentParsingError: unable to parse one of the resulting documents

    Returns:
        [odmantic.engine.AIOCursor][] of the query

    <!---
    #noqa: DAR401 ValueError
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call find with a Model class")
    sort_expression = self._validate_sort_argument(sort)
    if limit is not None and limit <= 0:
        raise ValueError("limit has to be a strict positive value or None")
    if skip < 0:
        raise ValueError("skip has to be a positive integer")
    query = AIOEngine._build_query(*queries)
    collection = self.get_collection(model)
    pipeline: List[Dict] = [{"$match": query}]
    if sort_expression is not None:
        pipeline.append({"$sort": sort_expression})
    if skip > 0:
        pipeline.append({"$skip": skip})
    if limit is not None and limit > 0:
        pipeline.append({"$limit": limit})
    pipeline.extend(AIOEngine._cascade_find_pipeline(model))
    motor_cursor = collection.aggregate(pipeline)
    return AIOCursor(model, motor_cursor)

find_one(self, model, *queries, *, sort=None) async

Search for a Model instance matching the query filter provided

Parameters:

Name Type Description Default
model Type[~ModelType]

model to perform the operation on

required
queries Union[odmantic.query.QueryExpression, Dict, bool]

query filter to apply

()
sort Optional[Any]

sort expression

None

Exceptions:

Type Description
DocumentParsingError

unable to parse the resulting document

Returns:

Type Description
Optional[~ModelType]

the fetched instance if found otherwise None


Source code in odmantic/engine.py
async def find_one(
    self,
    model: Type[ModelType],
    *queries: Union[
        QueryExpression, Dict, bool
    ],  # bool: allow using binary operators w/o plugin,
    sort: Optional[Any] = None,
) -> Optional[ModelType]:
    """Search for a Model instance matching the query filter provided

    Args:
        model: model to perform the operation on
        queries: query filter to apply
        sort: sort expression

    Raises:
        DocumentParsingError: unable to parse the resulting document

    Returns:
        the fetched instance if found otherwise None

    <!---
    #noqa: DAR401 TypeError
    #noqa: DAR402 DocumentParsingError
    -->
    """
    if not lenient_issubclass(model, Model):
        raise TypeError("Can only call find_one with a Model class")
    results = await self.find(model, *queries, sort=sort, limit=1)
    if len(results) == 0:
        return None
    return results[0]

get_collection(self, model)

Get the motor collection associated to a Model.

Parameters:

Name Type Description Default
model Type[~ModelType]

model class

required

Returns:

Type Description
AsyncIOMotorCollection

the AsyncIO motor collection object

Source code in odmantic/engine.py
def get_collection(self, model: Type[ModelType]) -> AsyncIOMotorCollection:
    """Get the motor collection associated to a Model.

    Args:
        model: model class

    Returns:
        the AsyncIO motor collection object
    """
    return self.database[model.__collection__]

save(self, instance) async

Persist an instance to the database

This method behaves as an 'upsert' operation. If a document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be saved as well.

Parameters:

Name Type Description Default
instance ~ModelType

instance to persist

required

Returns:

Type Description
~ModelType

the saved instance

Note

The save operation actually modify the instance argument in place. However, the instance is still returned for convenience.

Source code in odmantic/engine.py
async def save(self, instance: ModelType) -> ModelType:
    """Persist an instance to the database

    This method behaves as an 'upsert' operation. If a document already exists
    with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be saved as well.

    Args:
        instance: instance to persist

    Returns:
        the saved instance

    NOTE:
        The save operation actually modify the instance argument in place. However,
        the instance is still returned for convenience.

    <!---
    #noqa: DAR401 TypeError
    -->
    """
    if not isinstance(instance, Model):
        raise TypeError("Can only call find_one with a Model class")

    async with await self.client.start_session() as s:
        async with s.start_transaction():
            await self._save(instance, s)
    object.__setattr__(instance, "__fields_modified__", set())
    return instance

save_all(self, instances) async

Persist instances to the database

This method behaves as multiple 'upsert' operations. If one of the document already exists with the same primary key, it will be overwritten.

All the other models referenced by this instance will be recursively saved as well.

Parameters:

Name Type Description Default
instances Sequence[~ModelType]

instances to persist

required

Returns:

Type Description
List[~ModelType]

the saved instances

Note

The save_all operation actually modify the arguments in place. However, the instances are still returned for convenience.

Source code in odmantic/engine.py
async def save_all(self, instances: Sequence[ModelType]) -> List[ModelType]:
    """Persist instances to the database

    This method behaves as multiple 'upsert' operations. If one of the document
    already exists with the same primary key, it will be overwritten.

    All the other models referenced by this instance will be recursively saved as
    well.

    Args:
        instances: instances to persist

    Returns:
        the saved instances

    NOTE:
        The save_all operation actually modify the arguments in place. However, the
        instances are still returned for convenience.
    """
    async with await self.client.start_session() as s:
        async with s.start_transaction():
            added_instances = await asyncio.gather(
                *[self._save(instance, s) for instance in instances]
            )
    return added_instances

odmantic.engine.AIOCursor

This object has to be built from the odmantic.engine.AIOEngine.find method.

An AIOCursor object support multiple async operations:

  • async for: asynchronously iterate over the query results
  • await : when awaited it will return a list of the fetched models