Direct Usage¶
Sometimes you need the throttle to behave differently based on runtime information, such as the size of an uploaded file, the number of records in a query, or whether a previous step succeeded. Direct usage lets you call the throttle methods yourself from inside a handler, giving you full programmatic control.
When to use direct usage¶
- Dynamic cost — the cost of a request depends on something you compute at runtime (e.g., payload size, number of items).
- Multi-step workflows — you want to consume quota only after a successful operation, or conditionally skip throttling based on an intermediate result.
- Conditional throttling — different branches of your handler have different rate limit semantics.
- Per-message WebSocket throttling — you want to throttle each message inside an established WebSocket connection, not just the connection attempt.
For simpler cases, dependency injection or decorators usually require less code.
await throttle.hit(connection): check and apply¶
hit() is the core method. It records a hit, checks the quota, and rejects the request if the limit is exceeded. Returns the connection object.
```python
from fastapi import FastAPI, Request

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

throttle = HTTPThrottle(uid="api:data", rate="100/min")


@app.get("/data")
async def get_data(request: Request):
    await throttle.hit(request)
    return {"data": []}
```
If the client is throttled, hit() raises an exception by default (handled by Traffik's default throttled handler, which returns a 429 response), or uses your custom throttled handler if provided.
await throttle.hit(connection, cost=N): custom cost per call¶
Pass a cost argument to override the throttle's default cost for this specific call. Useful when different invocations of the same endpoint should consume different amounts of quota.
```python
from fastapi import FastAPI, Request

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

# Rate limit: 1000 "units" per minute
upload_throttle = HTTPThrottle(uid="uploads", rate="1000/min")


@app.post("/upload")
async def upload_file(request: Request):
    body = await request.body()
    # Charge 1 unit per KB, minimum 1
    cost = max(1, len(body) // 1024)
    await upload_throttle.hit(request, cost=cost)
    # Process upload...
    return {"bytes_received": len(body)}
```
Tip
A cost of 0 is a no-op. Traffik short-circuits immediately without recording anything or checking the backend. You can use this to conditionally skip throttling without an if branch:
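As a sketch (the `request_cost` helper and the `is_admin` flag are hypothetical, not part of traffik), the exemption can be folded into the cost computation:

```python
def request_cost(is_admin: bool) -> int:
    # Exempt callers pay 0 units, which makes hit() a no-op for them;
    # everyone else pays the normal cost of 1.
    return 0 if is_admin else 1


# Inside a handler, this replaces an explicit `if not user.is_admin:` branch:
#     await throttle.hit(request, cost=request_cost(user.is_admin))
```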
await throttle(connection, context={...}): __call__ alias¶
Calling the throttle instance directly is an alias for hit(). The context keyword argument lets you pass extra information that throttle strategies, identifiers, or rules can read.
```python
from fastapi import FastAPI, Request

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

throttle = HTTPThrottle(uid="api:scoped", rate="200/min")


@app.get("/resource/{tenant_id}")
async def get_resource(tenant_id: str, request: Request):
    # Pass tenant context so different tenants have separate quota buckets
    await throttle(request, context={"scope": tenant_id})
    return {"tenant": tenant_id}
```
The context dict is merged with the throttle's default context (set at construction time). Call-time values take precedence over defaults in case of key conflicts.
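The precedence behaves like an ordinary dict merge. This standalone illustration uses plain dicts with made-up keys, not traffik internals:

```python
# Default context, as it might be set at throttle construction (illustrative values).
default_context = {"scope": "global", "tier": "free"}

# Context passed at call time.
call_context = {"scope": "tenant-42"}

# Call-time keys override defaults; unrelated default keys survive.
merged = {**default_context, **call_context}
# merged == {"scope": "tenant-42", "tier": "free"}
```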
await throttle.check(connection, cost=N): non-consuming pre-check¶
check() inspects the current quota state without consuming any quota. It returns True if there is sufficient quota available for the given cost, and False otherwise.
Best-effort only
check() is inherently subject to race conditions (time-of-check to time-of-use). Between the check and the eventual hit(), another request may consume the remaining quota. Use check() only for fast pre-screening, and always follow up with hit() to actually consume quota.
```python
from fastapi import FastAPI, Request
from starlette.exceptions import HTTPException

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

export_throttle = HTTPThrottle(uid="exports", rate="1000/min")


@app.post("/export")
async def export_data(request: Request):
    body = await request.body()
    cost = max(1, len(body) // 512)
    # Pre-check before running the expensive export
    if not await export_throttle.check(request, cost=cost):
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit would be exceeded. This request costs {cost} units.",
        )
    # Quota looks sufficient; proceed with the expensive operation
    result = await run_export(body)
    # Consume quota now that the operation completed
    await export_throttle.hit(request, cost=cost)
    return result
```
When the throttle strategy doesn't support stats (e.g., a custom strategy without a get_stat method), check() returns True optimistically rather than blocking the request.
await throttle.stat(connection): read state without consuming¶
stat(...) returns a StrategyStat object with the current throttle state for the connection. No quota is consumed. Returns None if the strategy doesn't support stats.
The StrategyStat object contains:
| Field | Type | Description |
|---|---|---|
| `hits_remaining` | `int` or `float` | Quota remaining in the current window |
| `wait_ms` | `float` | Milliseconds until quota resets (0 if quota is available) |
| `rate` | `Rate` | The rate this stat is for |
| `key` | `str` | The namespaced throttle key used for this connection |
```python
from fastapi import FastAPI, Request

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

throttle = HTTPThrottle(uid="api:metered", rate="100/min")


@app.get("/status")
async def quota_status(request: Request):
    stat = await throttle.stat(request)
    if stat is None:
        return {"quota": "unknown"}
    return {
        "hits_remaining": stat.hits_remaining,
        "retry_after_ms": stat.wait_ms,
    }
```
A common pattern is to read the stat before consuming quota to include rate limit information in the response:
```python
@app.get("/data")
async def get_data(request: Request):
    stat_before = await throttle.stat(request)
    await throttle.hit(request)
    return {
        "data": [],
        "quota_used": 1,
        "quota_remaining": (stat_before.hits_remaining - 1) if stat_before else None,
    }
```
WebSocket: is_throttled(websocket) — check state after hit¶
For WebSocket connections, hit() doesn't raise an exception on throttle by default. Instead it sends a rate_limit JSON message to the client and marks the connection state. Use is_throttled() to check whether the last hit was throttled.
```python
from fastapi import FastAPI, WebSocket

from traffik.throttles import WebSocketThrottle, is_throttled
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

ws_throttle = WebSocketThrottle(uid="ws:messages", rate="60/min")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        # Record a hit for each message received
        await ws_throttle.hit(websocket)
        if is_throttled(websocket):
            # Client was notified by the throttle handler.
            # Optionally close the connection after repeated violations.
            await websocket.close(code=1008, reason="Rate limit exceeded")
            return
        # Process the message normally
        await websocket.send_text(f"Echo: {message}")
```
Note
is_throttled() reads a flag that hit() sets on websocket.state. It reflects the result of the most recent hit() call only. If you call hit() again, the flag is updated.
Putting it together: dynamic cost with conditional throttling¶
This example shows a multi-step pattern: compute cost from the request body, pre-check availability, then consume quota only on success.
```python
from fastapi import FastAPI, Request
from starlette.exceptions import HTTPException

from traffik import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend()
app = FastAPI(lifespan=backend.lifespan)

# Each request's cost is the number of items it processes.
# Total budget: 10,000 items per minute
batch_throttle = HTTPThrottle(uid="batch:items", rate="10000/min")


@app.post("/batch")
async def process_batch(request: Request):
    data = await request.json()
    items = data.get("items", [])
    if not items:
        return {"processed": 0}
    cost = len(items)
    # Non-consuming check first
    if not await batch_throttle.check(request, cost=cost):
        stat = await batch_throttle.stat(request)
        retry_after = int((stat.wait_ms / 1000) + 1) if stat else 60
        raise HTTPException(
            status_code=429,
            detail=f"Batch of {cost} items would exceed quota.",
            headers={"Retry-After": str(retry_after)},
        )
    # Run the actual processing
    results = await process_items(items)
    # Consume quota; actual cost may differ if some items failed
    actual_cost = len(results)
    await batch_throttle.hit(request, cost=actual_cost)
    return {"processed": actual_cost, "results": results}
```