odmantic.engine
odmantic.engine.AIOEngine
¶
The AIOEngine object is responsible for handling database operations with MongoDB in an asynchronous way using motor.
__init__(self, motor_client=None, database='test')
special
¶
Engine constructor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
motor_client |
AsyncIOMotorClient |
instance of an AsyncIO motor client. If None, a default one will be created |
None |
database |
str |
name of the database to use |
'test' |
Source code in odmantic/engine.py
def __init__(self, motor_client: AsyncIOMotorClient = None, database: str = "test"):
"""Engine constructor.
Args:
motor_client: instance of an AsyncIO motor client. If None, a default one
will be created
database: name of the database to use
<!---
#noqa: DAR401 ValueError
-->
"""
# https://docs.mongodb.com/manual/reference/limits/#naming-restrictions
forbidden_characters = _FORBIDDEN_DATABASE_CHARACTERS.intersection(
set(database)
)
if len(forbidden_characters) > 0:
raise ValueError(
f"database name cannot contain: {' '.join(forbidden_characters)}"
)
if motor_client is None:
motor_client = AsyncIOMotorClient()
self.client = motor_client
self.database_name = database
self.database = motor_client[self.database_name]
count(self, model, *queries)
async
¶
Get the count of documents matching a query
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[~ModelType] |
model to perform the operation on |
required |
queries |
Union[odmantic.query.QueryExpression, Dict, bool] |
query filters to apply |
() |
Returns:
Type | Description |
---|---|
int |
number of document matching the query |
Source code in odmantic/engine.py
async def count(
self, model: Type[ModelType], *queries: Union[QueryExpression, Dict, bool]
) -> int:
"""Get the count of documents matching a query
Args:
model: model to perform the operation on
queries: query filters to apply
Returns:
number of document matching the query
<!---
#noqa: DAR401 TypeError
-->
"""
if not lenient_issubclass(model, Model):
raise TypeError("Can only call count with a Model class")
query = AIOEngine._build_query(*queries)
collection = self.database[model.__collection__]
count = await collection.count_documents(query)
return int(count)
delete(self, instance)
async
¶
Delete an instance from the database
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instance |
~ModelType |
the instance to delete |
required |
Exceptions:
Type | Description |
---|---|
DocumentNotFoundError |
the instance has not been persisted to the database |
Source code in odmantic/engine.py
async def delete(self, instance: ModelType) -> None:
"""Delete an instance from the database
Args:
instance: the instance to delete
Raises:
DocumentNotFoundError: the instance has not been persisted to the database
"""
# TODO handle cascade deletion
collection = self.database[instance.__collection__]
pk_name = instance.__primary_field__
result = await collection.delete_many({"_id": getattr(instance, pk_name)})
count = int(result.deleted_count)
if count == 0:
raise DocumentNotFoundError(instance)
find(self, model, *queries, *, sort=None, skip=0, limit=None)
¶
Search for Model instances matching the query filter provided
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[~ModelType] |
model to perform the operation on |
required |
queries |
Union[odmantic.query.QueryExpression, Dict, bool] |
query filter to apply |
() |
sort |
Optional[Any] |
sort expression |
None |
skip |
int |
number of document to skip |
0 |
limit |
Optional[int] |
maximum number of instance fetched |
None |
Exceptions:
Type | Description |
---|---|
DocumentParsingError |
unable to parse one of the resulting documents |
Returns:
Type | Description |
---|---|
odmantic.engine.AIOCursor[~ModelType] |
odmantic.engine.AIOCursor of the query |
Source code in odmantic/engine.py
def find(
self,
model: Type[ModelType],
*queries: Union[
QueryExpression, Dict, bool
], # bool: allow using binary operators with mypy
sort: Optional[Any] = None,
skip: int = 0,
limit: Optional[int] = None,
) -> AIOCursor[ModelType]:
"""Search for Model instances matching the query filter provided
Args:
model: model to perform the operation on
queries: query filter to apply
sort: sort expression
skip: number of document to skip
limit: maximum number of instance fetched
Raises:
DocumentParsingError: unable to parse one of the resulting documents
Returns:
[odmantic.engine.AIOCursor][] of the query
<!---
#noqa: DAR401 ValueError
#noqa: DAR401 TypeError
#noqa: DAR402 DocumentParsingError
-->
"""
if not lenient_issubclass(model, Model):
raise TypeError("Can only call find with a Model class")
sort_expression = self._validate_sort_argument(sort)
if limit is not None and limit <= 0:
raise ValueError("limit has to be a strict positive value or None")
if skip < 0:
raise ValueError("skip has to be a positive integer")
query = AIOEngine._build_query(*queries)
collection = self.get_collection(model)
pipeline: List[Dict] = [{"$match": query}]
if sort_expression is not None:
pipeline.append({"$sort": sort_expression})
if skip > 0:
pipeline.append({"$skip": skip})
if limit is not None and limit > 0:
pipeline.append({"$limit": limit})
pipeline.extend(AIOEngine._cascade_find_pipeline(model))
motor_cursor = collection.aggregate(pipeline)
return AIOCursor(model, motor_cursor)
find_one(self, model, *queries, *, sort=None)
async
¶
Search for a Model instance matching the query filter provided
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[~ModelType] |
model to perform the operation on |
required |
queries |
Union[odmantic.query.QueryExpression, Dict, bool] |
query filter to apply |
() |
sort |
Optional[Any] |
sort expression |
None |
Exceptions:
Type | Description |
---|---|
DocumentParsingError |
unable to parse the resulting document |
Returns:
Type | Description |
---|---|
Optional[~ModelType] |
the fetched instance if found otherwise None |
Source code in odmantic/engine.py
async def find_one(
self,
model: Type[ModelType],
*queries: Union[
QueryExpression, Dict, bool
], # bool: allow using binary operators w/o plugin,
sort: Optional[Any] = None,
) -> Optional[ModelType]:
"""Search for a Model instance matching the query filter provided
Args:
model: model to perform the operation on
queries: query filter to apply
sort: sort expression
Raises:
DocumentParsingError: unable to parse the resulting document
Returns:
the fetched instance if found otherwise None
<!---
#noqa: DAR401 TypeError
#noqa: DAR402 DocumentParsingError
-->
"""
if not lenient_issubclass(model, Model):
raise TypeError("Can only call find_one with a Model class")
results = await self.find(model, *queries, sort=sort, limit=1)
if len(results) == 0:
return None
return results[0]
get_collection(self, model)
¶
Get the motor collection associated to a Model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[~ModelType] |
model class |
required |
Returns:
Type | Description |
---|---|
AsyncIOMotorCollection |
the AsyncIO motor collection object |
Source code in odmantic/engine.py
def get_collection(self, model: Type[ModelType]) -> AsyncIOMotorCollection:
"""Get the motor collection associated to a Model.
Args:
model: model class
Returns:
the AsyncIO motor collection object
"""
return self.database[model.__collection__]
save(self, instance)
async
¶
Persist an instance to the database
This method behaves as an 'upsert' operation. If a document already exists with the same primary key, it will be overwritten.
All the other models referenced by this instance will be saved as well.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instance |
~ModelType |
instance to persist |
required |
Returns:
Type | Description |
---|---|
~ModelType |
the saved instance |
Note
The save operation actually modify the instance argument in place. However, the instance is still returned for convenience.
Source code in odmantic/engine.py
async def save(self, instance: ModelType) -> ModelType:
"""Persist an instance to the database
This method behaves as an 'upsert' operation. If a document already exists
with the same primary key, it will be overwritten.
All the other models referenced by this instance will be saved as well.
Args:
instance: instance to persist
Returns:
the saved instance
NOTE:
The save operation actually modify the instance argument in place. However,
the instance is still returned for convenience.
<!---
#noqa: DAR401 TypeError
-->
"""
if not isinstance(instance, Model):
raise TypeError("Can only call find_one with a Model class")
async with await self.client.start_session() as s:
async with s.start_transaction():
await self._save(instance, s)
object.__setattr__(instance, "__fields_modified__", set())
return instance
save_all(self, instances)
async
¶
Persist instances to the database
This method behaves as multiple 'upsert' operations. If one of the document already exists with the same primary key, it will be overwritten.
All the other models referenced by this instance will be recursively saved as well.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instances |
Sequence[~ModelType] |
instances to persist |
required |
Returns:
Type | Description |
---|---|
List[~ModelType] |
the saved instances |
Note
The save_all operation actually modify the arguments in place. However, the instances are still returned for convenience.
Source code in odmantic/engine.py
async def save_all(self, instances: Sequence[ModelType]) -> List[ModelType]:
"""Persist instances to the database
This method behaves as multiple 'upsert' operations. If one of the document
already exists with the same primary key, it will be overwritten.
All the other models referenced by this instance will be recursively saved as
well.
Args:
instances: instances to persist
Returns:
the saved instances
NOTE:
The save_all operation actually modify the arguments in place. However, the
instances are still returned for convenience.
"""
async with await self.client.start_session() as s:
async with s.start_transaction():
added_instances = await asyncio.gather(
*[self._save(instance, s) for instance in instances]
)
return added_instances
odmantic.engine.AIOCursor
¶
This object has to be built from the odmantic.engine.AIOEngine.find method.
An AIOCursor object support multiple async operations:
- async for: asynchronously iterate over the query results
- await : when awaited it will return a list of the fetched models