Datasources#

Dataloaders help avoid the N+1 error. They also help with caching and reducing boilerplate. Here is an example using the HTTP Data Source which can be used to query a remote http resource:

import dataclasses
import pathlib
import httpx
from cannula import CannulaAPI, Context, ResolveInfo
from cannula.datasource import http

from .database import DBUser, DBWidget, create_tables, drop_tables, session
from .api import remote_app

ROOT = pathlib.Path(__file__).parent


# Graph Models that our repositories use to return data
@dataclasses.dataclass
class Widget:
    id: int
    user_id: int
    name: str


@dataclasses.dataclass
class User:
    id: int
    email: str | None
    name: str | None

    # This resolver uses the widgets repository that is on the context
    async def widgets(self, info: ResolveInfo["MyContext"]) -> list["Widget"]:
        return await info.context.widgets.get_widgets(self.id)


# Our HTTP datasources
class UserDatasource(
    http.HTTPDataSource[User],
    graph_model=User,
    base_url="http://localhost",
):

    async def get_user(self, user_id: int) -> User | None:
        response = await self.get(f"/users/{user_id}")
        return self.model_from_response(response)


class WidgetDatasource(
    http.HTTPDataSource[Widget],
    graph_model=Widget,
    base_url="http://localhost",
):

    async def get_widgets(self, user_id: int) -> list[Widget]:
        response = await self.get(f"/users/{user_id}/widgets")
        return self.model_list_from_response(response)


# Create a custom context and add the datasource
class MyContext(Context):
    def __init__(self, client: httpx.AsyncClient) -> None:
        self.users = UserDatasource(client=client)
        self.widgets = WidgetDatasource(client=client)


# Example query that uses the UserRepository on the context object
async def get_user(info: ResolveInfo[MyContext], id: int) -> User | None:
    return await info.context.users.get_user(id)


# Our example graph api
api = CannulaAPI(ROOT / "schema.graphql", root_value={"user": get_user})


async def main():
    await create_tables()
    # Create some data
    async with session() as db_session:
        user = DBUser(id=1, name="ted", email="ted@lasso.com", password="pass")
        widget1 = DBWidget(id=1, name="Hammer", user_id=1, type="tool")
        widget2 = DBWidget(id=2, name="Drill", user_id=1, type="tool")
        widget3 = DBWidget(id=3, name="Nail", user_id=1, type="tool")
        db_session.add_all([user, widget1, widget2, widget3])
        await db_session.commit()

    # Create a httpx client that responds with the 'remote_app' and add to context
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=remote_app)
    ) as client:

        # Run a query and return nested data
        results = await api.call(
            """
            query User {
                user(id: 1) {
                    widgets {
                        name
                    }
                }
                another: user(id: 1) {
                    widgets {
                        name
                    }
                }
            }
            """,
            context=MyContext(client),
        )

    await drop_tables()

    return results


if __name__ == "__main__":
    import asyncio

    print(asyncio.run(main()))

The output looks like this:

DEBUG:cannula.schema:loading schema from file: /home/rmyers/workspace/cannula/examples/datasources/schema.graphql
DEBUG:cannula.schema:Adding default empty Mutation type
DEBUG:cannula.schema:Adding computed directive
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:cannula.datasource.http:cache set for GET 'http://localhost/users/1'
DEBUG:cannula.datasource.http:cache found for GET 'http://localhost/users/1'
INFO:httpx:HTTP Request: GET http://localhost/users/1 "HTTP/1.1 200 OK"
DEBUG:cannula.datasource.http:cache set for GET 'http://localhost/users/1/widgets'
DEBUG:cannula.datasource.http:cache found for GET 'http://localhost/users/1/widgets'
INFO:httpx:HTTP Request: GET http://localhost/users/1/widgets "HTTP/1.1 200 OK"
ExecutionResult(data={
    'user': {'widgets': [{'name': 'Hammer'}, {'name': 'Drill'}, {'name': 'Nail'}]},
    'another': {'widgets': [{'name': 'Hammer'}, {'name': 'Drill'}, {'name': 'Nail'}]}
}, errors=None)

Notice the second request is cached since the datasource already resovled it. This cache is only stored for single GraphQL request. If you want to persist that for longer you’ll need to implement that yourself.

ORM Data Source#

This is useful for mapping SQLAlchemy ORM models to generated graph models. It follows the Repository Pattern and assists with memoized queries to allow your resolvers to run concurrently and return the same results for identical queryies.

Note

This requires that you have SQLAlchemy installed and configured properly.

class cannula.datasource.orm.DatabaseRepository(session_maker, readonly_session_maker=None)#

Repository pattern for performing database queries and returning hydrated models.

This object is constructed with both the database model and a generated graph model from Code Generation. It provides a couple helper methods to memoize queries that are read only operations cutting down on duplicate SQL calls.

This class has an __init_subclass__ that simplifies construction and will raise errors if incorrect types are used or missing. Construct a new object:

class UserRepo(
    DatabaseRepository[DBUser, User],  # Adds specific types for return values
    db_model=DBUser,  # The
    graph_model=User
):

    async def get_user(pk: uuid.UUID) -> User:
        return self.get_model(pk)

Class Arguments:

  • db_model: The database ORM model to perform queries with.

  • graph_model: Subclass of a generated graph model

Parameters:
  • session_maker (async_sessionmaker[AsyncSession]) – Session maker object that is read/write.

  • readonly_session_maker (async_sessionmaker[AsyncSession] | None (default: None)) – Optional readonly session maker object for spliting queries to different nodes.

from_db(db_obj, **kwargs)#

Hook for returning a GraphModel instance from a DBModel.

Parameters:

db_obj (TypeVar(DBModel, bound= DeclarativeBase))

