Datasources#
Dataloaders help avoid the N+1 error. They also help with caching and reducing boilerplate. Here is an example using the HTTP Data Source which can be used to query a remote http resource:
import dataclasses
import pathlib
import httpx
from cannula import CannulaAPI, Context, ResolveInfo
from cannula.datasource import http
from .database import DBUser, DBWidget, create_tables, drop_tables, session
from .api import remote_app
ROOT = pathlib.Path(__file__).parent
# Graph Models that our repositories use to return data
@dataclasses.dataclass
class Widget:
id: int
user_id: int
name: str
@dataclasses.dataclass
class User:
id: int
email: str | None
name: str | None
# This resolver uses the widgets repository that is on the context
async def widgets(self, info: ResolveInfo["MyContext"]) -> list["Widget"]:
return await info.context.widgets.get_widgets(self.id)
# Our HTTP datasources
class UserDatasource(
http.HTTPDataSource[User],
graph_model=User,
base_url="http://localhost",
):
async def get_user(self, user_id: int) -> User | None:
response = await self.get(f"/users/{user_id}")
return self.model_from_response(response)
class WidgetDatasource(
http.HTTPDataSource[Widget],
graph_model=Widget,
base_url="http://localhost",
):
async def get_widgets(self, user_id: int) -> list[Widget]:
response = await self.get(f"/users/{user_id}/widgets")
return self.model_list_from_response(response)
# Create a custom context and add the datasource
class MyContext(Context):
def __init__(self, client: httpx.AsyncClient) -> None:
self.users = UserDatasource(client=client)
self.widgets = WidgetDatasource(client=client)
# Example query that uses the UserRepository on the context object
async def get_user(info: ResolveInfo[MyContext], id: int) -> User | None:
return await info.context.users.get_user(id)
# Our example graph api
api = CannulaAPI(ROOT / "schema.graphql", root_value={"user": get_user})
async def main():
await create_tables()
# Create some data
async with session() as db_session:
user = DBUser(id=1, name="ted", email="ted@lasso.com", password="pass")
widget1 = DBWidget(id=1, name="Hammer", user_id=1, type="tool")
widget2 = DBWidget(id=2, name="Drill", user_id=1, type="tool")
widget3 = DBWidget(id=3, name="Nail", user_id=1, type="tool")
db_session.add_all([user, widget1, widget2, widget3])
await db_session.commit()
# Create a httpx client that responds with the 'remote_app' and add to context
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=remote_app)
) as client:
# Run a query and return nested data
results = await api.call(
"""
query User {
user(id: 1) {
widgets {
name
}
}
another: user(id: 1) {
widgets {
name
}
}
}
""",
context=MyContext(client),
)
await drop_tables()
return results
if __name__ == "__main__":
import asyncio
print(asyncio.run(main()))
The output looks like this:
DEBUG:cannula.schema:loading schema from file: /home/rmyers/workspace/cannula/examples/datasources/schema.graphql
DEBUG:cannula.schema:Adding default empty Mutation type
DEBUG:cannula.schema:Adding computed directive
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:cannula.datasource.http:cache set for GET 'http://localhost/users/1'
DEBUG:cannula.datasource.http:cache found for GET 'http://localhost/users/1'
INFO:httpx:HTTP Request: GET http://localhost/users/1 "HTTP/1.1 200 OK"
DEBUG:cannula.datasource.http:cache set for GET 'http://localhost/users/1/widgets'
DEBUG:cannula.datasource.http:cache found for GET 'http://localhost/users/1/widgets'
INFO:httpx:HTTP Request: GET http://localhost/users/1/widgets "HTTP/1.1 200 OK"
ExecutionResult(data={
'user': {'widgets': [{'name': 'Hammer'}, {'name': 'Drill'}, {'name': 'Nail'}]},
'another': {'widgets': [{'name': 'Hammer'}, {'name': 'Drill'}, {'name': 'Nail'}]}
}, errors=None)
Notice the second request is cached since the datasource already resovled it. This cache is only stored for single GraphQL request. If you want to persist that for longer you’ll need to implement that yourself.
ORM Data Source#
This is useful for mapping SQLAlchemy ORM models to generated graph models. It follows the Repository Pattern and assists with memoized queries to allow your resolvers to run concurrently and return the same results for identical queryies.
Note
This requires that you have SQLAlchemy installed and configured properly.
- class cannula.datasource.orm.DatabaseRepository(session_maker, readonly_session_maker=None)#
Repository pattern for performing database queries and returning hydrated models.
This object is constructed with both the database model and a generated graph model from Code Generation. It provides a couple helper methods to memoize queries that are read only operations cutting down on duplicate SQL calls.
This class has an __init_subclass__ that simplifies construction and will raise errors if incorrect types are used or missing. Construct a new object:
class UserRepo( DatabaseRepository[DBUser, User], # Adds specific types for return values db_model=DBUser, # The graph_model=User ): async def get_user(pk: uuid.UUID) -> User: return self.get_model(pk)
Class Arguments:
db_model: The database ORM model to perform queries with.
graph_model: Subclass of a generated graph model
- Parameters:
session_maker (
async_sessionmaker[AsyncSession]) – Session maker object that is read/write.readonly_session_maker (
async_sessionmaker[AsyncSession] |None(default:None)) – Optional readonly session maker object for spliting queries to different nodes.
- from_db(db_obj, **kwargs)#
Hook for returning a GraphModel instance from a DBModel.
- Parameters:
db_obj (
TypeVar(DBModel, bound=DeclarativeBase))- Return type:
TypeVar(GraphModel)
- async add(**data)#
Insert a new object in the database.
- Parameters:
data (
Any)- Return type:
TypeVar(GraphModel)
- async get_by_pk(pk)#
Get a single database ORM model by primary key.
Note
This is query is memoized and intended to be used internally but is available to chain other database operations like resolving related objects.
- Parameters:
pk (
Any|Tuple[Any,...])- Return type:
TypeVar(DBModel, bound=DeclarativeBase) |None
HTTP Data Source#
Note
This requires the http extras to be installed:
pip install cannula[http]
This is modeled after the apollo http datasource. It uses httpx to preform async requests to any remote service you wish to query. All GET and HEAD requests will be memoized so that they are only performed once per graph resolution.
Example Usage#
@dataclass(kw_only=True)
class User(UserType):
id: UUID
name: str
class UserAPI(
HTTPDataSource[User],
graph_model=User,
base_url="https://auth.com",
):
async def get_user(self, id) -> User:
response = await self.get(f"/users/{id}")
return self.model_from_response(response)
async def get_users(self) -> list[User]:
response = await self.get(f"/users")
return self.model_list_from_response(response)
You can then add this to your context to make it available to your resolvers. It is best practice to setup a client for all your http datasources to share in order to handle auth and use the built in connection pool. First add to your context object:
class Context(cannula.Context):
def __init__(self, client: httpx.AsyncClient) -> None:
self.userAPI = UserAPI(client=client)
self.groupAPI = GroupAPI(client=client)
Next in your graph handler function create a httpx client to use:
@api.post('/graph')
async def graph(
graph_call: Annotated[
GraphQLExec,
Depends(GraphQLDepends(cannula_app)),
],
request: Request,
) -> ExecutionResponse:
# Grab the authorization header and create the client
authorization = request.headers.get('authorization')
headers = {'authorization': authorization}
async with httpx.AsyncClient(headers=headers) as client:
context = Context(client)
return await graph_call(context=context)
Finally you can now use this datasource in your resolver functions like so:
async def resolve_person(
# Using this type hint for the ResolveInfo will make it so that
# we can inspect the `info` object in our editors and find the `user_api`
info: cannula.ResolveInfo[Context],
id: uuid.UUID,
) -> UserType | None:
return await info.context.user_api.get_user(id)
API Reference#
- class cannula.datasource.http.HTTPDataSource(client=None)#
HTTP Data Source
Class Properties:
graph_model: This is the object type your schema is expecting to respond with.
base_url: Optional base_url to apply to all requests
timeout: Default timeout in seconds for requests (5 seconds)
This uses a __init_subclass__ which you can use to create a subclass like:
class UserAPI( HTTPDataSource[User], # Sets the type hints for the 'Response' object graph_model=User, base_url="https://auth.com", timeout=10, ): ...
Then when you construct a instance to use with an optional client:
client = httpx.AsyncClient(headers={'Authorization': 'Bearer my-token'}) my_datasource = UserAPI(client)
- Parameters:
client (
AsyncClient|None(default:None)) – Optional httpx client to use for requests.
After the response is returned you can use
model_from_response()ormodel_list_from_response()to return graph models for the resolvers to use. This is especially useful if these models have computed functions on them.- did_receive_error(error, request)#
Handle errors from the remote resource
- Parameters:
error (
Exception)request (
Request)
- async did_receive_response(response, request)#
Hook to alter the response from the server.
example:
async def did_receive_response( self, response: httpx.Response, request: Request ) -> typing.Any: response.raise_for_status() return Widget(**response.json())
- Parameters:
response (
Response)request (
Request)
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async get(path, **kwargs)#
Convience method to perform a GET
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async head(path, **kwargs)#
Convience method to perform a HEAD
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async options(path, **kwargs)#
Convience method to perform a OPTIONS
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async post(path, **kwargs)#
Convience method to perform a POST
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async patch(path, **kwargs)#
Convience method to perform a PATCH
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async put(path, **kwargs)#
Convience method to perform a PUT
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async delete(path, **kwargs)#
Convience method to perform a DELETE
fetch()- Parameters:
path (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- async fetch(method, path, **kwargs)#
Perform request against the httpx.client
This method will perform the requests and optionally memoize the results so the can be reused by other resolvers. The conveince methods just call this with the ‘method’ set. All the kwargs accepted by the httpx client will just pass through.
- Parameters:
method (
str) – the method to performpath (
str) – path of the requestkwargs – these args are passed directly to httpx
- Return type:
List[Dict[Any,Any]] |Dict[Any,Any] |Response
- model_from_response(response, **kwargs)#
Return a graph model from a response.
Use this to return a single model from a http response:
async def get_user(self, user_id: int) -> User | None: response = await self.get(f"/users/{user_id}") return self.model_from_response(response)
- Raises:
AttributeError – if the response object is not a dict
- Parameters:
response (
List[Dict[Any,Any]] |Dict[Any,Any] |Response) – a dict response objectkwargs – optional attributes to set on the GraphModel
- Return type:
TypeVar(GraphModel)
- model_list_from_response(response, **kwargs)#
Return a list of graph models from a response.
Use this to return a list of models from a http response:
async def get_users(self) -> list[User]: response = await self.get(f"/users/{user_id}") return self.model_from_response(response)
- Raises:
AttributeError – if the response object is not a list
- Parameters:
response (
List[Dict[Any,Any]] |Dict[Any,Any] |Response) – a list of response objectskwargs – optional attributes to set on the GraphModel
- Return type:
List[TypeVar(GraphModel)]