Contrib#
These extras are bundled with cannula but are not ‘core’ to the operation.
Config#
Simple configuration management using dotenv. This provides a BaseConfig class that you can expose env vars and set defaults.
This is not as feature-full as pydantic-settings so use that if you are looking for advanced features. But this will work for simple applications like the ones we auto generate.
Note
Currently only supports the following types:
String
Integer
Boolean
- class cannula.contrib.config.BaseConfig#
Simple environment management with dotenv.
Example:
class Configuration( BaseConfig, prefix="APP", # Optional prefix for env settings env_file=".env_secret" # Optional setting for overriding `.env` filename ): port: int = 9000 host: str = "0.0.0.0" database_uri: str = "mydb.com@user:pass"
Then in your .env_secret file you can override any defaults:
APP_PORT=8000 APP_HOST=127.0.0.1 APP_DATABASE_URI=something_else_here
Your application will see the overridden values and will have the correct types:
assert Configuration.port == 8000 assert Configuration.host == '127.0.0.1'
- cannula.contrib.config.alias(env)#
Set an alias for a field to override the default name.
Example:
class Config(BaseConfig): some_identifier: Annotated[str, alias("REAL_ENV_SETTING")]
- Parameters:
env (
str)- Return type:
str
OpenTelemetry Integration#
This package provides OpenTelemetry instrumentation for CannulaAPI, enabling detailed tracing of GraphQL operations including parsing, validation, and execution phases.
Installation#
pip install opentelemetry-api opentelemetry-sdk
Basic Usage#
There are two ways to use the instrumentation:
1. Using the Factory Function (Recommended)#
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
# Initialize OpenTelemetry
provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Create an instrumented API
api = create_instrumented_api(schema="type Query { hello: String }")
2. Using the Class Directly#
# Create an instrumented API using the class
api = InstrumentedCannulaAPI(schema="type Query { hello: String }")
Type Safety#
The instrumented API maintains full type safety with your root types:
from typing import TypedDict
class MyRootType(TypedDict, total=False):
hello: str
# Type-safe instrumented API
api = create_instrumented_api[MyRootType](
schema="type Query { hello: String }",
root_value={"hello": "world"}
)
Generated Spans#
The instrumentation creates the following spans for each GraphQL operation:
graphql.execute(Parent span)- Attributes:
graphql.operation_name: Name of the operation if provided, “anonymous” otherwisegraphql.context: String representation of the contextgraphql.response.size: Size of the response data (if available)
graphql.parse.document(Child span)Created when parsing string queries
- Attributes:
graphql.document: The GraphQL query string
Records parsing errors if they occur
graphql.validation(Child span)Records validation errors if they occur
graphql.subscribe(Subscription span)- Attributes:
graphql.operation_name: Name of the operation if provided, “anonymous” otherwisegraphql.context: String representation of the context
Error Handling#
All spans automatically capture errors with appropriate status codes:
Parsing errors are captured in the parse span
Validation errors are captured in the validate span
Execution errors are captured in the execute span
All errors are recorded using
span.record_exception()and set appropriate error status
Integration with Other Exporters#
You can use any OpenTelemetry exporter. Here are some common examples:
Jaeger#
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
OTLP (OpenTelemetry Protocol)#
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
otlp_exporter = OTLPSpanExporter(
endpoint="your-otlp-endpoint:4317",
insecure=True
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
Advanced Configuration#
Custom Context#
The instrumentation will automatically include any context from your CannulaAPI context class:
class MyContext(Context):
def __init__(self, request):
self.user_id = request.headers.get('X-User-Id')
self.tenant_id = request.headers.get('X-Tenant-Id')
api = InstrumentedCannulaAPI(
schema="...",
context=MyContext
)
Adding Custom Attributes#
You can extend the instrumentation by adding custom middleware that adds attributes to the spans:
from opentelemetry import trace
async def custom_middleware(resolve, root, info, **args):
current_span = trace.get_current_span()
current_span.set_attribute("custom.attribute", "value")
return await resolve(root, info, **args)
api = InstrumentedCannulaAPI(
schema="...",
middleware=[custom_middleware]
)
Testing#
When testing instrumented code, you can use the provided test utilities:
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
# Setup test provider
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
# Your tests...
Best Practices#
Use BatchSpanProcessor in production for better performance
Configure appropriate sampling rates for high-traffic services
Add relevant business context to spans via middleware
Monitor the overhead of tracing in your application
Consider using sampling in production environments
Performance Considerations#
The OpenTelemetry instrumentation adds minimal overhead to your application. However, consider the following:
Use BatchSpanProcessor instead of SimpleSpanProcessor in production
Configure appropriate sampling rates for high-traffic services
Monitor the memory usage of your span processors
Configure appropriate flush intervals for batch processors
- class cannula.contrib.otel.InstrumentedCannulaAPI(schema, context=None, middleware=[], root_value=None, scalars=[], logger=None, level=10, **kwargs)#
- Parameters:
schema (
str|DocumentNode|Path)context (
Type[Context] |None(default:None))middleware (
List[Any] (default:[]))root_value (
TypeVar(RootType, covariant=True) |None(default:None))scalars (
List[ScalarInterface] (default:[]))logger (
Logger|None(default:None))level (
int(default:10))
- async call(document, *, variables=None, operation_name=None, context=None, request=None)#
Preform a query against the schema.
This is meant to be called in an asyncio.loop, if you are using a web framework that is synchronous use the call_sync method.
- Parameters:
document (
DocumentNode|str) – The query or mutation to execute.variables (
Dict[str,Any] |None(default:None)) – Dictionary of variable values.operation_name (
str|None(default:None)) – The named operation this can be used to cache queries.context (
Any|None(default:None)) – The context instance to use for this operation.request (
Any|None(default:None)) – The original request instance for the query, this is used when no context is passed. By default it will be set on the info object: info.context.request
- Return type:
ExecutionResult
- parse_document(document)#
Parse and store the document in lru_cache.
- Parameters:
document (
str)- Return type:
- async subscribe(document, *, variables=None, operation_name=None, context=None, request=None)#
Preform a query against the schema.
This is meant to be called in an asyncio.loop, if you are using a web framework that is synchronous use the call_sync method.
- Parameters:
document (
DocumentNode|str) – The query or mutation to execute.variables (
Dict[str,Any] |None(default:None)) – Dictionary of variable values.operation_name (
str|None(default:None)) – The named operation this can be used to cache queries.context (
Any|None(default:None)) – The context instance to use for this operation.request (
Any|None(default:None)) – The original request instance for the query, this is used when no context is passed. By default it will be set on the info object: info.context.request
- Return type:
AsyncIterable[ExecutionResult] |ExecutionResult
- validate(document)#
Validate the document against the schema and store results in lru_cache.
- Parameters:
document (
DocumentNode)- Return type:
List[GraphQLError]
- cannula.contrib.otel.create_instrumented_api(*args, **kwargs)#
Factory function to create an instrumented CannulaAPI instance. This provides a more convenient way to create an instrumented API without explicitly using the class.
- Return type:
- Usage:
api = create_instrumented_api(schema=”type Query { hello: String }”)
- cannula.contrib.otel.resolve_name(func)#
Return a string with the dotted path of the function
- Parameters:
func (
Callable)- Return type:
str
- cannula.contrib.otel.trace_field_resolver(source, info, **kwargs)#
Defualt field resolver that wraps callables in spans.
Typically the fields are simple attributes and these do not matter for tracing and in fact may slow down processing since they are so common.
This resolver will instead just add spans and attributes for anything that is callable.
- Parameters:
source (
GraphQLObjectType)info (
GraphQLResolveInfo)
- cannula.contrib.otel.trace_middleware(next_fn, parent_object, info, **kwargs)#
Trace resolver functions middleware.
This will add spans to any resolvers that are callable.
Usage:
api = create_instrumented_api( schema="...", middleware=[trace_middleware] )
- Parameters:
next_fn (
Callable[...,Any])parent_object (
GraphQLObjectType)info (
GraphQLResolveInfo)