Return type:

TypeVar(GraphModel)

async add(**data)#

Insert a new object in the database.

Parameters:

data (Any)

Return type:

TypeVar(GraphModel)

async get_by_pk(pk)#

Get a single database ORM model by primary key.

Note

This is query is memoized and intended to be used internally but is available to chain other database operations like resolving related objects.

Parameters:

pk (Any | Tuple[Any, ...])

Return type:

TypeVar(DBModel, bound= DeclarativeBase) | None

HTTP Data Source#

Note

This requires the http extras to be installed:

pip install cannula[http]

This is modeled after the apollo http datasource. It uses httpx to preform async requests to any remote service you wish to query. All GET and HEAD requests will be memoized so that they are only performed once per graph resolution.

Example Usage#

@dataclass(kw_only=True)
class User(UserType):
    id: UUID
    name: str

class UserAPI(
    HTTPDataSource[User],
    graph_model=User,
    base_url="https://auth.com",
):

    async def get_user(self, id) -> User:
        response = await self.get(f"/users/{id}")
        return self.model_from_response(response)

    async def get_users(self) -> list[User]:
        response = await self.get(f"/users")
        return self.model_list_from_response(response)

You can then add this to your context to make it available to your resolvers. It is best practice to setup a client for all your http datasources to share in order to handle auth and use the built in connection pool. First add to your context object:

class Context(cannula.Context):

    def __init__(self, client: httpx.AsyncClient) -> None:
        self.userAPI = UserAPI(client=client)
        self.groupAPI = GroupAPI(client=client)

Next in your graph handler function create a httpx client to use:

@api.post('/graph')
async def graph(
    graph_call: Annotated[
        GraphQLExec,
        Depends(GraphQLDepends(cannula_app)),
    ],
    request: Request,
) -> ExecutionResponse:
    # Grab the authorization header and create the client
    authorization = request.headers.get('authorization')
    headers = {'authorization': authorization}

    async with httpx.AsyncClient(headers=headers) as client:
        context = Context(client)
        return await graph_call(context=context)

Finally you can now use this datasource in your resolver functions like so:

async def resolve_person(
    # Using this type hint for the ResolveInfo will make it so that
    # we can inspect the `info` object in our editors and find the `user_api`
    info: cannula.ResolveInfo[Context],
    id: uuid.UUID,
) -> UserType | None:
    return await info.context.user_api.get_user(id)

API Reference#

class cannula.datasource.http.HTTPDataSource(client=None)#

HTTP Data Source

Class Properties:

  • graph_model: This is the object type your schema is expecting to respond with.

  • base_url: Optional base_url to apply to all requests

  • timeout: Default timeout in seconds for requests (5 seconds)

This uses a __init_subclass__ which you can use to create a subclass like:

class UserAPI(
    HTTPDataSource[User],  # Sets the type hints for the 'Response' object
    graph_model=User,
    base_url="https://auth.com",
    timeout=10,
): ...

Then when you construct a instance to use with an optional client:

client = httpx.AsyncClient(headers={'Authorization': 'Bearer my-token'})
my_datasource = UserAPI(client)
Parameters:

client (AsyncClient | None (default: None)) – Optional httpx client to use for requests.

After the response is returned you can use model_from_response() or model_list_from_response() to return graph models for the resolvers to use. This is especially useful if these models have computed functions on them.

did_receive_error(error, request)#

Handle errors from the remote resource

Parameters:
  • error (Exception)

  • request (Request)

async did_receive_response(response, request)#

Hook to alter the response from the server.

example:

async def did_receive_response(
    self, response: httpx.Response, request: Request
) -> typing.Any:
    response.raise_for_status()
    return Widget(**response.json())
Parameters:
  • response (Response)

  • request (Request)

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async get(path, **kwargs)#

Convience method to perform a GET fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async head(path, **kwargs)#

Convience method to perform a HEAD fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async options(path, **kwargs)#

Convience method to perform a OPTIONS fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async post(path, **kwargs)#

Convience method to perform a POST fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async patch(path, **kwargs)#

Convience method to perform a PATCH fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async put(path, **kwargs)#

Convience method to perform a PUT fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async delete(path, **kwargs)#

Convience method to perform a DELETE fetch()

Parameters:
  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

async fetch(method, path, **kwargs)#

Perform request against the httpx.client

This method will perform the requests and optionally memoize the results so the can be reused by other resolvers. The conveince methods just call this with the ‘method’ set. All the kwargs accepted by the httpx client will just pass through.

Parameters:
  • method (str) – the method to perform

  • path (str) – path of the request

  • kwargs – these args are passed directly to httpx

Return type:

List[Dict[Any, Any]] | Dict[Any, Any] | Response

model_from_response(response, **kwargs)#

Return a graph model from a response.

Use this to return a single model from a http response:

async def get_user(self, user_id: int) -> User | None:
    response = await self.get(f"/users/{user_id}")
    return self.model_from_response(response)
Raises:

AttributeError – if the response object is not a dict

Parameters:
  • response (List[Dict[Any, Any]] | Dict[Any, Any] | Response) – a dict response object

  • kwargs – optional attributes to set on the GraphModel

Return type:

TypeVar(GraphModel)

model_list_from_response(response, **kwargs)#

Return a list of graph models from a response.

Use this to return a list of models from a http response:

async def get_users(self) -> list[User]:
    response = await self.get(f"/users/{user_id}")
    return self.model_from_response(response)
Raises:

AttributeError – if the response object is not a list

Parameters:
  • response (List[Dict[Any, Any]] | Dict[Any, Any] | Response) – a list of response objects

  • kwargs – optional attributes to set on the GraphModel

Return type:

List[TypeVar(GraphModel)]