Building a Production-Ready FastAPI-Based Python App Serving Online and Offline Requests
Introduction
FastAPI has gained massive popularity for its asynchronous capabilities, auto-generated documentation, and performance. In this blog post, we’ll walk you through building a FastAPI-based Python application that handles both online requests (synchronous API calls) and offline requests (background tasks using queues like Amazon SQS). We’ll cover best practices for production readiness, handling thread control, and implementing scheduled tasks.
1. Application Design Overview
A robust production-ready FastAPI app architecture typically includes:
- API Layer: Handles synchronous API requests.
- Background Worker Layer: Handles long-running or asynchronous tasks using a queue (e.g., SQS).
- Database/Cache Layer: Stores data needed for responses.
- Task Scheduler: For periodic tasks.
- Monitoring and Health Checks: For observability.
Here’s a high-level design diagram for clarity:
```
API Gateway → FastAPI App → (sync response)
                   └─→ SQS Queue → Background Worker → DB/Cache
```
2. Setting Up the FastAPI Application
We’ll start by creating a FastAPI project structure:
```
app/
├── main.py
├── api/
│   └── endpoints.py
├── workers/
│   └── background_tasks.py
├── models/
│   └── schema.py
├── config.py
└── utils/
    └── scheduler.py
```
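The tree above includes a `config.py` that the later snippets read from via `os.getenv`. A minimal sketch of what it might contain (the setting names here are illustrative assumptions, not a fixed contract):

```python
import os

def get_settings() -> dict:
    # Centralize environment lookups so the rest of the app
    # never calls os.getenv directly; defaults are placeholders
    return {
        "aws_region": os.getenv("AWS_REGION", "us-east-1"),
        "sqs_queue_url": os.getenv("SQS_QUEUE_URL", ""),
    }
```

Keeping secrets out of the repository and loading them from the environment (or AWS Secrets Manager) at startup is the pattern the production section below builds on.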
main.py (Entry Point)
```python
from fastapi import FastAPI
from app.api.endpoints import router as api_router
import boto3
import os

app = FastAPI()

# Initialize the SQS client; boto3 reads credentials from the environment
sqs_client = boto3.client(
    "sqs",
    region_name=os.getenv("AWS_REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

@app.get("/health")
def health_check():
    return {"status": "ok"}

# Include API endpoints
app.include_router(api_router)
```
3. Handling Online Requests (Synchronous API)
Here’s an example API endpoint that directly handles a user request.
```python
# endpoints.py
from fastapi import APIRouter
from app.workers.background_tasks import enqueue_task

router = APIRouter()

@router.post("/process-online")
def process_online(data: dict):
    # Perform synchronous processing and return the result immediately
    result = {"message": "Processed online", "input": data}
    return result

@router.post("/process-offline")
def process_offline(data: dict):
    # Hand the payload off to the queue and return right away
    enqueue_task(data)
    return {"message": "Task enqueued for offline processing"}
```
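The endpoints above accept a bare `dict`. In a real app you would declare a Pydantic model in `models/schema.py` so FastAPI validates the request body for you; the same idea can be sketched framework-free with a stdlib dataclass (the field names are made up for illustration):

```python
from dataclasses import dataclass, field

@dataclass
class TaskPayload:
    # Hypothetical fields; a real app defines its own schema
    user_id: str
    payload: dict = field(default_factory=dict)

    def __post_init__(self):
        # Reject obviously bad input before it reaches the queue
        if not self.user_id:
            raise ValueError("user_id must be non-empty")
        if not isinstance(self.payload, dict):
            raise TypeError("payload must be a dict")
```

With FastAPI itself, declaring the model as the endpoint parameter type gives you this validation plus the auto-generated docs for free.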
4. Handling Offline Requests Using SQS (Background Worker)
A background worker listens to the SQS queue and processes messages.
workers/background_tasks.py:
```python
import boto3
import json
import os
import time

sqs_queue_url = os.getenv("SQS_QUEUE_URL")

# The worker runs as its own process, so it needs its own client
sqs_client = boto3.client("sqs", region_name=os.getenv("AWS_REGION"))

# Enqueue a task in the SQS queue
def enqueue_task(data):
    sqs_client.send_message(
        QueueUrl=sqs_queue_url,
        MessageBody=json.dumps(data),  # JSON survives the round trip; str(data) does not parse reliably
    )
    print("Message sent to SQS")

# Worker loop that long-polls the SQS queue
def process_sqs_messages():
    while True:
        messages = sqs_client.receive_message(
            QueueUrl=sqs_queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=10,  # long polling cuts down on empty receives
        )
        for message in messages.get("Messages", []):
            print("Processing message:", message["Body"])
            # Perform heavy processing here
            time.sleep(2)  # Simulate processing time
            # Delete the message only after successful processing
            sqs_client.delete_message(
                QueueUrl=sqs_queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )

if __name__ == "__main__":
    process_sqs_messages()
```
To run the background worker as a separate process, give the module a `__main__` guard that calls `process_sqs_messages()` and run:

```shell
python -m app.workers.background_tasks
```

(Note that `python -m` takes a module path, not a function path.)
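A bare `while True` loop cannot be stopped cleanly on deploys or scale-downs. One common pattern, sketched here with a stdlib `queue.Queue` standing in for SQS, is to poll against a `threading.Event` so the process can exit between receives:

```python
import queue
import threading

def run_worker(q: queue.Queue, stop: threading.Event, handled: list) -> None:
    # Poll until asked to stop; the timeout plays the role of SQS long polling
    while not stop.is_set():
        try:
            message = q.get(timeout=0.1)
        except queue.Empty:
            continue
        handled.append(message)  # stand-in for real processing
        q.task_done()            # mirrors delete_message after success

# Usage: feed two messages, wait for them, then signal shutdown
q, stop, handled = queue.Queue(), threading.Event(), []
q.put({"id": 1})
q.put({"id": 2})
worker = threading.Thread(target=run_worker, args=(q, stop, handled))
worker.start()
q.join()      # blocks until both messages are processed
stop.set()    # ask the loop to exit cleanly
worker.join()
```

In the SQS version, a signal handler would call `stop.set()` so in-flight messages finish before the process exits.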
5. Controlling Threads in FastAPI
By default, FastAPI handles concurrency with asynchronous coroutines, and it already runs plain `def` endpoints in a thread pool. However, long-running or fan-out tasks may call for explicit thread control.
Example: Running Tasks in Parallel
You can use `concurrent.futures.ThreadPoolExecutor` to run blocking functions in parallel without tying up the event loop.
```python
import concurrent.futures
import time

# Reuse one executor for the app's lifetime instead of one per request
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

def expensive_function():
    time.sleep(5)  # Stand-in for blocking work

@app.get("/heavy-task")
def run_heavy_task():
    # Fire and forget: the response returns while the thread keeps working
    executor.submit(expensive_function)
    return {"message": "Task is running"}
```
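The fire-and-forget call above discards the result. When the caller does need the results of several blocking calls, the same executor can fan the work out and gather it back, for example:

```python
import concurrent.futures

def square(n: int) -> int:
    return n * n  # stand-in for a blocking call

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    # Submit all jobs, then collect results as each thread finishes
    futures = [pool.submit(square, n) for n in range(5)]
    results = sorted(f.result() for f in concurrent.futures.as_completed(futures))

print(results)  # [0, 1, 4, 9, 16]
```

Inside an `async def` endpoint, `await asyncio.to_thread(...)` (Python 3.9+) achieves the same offloading for a single call without blocking the event loop.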
6. Lifespan in FastAPI
FastAPI provides a lifespan handler to run code when the app starts and shuts down.
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Starting up...")    # runs before the app begins serving
    yield
    print("Shutting down...")  # runs after the app stops serving

app = FastAPI(lifespan=lifespan)
```
Use lifespan to:
- Initialize database connections.
- Start background tasks.
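The startup/shutdown bracket that `lifespan` gives you is just an async context manager, so the pattern can be exercised on its own with the stdlib. A sketch that initializes and tears down a shared resource (the `state` dict stands in for a real connection pool):

```python
import asyncio
from contextlib import asynccontextmanager

state = {}

@asynccontextmanager
async def lifespan(app=None):
    state["db"] = "connected"   # stand-in for opening a connection pool
    try:
        yield
    finally:
        state["db"] = "closed"  # stand-in for releasing it, even on errors

async def main():
    async with lifespan():
        assert state["db"] == "connected"  # the app would serve requests here

asyncio.run(main())
print(state["db"])  # closed
```

The `try/finally` around `yield` ensures cleanup runs even if the app crashes while serving.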
7. Scheduled Tasks in FastAPI
While FastAPI doesn’t have built-in support for scheduled tasks, you can use third-party libraries such as APScheduler or Celery for periodic jobs.
Example with APScheduler:
```python
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# Define the periodic job
@scheduler.scheduled_job("interval", seconds=60)
def periodic_task():
    print("Running scheduled task")

scheduler.start()

# On recent FastAPI versions, prefer stopping the scheduler in the
# lifespan handler; on_event is deprecated but still works
@app.on_event("shutdown")
def shutdown_event():
    scheduler.shutdown()
```
Example Use Case
- Online API Request: A user submits data that gets immediately processed and returned.
- Offline Processing: Large files or time-intensive requests are enqueued in SQS and processed later.
- Scheduled Task: Every minute, a report of processed tasks is sent to a monitoring system.
8. Production-Ready Considerations
- Environment Variables: Use `.env` files and AWS Secrets Manager for sensitive configuration.
- Security: Implement OAuth2 or JWT for API authentication.
- Scalability: Deploy FastAPI behind a load balancer (e.g., AWS ALB).
- Monitoring: Integrate with Prometheus/Grafana for performance metrics.
- Retries and Dead-letter Queues: Configure SQS to handle retry logic and move unprocessed tasks to a dead-letter queue (DLQ).
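SQS handles redelivery for you, but transient failures inside the worker itself (e.g. a flaky downstream API) are often wrapped in a small retry helper with exponential backoff. A generic sketch, with delays kept tiny for illustration:

```python
import time

def retry(fn, attempts: int = 3, base_delay: float = 0.01):
    # Call fn, backing off exponentially; re-raise after the last attempt
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage: a flaky task that succeeds on the third call
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = retry(flaky)
print(result)  # ok
```

If all attempts fail, the exception propagates, the SQS message is not deleted, and after enough receives the queue's redrive policy moves it to the DLQ.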
Conclusion
FastAPI’s light footprint and asynchronous design make it an excellent choice for building APIs that serve both online and offline requests. By leveraging SQS for background processing and APScheduler for periodic tasks, you can create a robust, production-ready system. With thoughtful design, careful thread control, and effective monitoring, your FastAPI application will scale and perform well in real-world scenarios.