LCO Backend Architecture Guide¶
Overview¶
This guide provides a comprehensive understanding of the FastAPI backend architecture for the LCO Construction Consulting application. The backend follows modern Python best practices with a clean, modular architecture that separates concerns and promotes maintainability.
Table of Contents¶
- Architecture Principles
- Project Structure
- Core Components
- Data Flow
- Integration with Frontend
- Key Patterns
Architecture Principles¶
The backend is built on several key architectural principles:
1. Layered Architecture¶
The application follows a clear separation of concerns with distinct layers:
- API Layer (Routers) - HTTP endpoints and request handling
- Repository Layer - Data access and persistence logic
- Schema Layer - Data validation and serialization
- Database Layer - Connection management and configuration
2. Dependency Injection¶
FastAPI's dependency injection system is used throughout to:
- Manage database connections
- Share repository instances
- Handle cross-cutting concerns
- Facilitate testing
3. Async/Await First¶
Full async support for:
- Non-blocking database operations
- Concurrent request handling
- Improved scalability
4. Type Safety¶
Pydantic models provide:
- Runtime validation
- Type hints throughout
- Auto-generated API documentation
- Consistent serialization
Project Structure¶
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application entry point
│ │
│ ├── core/ # Core configuration
│ │ ├── __init__.py
│ │ └── config.py # Settings & environment variables (Pydantic Settings)
│ │
│ ├── db/ # Database layer
│ │ ├── __init__.py
│ │ └── cosmos.py # Cosmos DB async client & initialization
│ │
│ ├── schemas/ # Pydantic models (validation & serialization)
│ │ ├── __init__.py
│ │ ├── base.py # BaseSchema, BaseDocument, PaginatedResponse
│ │ ├── common.py # Common shared models
│ │ ├── client.py # Client schemas
│ │ ├── project.py # Project schemas
│ │ ├── crew.py # Crew, CrewTrade, CrewMember schemas
│ │ ├── service.py # Service schemas
│ │ ├── service_crew.py # Service crew schemas (with indirect costs)
│ │ ├── equipment.py # Equipment schemas
│ │ ├── equipment_tag.py # Equipment tag schemas
│ │ ├── material.py # Material schemas
│ │ ├── wbs.py # WBS schemas
│ │ ├── task.py # Task tracking schemas
│ │ └── mto.py # Material Takeoff (MTO) schemas (1361 lines!)
│ │
│ ├── repositories/ # Data access layer (Repository pattern)
│ │ ├── __init__.py
│ │ ├── base.py # Generic BaseRepository with CRUD + batch operations
│ │ ├── client.py # ClientRepository
│ │ ├── project.py # ProjectRepository
│ │ ├── crew.py # CrewRepository, CrewTradeRepository, CrewMemberRepository
│ │ ├── service.py # ServiceRepository
│ │ ├── service_crew.py # ServiceCrewRepository
│ │ ├── equipment.py # EquipmentRepository
│ │ ├── equipment_tag.py # EquipmentTagRepository
│ │ ├── material.py # MaterialRepository
│ │ ├── wbs.py # WBSRepository
│ │ └── mto.py # MaterialTakeoffRepository
│ │
│ ├── services/ # Business logic layer
│ │ ├── __init__.py
│ │ ├── mto/ # MTO-specific services
│ │ │ ├── discipline_mapper.py # Sheet name → discipline mapping
│ │ │ └── schema_introspector.py # Dynamic schema introspection
│ │ ├── exporters/ # Export services
│ │ │ ├── base_export_service.py
│ │ │ └── mto_export_service.py # Excel export
│ │ ├── parsers/ # File parsing
│ │ │ ├── file_parser.py # Excel/CSV parsing
│ │ │ └── excel_writer.py # Excel writing utilities
│ │ ├── validators/ # Data validation
│ │ │ ├── base_validator.py
│ │ │ ├── mto_validator.py # MTO data validation
│ │ │ └── calculation_validator.py
│ │ ├── templates/
│ │ │ └── template_service.py # Template file management
│ │ └── tasks/
│ │ └── task_service.py # Background task management
│ │
│ ├── api/
│ │ ├── __init__.py
│ │ └── v1/ # API version 1
│ │ ├── __init__.py
│ │ └── routers/ # FastAPI routers (HTTP endpoints)
│ │ ├── clients.py
│ │ ├── projects.py
│ │ ├── services.py
│ │ ├── crews.py # Crews, CrewTrades, CrewMembers (415 lines)
│ │ ├── service_crews.py
│ │ ├── equipment.py
│ │ ├── equipment_tag.py
│ │ ├── material.py
│ │ ├── wbs.py
│ │ └── mto.py # Material Takeoff endpoints (1344 lines)
│ │
│ ├── utils/ # Utility functions
│ │ ├── batch_utils.py # Batch operation helpers
│ │ ├── data_utils.py
│ │ ├── datetime_utils.py
│ │ ├── file_utils.py
│ │ ├── string_utils.py
│ │ └── validation_utils.py
│ │
│ └── models/ # Empty (using Pydantic schemas)
│ └── __init__.py
│
├── templates/mto/ # Excel templates (10 files)
│ ├── mto_template.xlsx # Unified (9 disciplines)
│ ├── electrical_equipment_template.xlsx
│ ├── mechanical_equipment_template.xlsx
│ ├── bulk_electrical_template.xlsx
│ ├── instrumentation_template.xlsx
│ ├── piping_template.xlsx
│ ├── civil_template.xlsx
│ ├── concrete_template.xlsx
│ ├── structural_template.xlsx
│ └── architectural_template.xlsx
│
├── tests/ # Shell-based integration tests
│ ├── config.sh # Test configuration
│ ├── test_clients.sh
│ ├── test_projects.sh
│ ├── test_services.sh
│ ├── test_crews.sh
│ ├── test_service_crews.sh
│ ├── test_equipment.sh
│ ├── test_mto.sh
│ ├── test_mto_batch.sh
│ ├── test_mto_roundtrip.sh
│ ├── run_all_tests.sh
│ ├── helpers/
│ │ ├── create_test_excel.py
│ │ └── generate_mto_perf_data.py
│ └── test_data/
│
├── scripts/ # Utility scripts
│ └── create_*_template.py
│
├── function_app.py # Azure Functions ASGI wrapper
├── host.json # Azure Functions host config
├── requirements.txt # Production dependencies
├── requirements-dev.txt # Development dependencies
├── .env.example # Environment template
├── local.settings.json.example # Azure Functions local settings
├── .python-version # Python version specification
├── pyproject.toml # Project configuration (ruff, mypy, pytest)
└── README.md
Core Components¶
1. Application Entry Point (main.py)¶
Location: app/main.py
This is the heart of the FastAPI application. It:
- Creates the FastAPI app instance
- Manages application lifecycle (startup/shutdown)
- Configures middleware (CORS)
- Registers API routers
- Provides health check endpoints
Key Features:
from contextlib import asynccontextmanager
from fastapi import FastAPI
# Lifespan management for database connections
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Initialize database
    await cosmos_db.initialize()
    yield
    # Shutdown: Close connections
    await cosmos_db.close()
# FastAPI app with configuration
app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    lifespan=lifespan
)
Router Registration:
# Example: Client router
app.include_router(
    clients.router,
    prefix=f"{settings.API_V1_STR}/clients",
    tags=["Clients"]
)
2. Configuration (core/config.py)¶
Location: app/core/config.py
Centralized configuration using Pydantic Settings:
- Loads from environment variables
- Type validation
- Default values
- Cached singleton pattern
Key Settings:
- API Configuration: Version, debugging, CORS origins
- Database: Cosmos DB endpoint, keys, container names
- Pagination: Default skip/limit values
- Business Logic: Rate calculation defaults
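The "cached singleton" idea can be illustrated without Pydantic. The following is a minimal stdlib sketch, not the project's `config.py`: the class `SketchSettings`, the function `get_settings`, the `DEFAULT_LIMIT` variable, and all default values are assumptions made for illustration.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class SketchSettings:
    """Stand-in for a Pydantic Settings class (field names are illustrative)."""

    api_v1_str: str
    cosmos_endpoint: str
    default_limit: int


@lru_cache(maxsize=1)
def get_settings() -> SketchSettings:
    # Environment variables are read once; later calls return the cached object.
    return SketchSettings(
        api_v1_str=os.environ.get("API_V1_STR", "/api/v1"),
        cosmos_endpoint=os.environ.get("COSMOS_ENDPOINT", "https://localhost:8081"),
        default_limit=int(os.environ.get("DEFAULT_LIMIT", "20")),
    )
```

Because of `lru_cache`, every caller shares one settings object, which is the behavior the "cached singleton pattern" bullet describes.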
Usage:
from app.core.config import settings
# Access any setting
endpoint = settings.COSMOS_ENDPOINT
container_name = settings.CLIENTS_CONTAINER
3. Database Layer (db/cosmos.py)¶
Location: app/db/cosmos.py
Manages Azure Cosmos DB connections:
- Singleton instance pattern
- Lazy initialization
- Container management
- Health checks
Key Methods:
- initialize() - Connects to Cosmos DB and creates containers
- close() - Closes connections
- get_container(name) - Retrieves a container client
- health_check() - Verifies database connectivity
Container Configuration:
container_configs = {
    "clients": {"name": settings.CLIENTS_CONTAINER, "partition_key": "/clientId"},
    "projects": {"name": settings.PROJECTS_CONTAINER, "partition_key": "/clientId"},
    "services": {"name": settings.SERVICES_CONTAINER, "partition_key": "/projectId"},
    "service_crews": {"name": settings.SERVICE_CREWS_CONTAINER, "partition_key": "/serviceCrewId"},
    "crews": {"name": settings.CREWS_CONTAINER, "partition_key": "/crewId"},
    "crew_trades": {"name": settings.CREW_TRADES_CONTAINER, "partition_key": "/tradeCode"},
    "crew_members": {"name": settings.CREW_MEMBERS_CONTAINER, "partition_key": "/locationKey"},
    "equipment": {"name": settings.EQUIPMENT_CONTAINER, "partition_key": "/equipmentCode"},
    "equipment-tags": {"name": settings.EQUIPMENT_TAGS_CONTAINER, "partition_key": "/projectId"},
    "material": {"name": settings.MATERIAL_CONTAINER, "partition_key": "/materialCode"},
    "wbs": {"name": settings.WBS_CONTAINER, "partition_key": "/wbsCode"},
    "tasks": {"name": settings.TASKS_CONTAINER, "partition_key": "/projectId"},
}
Container Summary (24 total):
| Container | Partition Key | Purpose |
|---|---|---|
| Clients | /clientId | Client organizations |
| Projects | /clientId | Construction projects |
| Services | /projectId | Project services/work packages |
| ServiceCrews | /serviceCrewId | Service-specific crew configurations (includes indirect costs) |
| Crews | /crewId | Reusable crew compositions |
| CrewTrades | /tradeCode | Universal trade definitions |
| CrewMembers | /locationKey | Location-specific labor rates |
| Equipment | /equipmentCode | Equipment catalog |
| equipment-tags | /projectId | Project equipment tags |
| Material | /materialCode | Material catalog |
| MaterialTakeoff | /projectId | MTO items (9 disciplines) |
| WBS | /wbsCode | Work Breakdown Structure |
| Tasks | /projectId | Background task tracking |
| Subcontractors | /projectId | Project-specific subcontractor rate cards |
| ProjectMaterialCosts | /projectId | MTO-Material cost mappings |
| coa-kpis | /group | COA KPI definitions |
| ScheduleActivities | /projectId | P6 schedule activities |
| ApiKeys | /keyPrefix | API key storage |
| Packages | /projectId | Work packages |
| SystemsSubsystems | /projectId | System hierarchy |
| FileUploads | /projectId | Centralized upload tracking |
| EstimationMasterUploads | /projectId | Estimation master uploads |
| DropdownOptions | /dropdownType | Global dropdown values |
REMOVED: IndirectCosts container - Data migrated to ServiceCrews container.
NOTE: locationKey format for CrewMembers: country|region|province|year|quarter|projectType
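A pipe-delimited key like the one in the note above is easy to get wrong by hand, so a small helper pair can be useful. This is a minimal sketch assuming the six-part format stated in the note; the names `LocationKey`, `build_location_key`, and `parse_location_key`, and the sample values, are hypothetical and not taken from the codebase. All parts are kept as strings because the note does not say how year and quarter are encoded.

```python
from typing import NamedTuple


class LocationKey(NamedTuple):
    """Parts of a CrewMembers partition key, in the order given in the note."""

    country: str
    region: str
    province: str
    year: str
    quarter: str
    project_type: str


def build_location_key(key: LocationKey) -> str:
    # Join the six parts with the pipe separator used by the format.
    return "|".join(key)


def parse_location_key(raw: str) -> LocationKey:
    parts = raw.split("|")
    if len(parts) != len(LocationKey._fields):
        raise ValueError(f"expected 6 pipe-separated parts, got {len(parts)}")
    return LocationKey(*parts)


# Hypothetical sample values, for illustration only.
sample = LocationKey("CA", "West", "AB", "2025", "Q1", "industrial")
```

Parsing the built string returns the original tuple, so the two helpers can be used as a round-trip check when importing rate cards.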
4. Schema Layer (schemas/)¶
Location: app/schemas/
Pydantic models for data validation and serialization.
Base Schemas (base.py):
- BaseSchema - Common Pydantic configuration
- BaseDocument - Adds id, timestamps, audit fields
- PaginatedResponse - Standard pagination wrapper
- ErrorResponse - Consistent error format
Entity Schemas (e.g., client.py):
- ClientBase - Shared properties
- ClientCreate - Properties for creation (request)
- ClientUpdate - Properties for updates (request)
- Client - Full database model (response)
- ClientDetailed - Extended model with relationships
Key Features:
class ClientBase(BaseSchema):
    client_code: str = Field(alias="clientCode")
    client_name: str = Field(alias="clientName")
    # Field aliases handle camelCase <-> snake_case
class Client(ClientBase, BaseDocument):
    # Inherits id, createdAt, updatedAt from BaseDocument
    client_id: str = Field(alias="clientId")
    type: Literal["client"] = "client"  # Discriminator field
5. Service Layer (services/)¶
Location: app/services/
Business logic layer that provides specialized functionality for complex operations.
Service Categories:
MTO Services (app/services/mto/):
- discipline_mapper.py - Maps sheet names to discipline types
- schema_introspector.py - Dynamic schema introspection for MTO fields
File Parsing (app/services/parsers/):
- file_parser.py - Parse Excel (.xlsx, .xls) and CSV files
- excel_writer.py - Excel file writing utilities with formatting
Export Services (app/services/exporters/):
- base_export_service.py - Base class for export functionality
- mto_export_service.py - MTO Excel export (multi-sheet and single-sheet)
Validation (app/services/validators/):
- base_validator.py - Base validation patterns
- mto_validator.py - MTO data validation rules
- calculation_validator.py - Calculation and formula validation
Templates (app/services/templates/):
- template_service.py - Template file management and generation
Background Tasks (app/services/tasks/):
- task_service.py - Task tracking for async operations (MTO import, etc.)
Messaging (app/services/messaging/):
- service_bus_service.py - Azure Service Bus integration for async message processing
Storage (app/services/storage/):
- blob_storage_service.py - Azure Blob Storage integration for file uploads/downloads
Business Logic (app/services/):
- crew_trade_service.py - Trade code deletion with cascade operations (referential integrity)
Key Service Methods:
FileParser - Parse Excel files for import:
async def parse_file(file_stream, filename) -> list[dict]:
    """Parse single sheet Excel/CSV"""
    pass
async def parse_excel_all_sheets(file_stream) -> dict[str, list[dict]]:
    """Parse all sheets in Excel file"""
    pass
MTOExportService - Export MTO data to Excel:
async def export_mtos(
    mtos: list[dict],
    filename: str,
    format: str = "multi-sheet"
) -> str:
    """Export MTOs to Excel file (multi-sheet or single-sheet)"""
    pass
TaskService - Manage background tasks:
async def create_task(task_type, project_id, metadata) -> dict:
    """Create task record for async operation"""
    pass
async def update_task_progress(task_id, project_id, current, total, message):
    """Update task progress for UI polling"""
    pass
async def update_task_status(task_id, project_id, status, result=None, error=None):
    """Mark task as completed or failed"""
    pass
async def find_recent_duplicate_task(project_id, task_type, filename, file_size, strategy):
    """Idempotency protection - find duplicate task within 10 minutes"""
    pass
MTOValidator - Validate MTO data:
async def validate_mtos(discipline, data: list[dict]) -> ValidationResult:
    """
    Validate MTO items against discipline schema.
    Returns: ValidationResult with valid_data and errors lists.
    """
    pass
6. Repository Layer (repositories/)¶
Location: app/repositories/
Data access layer that abstracts database operations using the Repository pattern.
Base Repository (base.py):
Generic repository using Python 3.12+ type parameter syntax:
class BaseRepository[T](ABC):
    def __init__(self, container: ContainerProxy):
        self.container = container
Provides generic CRUD operations:
- create(item) - Insert new document with auto-generated ID and timestamps
- get_by_id(id, partition_key) - Retrieve by ID and partition key
- update(item) - Update existing document (reads first, then replaces)
- upsert(item) - Insert or update based on existence
- delete(id, partition_key) - Remove document
- query(query, parameters, partition_key, max_item_count) - Execute SQL queries
- query_with_pagination(query, skip, limit) - Paginated queries with total count
- get_all(partition_key, skip, limit) - Retrieve all with pagination
- exists(item_id, partition_key) - Check if document exists
- batch_create(items) - Create multiple items
Entity Repositories (e.g., crew.py):
Extend BaseRepository with entity-specific methods:
class CrewRepository(BaseRepository):
    def get_partition_key(self, item: dict[str, Any]) -> str:
        return item.get('crewId', item.get('id', ''))
    async def create_crew(self, crew_data: CrewCreate) -> Crew:
        # Business logic and validation
        pass
    async def get_crew_by_code(self, crew_code: str) -> Crew | None:
        # Custom query methods with proper typing
        pass
class CrewTradeRepository(BaseRepository):
    # Separate repository for crew trades
    pass
class CrewMemberRepository(BaseRepository):
    # Separate repository for crew members
    pass
Key Responsibilities:
- Transform Pydantic models to/from Cosmos DB documents using model_dump() and **dict
- Execute complex SQL queries with parameterization
- Handle partition keys correctly (abstract method enforced)
- Manage timestamps (createdAt, updatedAt) automatically
- Provide type-safe interfaces with modern Python typing
7. API Layer (api/v1/routers/)¶
Location: app/api/v1/routers/
FastAPI routers define HTTP endpoints.
Structure (e.g., clients.py):
router = APIRouter()
# Dependency injection for repository
async def get_client_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))
# Endpoint definition
@router.get("", response_model=PaginatedResponse)
async def get_clients(
    search: Optional[str] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    repo: ClientRepository = Depends(get_client_repo)
):
    result = await repo.get_all_clients(search=search, skip=skip, limit=limit)
    return PaginatedResponse(**result)
Key Features:
- Dependency injection for repositories
- Request validation via Pydantic models
- Response models for serialization
- Query parameter validation
- Path parameter validation
- HTTP status codes
- Error handling with HTTPException
Data Flow¶
Understanding how data flows through the application:
1. Request Flow (Incoming)¶
flowchart TD
Req["HTTP Request"]
Router["FastAPI Router (API Layer)"]
Schema["Pydantic Schema (Request Model)"]
Repo["Repository (Data Access Layer)"]
Container["Cosmos DB Container"]
DB["Database (Cosmos DB)"]
Req --> Router
Router -->|"validates request"| Schema
Schema -->|"dependency injection"| Repo
Repo -->|"SQL query"| Container
Container --> DB
Example: Creating a client
1. POST /api/v1/clients with JSON body
2. Router receives request, validates against ClientCreate schema
3. Router calls repo.create_client(client_data)
4. Repository transforms to database document
5. Repository executes container.create_item()
6. Cosmos DB stores document
2. Response Flow (Outgoing)¶
flowchart TD
DB["Database (Cosmos DB)"]
Container["Cosmos DB Container"]
Repo["Repository (Data Access Layer)"]
Schema["Pydantic Schema (Response Model)"]
Router["FastAPI Router (API Layer)"]
Resp["HTTP Response"]
DB --> Container
Container -->|"document"| Repo
Repo -->|"transforms to Pydantic model"| Schema
Schema -->|"serializes to JSON"| Router
Router --> Resp
Example: Retrieving a client
1. Repository queries container.read_item(id, partition_key)
2. Cosmos DB returns document dict
3. Repository converts to Client(**result)
4. Pydantic validates and serializes
5. FastAPI returns JSON response
3. Dependency Injection Flow¶
# 1. Define dependency function
async def get_client_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))
# 2. Inject into endpoint
@router.get("/{client_id}")
async def get_client(
    client_id: str,
    repo: ClientRepository = Depends(get_client_repo)  # ← Injected
):
    return await repo.get_client(client_id)
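FastAPI automatically:
- Calls get_client_repo() before the endpoint runs
- Passes the result as the repo parameter
- Resolves each dependency once per request and reuses that result if several parameters in the same request depend on it
- Runs teardown code after the response when a dependency is written with yield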
Integration with Frontend¶
1. API Contract¶
The frontend communicates with the backend via RESTful JSON APIs.
Base URL: http://localhost:8000/api/v1 (development)
Common Patterns:
List Resources (Paginated):
GET /api/v1/clients?skip=0&limit=20&search=acme
Response: {
"data": [...],
"total": 45,
"skip": 0,
"limit": 20,
"hasMore": true
}
Get Single Resource:
GET /api/v1/clients/abc-123
Response: {
"id": "abc-123",
"clientCode": "ACME",
"clientName": "ACME Corp",
...
}
Create Resource:
POST /api/v1/clients
Body: {
"clientCode": "ACME",
"clientName": "ACME Corp",
"clientType": "private",
...
}
Response: 201 Created
{
"id": "generated-id",
"clientCode": "ACME",
"createdAt": "2025-10-09T...",
...
}
Update Resource:
PUT /api/v1/clients/abc-123
Body: {
"clientName": "ACME Corporation"
}
Response: 200 OK
{
"id": "abc-123",
"clientName": "ACME Corporation",
"updatedAt": "2025-10-09T...",
...
}
Delete Resource:
2. Field Naming Convention¶
Backend (Python): snake_case
Frontend (TypeScript): camelCase
Pydantic handles conversion automatically via field aliases:
class Client(BaseSchema):
    client_name: str = Field(alias="clientName")
    created_at: datetime = Field(alias="createdAt")
    model_config = ConfigDict(populate_by_name=True)
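The naming rule behind these aliases can be shown with two small string helpers. This is only an illustration of the snake_case and camelCase mapping, not how the schemas are implemented (they declare each alias explicitly with `Field`); the function names `to_camel` and `to_snake` are chosen for this sketch.

```python
def to_camel(snake: str) -> str:
    """Convert a snake_case field name to its camelCase alias."""
    head, *tail = snake.split("_")
    return head + "".join(word.capitalize() for word in tail)


def to_snake(camel: str) -> str:
    """Convert a camelCase alias back to a snake_case field name."""
    out = []
    for ch in camel:
        if ch.isupper():
            # Each uppercase letter starts a new word.
            out.append("_")
            out.append(ch.lower())
        else:
            out.append(ch)
    return "".join(out)
```

For the fields used above, `to_camel("client_name")` gives `clientName` and `to_snake("createdAt")` gives `created_at`. Names with consecutive capitals (acronyms) would need extra handling that this sketch omits.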
3. CORS Configuration¶
Configured in app/main.py:
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.BACKEND_CORS_ORIGINS,  # Frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
Default allowed origins (from config.py):
- http://localhost:3000 (React dev server)
- http://localhost:5173 (Vite dev server)
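- https://*.azurestaticapps.net (Azure deployment; CORSMiddleware compares allow_origins entries as exact strings, so matching a wildcard subdomain requires allow_origin_regex)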
4. Error Handling¶
Consistent error responses:
Response: 400 Bad Request
{
"detail": "Client with this code already exists"
}
Response: 404 Not Found
{
"detail": "Client not found"
}
Response: 500 Internal Server Error
{
"detail": "Failed to retrieve clients"
}
5. API Documentation¶
Auto-generated from Pydantic models and FastAPI decorators:
Swagger UI: http://localhost:8000/api/docs
- Interactive API explorer
- Try out endpoints
- View request/response schemas
ReDoc: http://localhost:8000/api/redoc
- Beautiful API documentation
- Organized by tags
- Downloadable OpenAPI spec
Key Patterns¶
1. Repository Pattern¶
Purpose: Separate data access logic from business logic
Implementation:
# Base repository provides common CRUD
class BaseRepository(ABC, Generic[T]):
    async def create(self, item): ...
    async def get_by_id(self, id, pk): ...
    async def update(self, item): ...
    async def delete(self, id, pk): ...
# Entity repositories add specific queries
class ClientRepository(BaseRepository):
    async def get_client_by_code(self, code): ...
    async def get_client_projects(self, client_id): ...
Benefits:
- Testable (mock repositories)
- Reusable query logic
- Consistent error handling
- Database abstraction
2. Pydantic Model Hierarchy¶
Purpose: Separate concerns for different use cases
Pattern:
# Base - shared fields
class ClientBase(BaseSchema):
    client_code: str
    client_name: str
# Create - what's needed to create
class ClientCreate(ClientBase):
    pass  # Requires all base fields
# Update - what can be updated
class ClientUpdate(BaseSchema):
    client_name: Optional[str] = None  # All fields optional
# Full model - what's stored/returned
class Client(ClientBase, BaseDocument):
    client_id: str
    created_at: datetime
    project_count: int = 0
Benefits:
- Type safety for different operations
- Clear validation rules
- Prevents over-posting
- Self-documenting API
3. Dependency Injection¶
Purpose: Manage shared resources and cross-cutting concerns
Pattern:
# Define dependency
async def get_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))
# Inject into endpoints
@router.get("/{id}")
async def get_item(
    id: str,
    repo: ClientRepository = Depends(get_repo)
):
    return await repo.get_client(id)
Benefits:
- Testable (inject mocks)
- Reusable across endpoints
- Lifecycle management
- Clean separation
4. Async/Await Pattern¶
Purpose: Non-blocking I/O for better performance
Pattern:
# All database operations are async
async def create_client(self, client_data: ClientCreate):
    # Serialize with camelCase aliases before writing to Cosmos DB
    item = client_data.model_dump(by_alias=True)
    result = await self.container.create_item(body=item)
    return Client(**result)
# Endpoints are async
@router.post("")
async def create_client(
    client_data: ClientCreate,
    repo: ClientRepository = Depends(get_client_repo)
):
    return await repo.create_client(client_data)
Benefits:
- Concurrent request handling
- Non-blocking database calls
- Better resource utilization
- Scalability
5. Partition Key Strategy¶
Purpose: Optimize Cosmos DB queries and costs
Pattern:
# Each container has a partition key
container_configs = {
    "clients": {"partition_key": "/clientId"},
    "projects": {"partition_key": "/clientId"},  # Co-located with client
    "services": {"partition_key": "/projectId"},  # Co-located with project
}
# Repositories implement partition key logic
class ClientRepository(BaseRepository):
    def get_partition_key(self, item):
        return item.get('clientId', item.get('id', ''))
Benefits:
- Efficient queries
- Reduced RU costs
- Logical data grouping
- Better performance
Domain Models Overview¶
Core Entities¶
Clients (/api/v1/clients)
- Represents construction clients
- Partition key: clientId (self-referencing)
- Relationships: One-to-many with Projects
Projects (/api/v1/projects)
- Construction projects for clients
- Partition key: clientId (co-located with client)
- Relationships: Belongs to Client, has many Services
Services (/api/v1/projects/{projectId}/services)
- Services performed on projects (Estimation, Loan Monitoring, etc.)
- Partition key: projectId (co-located with project)
- Relationships: Belongs to Project, has many Service Crews
Crews (/api/v1/crews)
- Reusable crew compositions with three-tier system:
  - Crew Trades (/api/v1/crews/trades): Universal trade definitions (e.g., Carpenter, Electrician)
    - Partition key: tradeCode
    - Fields: tradeCode, tradeName, category, description
  - Crew Members (/api/v1/crews/members): Location/time-specific rate cards
    - Partition key: locationKey (format: country|province|tradeCode|laborDesignation)
    - Fields: tradeCode, laborDesignation, location, year, quarter, rates
  - Crews (/api/v1/crews): Crew templates with manpower and equipment composition
    - Partition key: crewId
    - Fields: crewCode, crewName, manpower[], equipment[], discipline
Service Crews (/api/v1/service-crews)
- Links crews to services with project-specific contextual data
- Partition key: serviceCrewId
- Fields: serviceId, crewId, quantity, rate, indirectCosts (labor/equipment breakdowns)
- Relationships: Links Service to Crew with indirect cost overrides
Equipment (/api/v1/equipment)
- Equipment catalog with unit prices
- Partition key: equipmentCode
- Fields: equipmentCode, equipmentName, equipmentType, category, unit, baseUnitPrice
Material Takeoff (MTO) (/api/v1/mto)
- MTO discipline sheets for estimation services (9 disciplines)
- Partition key: projectId
- Composite Key: (projectId, sourceFileName, discipline, itemNo)
- Fields: projectId, serviceId, discipline, sourceFileName, itemNo, description, quantity, etc.
- Disciplines: Electrical Equipment, Mechanical Equipment, Bulk Electrical, Instrumentation, Piping, Civil, Concrete, Structural, Architectural
- Import/Export: Async import with progress tracking, Excel export (multi-sheet and single-sheet)
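The composite key above is what distinguishes one MTO item from another within a project. As a minimal sketch of how it could be used to spot duplicate rows before an import, assuming each row is a dict with the four camelCase fields named in the key (the helper names and sample rows are hypothetical):

```python
def mto_composite_key(item: dict) -> tuple:
    """Build the (projectId, sourceFileName, discipline, itemNo) key for one row."""
    return (
        item["projectId"],
        item["sourceFileName"],
        item["discipline"],
        item["itemNo"],
    )


def find_duplicate_keys(items: list[dict]) -> list[tuple]:
    """Return the composite keys that occur more than once, in first-repeat order."""
    seen: set[tuple] = set()
    duplicates: list[tuple] = []
    for item in items:
        key = mto_composite_key(item)
        if key in seen and key not in duplicates:
            duplicates.append(key)
        seen.add(key)
    return duplicates


# Illustrative rows only; the second and third share a composite key.
rows = [
    {"projectId": "proj-123", "sourceFileName": "mto_data.xlsx", "discipline": "Piping", "itemNo": "1"},
    {"projectId": "proj-123", "sourceFileName": "mto_data.xlsx", "discipline": "Piping", "itemNo": "2"},
    {"projectId": "proj-123", "sourceFileName": "mto_data.xlsx", "discipline": "Piping", "itemNo": "2"},
]
```

Whether duplicates are rejected, merged, or overwritten depends on the import strategy, which this sketch does not model.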
Equipment Tags (/api/v1/equipment-tags)
- Project-specific equipment tags
- Partition key: projectId
- Fields: projectId, tagNo, equipmentCode, description, etc.
Material (/api/v1/material)
- Material catalog with unit prices
- Partition key: materialCode
- Fields: materialCode, materialName, category, unit, baseUnitPrice
WBS (/api/v1/wbs)
- Work Breakdown Structure hierarchy
- Partition key: wbsCode
- Fields: wbsCode, wbsText, parentCode, level, etc.
Tasks (Background Task Tracking)
- Async operation tracking (e.g., MTO import)
- Partition key: projectId
- Fields: taskId, taskType, status, progress, result, error
- Status: pending, running, completed, failed
- Task Types: mto_import, etc.
Additional Domain Models¶
Subcontractors (/api/v1/projects/{projectId}/subcontractors)
- Project-specific subcontractor management
- Partition key: projectId
- Fields: subcontractorCode, subcontractorName, allInRate, description, contact info
- allInRate - All-inclusive hourly/unit rate for cost calculations
- Supports batch import for bulk data loading
Labor Productivity Settings (/api/v1/services/{serviceId}/labor-settings)
- Per-discipline labor productivity factors for estimation services
- Stored in Services container (type: labor_productivity_settings)
- One document per service with factors for all 9 MTO disciplines
- Used in labor cost calculations:
  - totalManhours = quantity × manhourPerUnit × laborProdFactor
  - laborUnitCost = manhourPerUnit × laborProdFactor × hourlyCost
- Auto-creates default settings (factor=1.0) if none exist
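The two labor formulas above translate directly into code. The sketch below is a worked example of the arithmetic only; the function names and the input values are illustrative, and the default factor of 1.0 mirrors the auto-created default settings.

```python
def total_manhours(quantity: float, manhour_per_unit: float, labor_prod_factor: float = 1.0) -> float:
    """totalManhours = quantity x manhourPerUnit x laborProdFactor"""
    return quantity * manhour_per_unit * labor_prod_factor


def labor_unit_cost(manhour_per_unit: float, hourly_cost: float, labor_prod_factor: float = 1.0) -> float:
    """laborUnitCost = manhourPerUnit x laborProdFactor x hourlyCost"""
    return manhour_per_unit * labor_prod_factor * hourly_cost


# Illustrative inputs: 120 units, 0.5 manhours per unit, factor 1.25, 40 per hour.
hours = total_manhours(quantity=120, manhour_per_unit=0.5, labor_prod_factor=1.25)
unit_cost = labor_unit_cost(manhour_per_unit=0.5, hourly_cost=40, labor_prod_factor=1.25)
```

With these inputs the total is 75 manhours and the labor unit cost is 25 per unit, so the total labor cost (120 units at 25 each) equals 75 manhours at 40 per hour, which is a quick consistency check on the two formulas.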
Project Material Costs (/api/v1/project-material-costs)
- Links MTO items to Material catalog with cost overrides
- Partition key: projectId
- Foreign keys: projectId, itemNo (MTO), materialId (Material)
- Supports discipline-specific material fields (Piping, Concrete, Structural)
- Detailed response includes enriched MTO and Material data
Advanced Topics¶
1. Cosmos DB Queries¶
The repository layer uses parameterized queries:
query = "SELECT * FROM c WHERE c.type = @type AND c.clientId = @clientId"
parameters = [
    {"name": "@type", "value": "project"},
    {"name": "@clientId", "value": client_id}
]
results = await self.query(query, parameters)
2. Pagination Implementation¶
Uses SQL OFFSET/LIMIT:
async def query_with_pagination(self, query, skip=0, limit=20):
    paginated_query = f"{query} OFFSET {skip} LIMIT {limit}"
    items = await self.query(paginated_query)
    # Get total count
    count_query = query.replace("SELECT *", "SELECT VALUE COUNT(1)")
    total = (await self.query(count_query))[0]
    return {
        "data": items,
        "total": total,
        "hasMore": (skip + limit) < total
    }
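The `hasMore` rule is easiest to check on plain data. The following sketch applies the same skip/limit arithmetic to an in-memory list instead of a Cosmos DB query; the function name `paginate` is an assumption, and the response keys follow the paginated list example shown earlier in this guide.

```python
def paginate(items: list, skip: int = 0, limit: int = 20) -> dict:
    """Slice a list the way OFFSET/LIMIT would and report whether more pages remain."""
    total = len(items)
    return {
        "data": items[skip : skip + limit],
        "total": total,
        "skip": skip,
        "limit": limit,
        # More results exist only if the current window ends before the last item.
        "hasMore": (skip + limit) < total,
    }


# 45 records, matching the total in the list-response example.
records = list(range(45))
first_page = paginate(records, skip=0, limit=20)
last_page = paginate(records, skip=40, limit=20)
```

Here `first_page` holds 20 items with `hasMore` true, while `last_page` holds the remaining 5 items with `hasMore` false.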
3. Error Handling Strategy¶
Layered error handling:
# Repository layer - logs and re-raises
try:
    result = await self.container.create_item(body=item)
except CosmosHttpResponseError as e:
    logger.error(f"Failed to create item: {str(e)}")
    raise
# Router layer - converts to HTTP errors
try:
    client = await repo.create_client(client_data)
except ValueError as e:
    raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
    logger.error(f"Failed to create client: {str(e)}")
    raise HTTPException(status_code=500, detail="Failed to create client")
4. Lifespan Management¶
Modern FastAPI lifespan pattern:
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await cosmos_db.initialize()
    yield
    # Shutdown
    await cosmos_db.close()
app = FastAPI(lifespan=lifespan)
Background Task Management¶
Task Service Pattern¶
For long-running operations (e.g., MTO import), use the Task Service pattern:
1. Create Task Record:
from app.services.tasks.task_service import TaskService
task_service = TaskService()
task = await task_service.create_task(
    task_type="mto_import",
    project_id="proj-123",
    metadata={
        "filename": "mto_data.xlsx",
        "strategy": "merge",
        "file_size": 1024000
    }
)
task_id = task["taskId"]
2. Queue Background Task:
from fastapi import BackgroundTasks
background_tasks.add_task(
    process_import_in_background,
    task_id=task_id,
    project_id=project_id,
    file_content=content,
    ...
)
# Return task ID immediately to frontend
return TaskSubmitResponse(
    taskId=task_id,
    status="pending",
    pollUrl=f"/api/v1/mto/import/status/{task_id}?projectId={project_id}"
)
3. Update Progress in Background Worker:
async def process_import_in_background(task_id, project_id, ...):
    # Mark as running
    await task_service.update_task_status(task_id, project_id, TaskStatus.RUNNING)
    # Process with progress updates
    for i, sheet_name in enumerate(sheets):
        # Update progress
        await task_service.update_task_progress(
            task_id=task_id,
            project_id=project_id,
            current=i + 1,
            total=len(sheets),
            message=f"Processing {sheet_name} ({i+1}/{len(sheets)})"
        )
        # Process sheet...
    # Mark as completed
    await task_service.update_task_status(
        task_id=task_id,
        project_id=project_id,
        status=TaskStatus.COMPLETED,
        result={"success": True, "results": [...]}
    )
4. Poll Status from Frontend:
# GET /api/v1/mto/import/status/{taskId}?projectId={projectId}
@router.get("/import/status/{task_id}")
async def get_import_task_status(task_id, project_id, task_service=Depends(...)):
    task = await task_service.get_task(task_id, project_id)
    return TaskResponse(**task)
Idempotency Protection¶
Prevent duplicate background tasks when user retries:
# Check for recent duplicate task
existing_task = await task_service.find_recent_duplicate_task(
    project_id=project_id,
    task_type="mto_import",
    filename=filename,
    file_size=len(content),
    strategy=strategy,
    max_age_minutes=10  # Only consider tasks created in last 10 minutes
)
if existing_task:
    # Return existing task instead of creating new one
    return TaskSubmitResponse(
        taskId=existing_task["taskId"],
        status=existing_task["status"],
        message="Import already in progress. Returning existing task."
    )
Duplicate Detection Criteria:
- Same projectId
- Same taskType
- Same filename
- Same file_size (bytes)
- Same strategy
- Created within last 10 minutes
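The six criteria above can be expressed as a single matching function. This is a minimal in-memory sketch, not the `TaskService` implementation (which would query the Tasks container): the function name `find_recent_duplicate`, the task document shape (`projectId`, `taskType`, `createdAt`, and a `metadata` dict), and the injected `now` parameter are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def find_recent_duplicate(
    tasks: list[dict],
    *,
    project_id: str,
    task_type: str,
    filename: str,
    file_size: int,
    strategy: str,
    max_age_minutes: int = 10,
    now: Optional[datetime] = None,
) -> Optional[dict]:
    """Return the first task matching all duplicate criteria, or None."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=max_age_minutes)
    for task in tasks:
        meta = task.get("metadata", {})
        if (
            task["projectId"] == project_id
            and task["taskType"] == task_type
            and meta.get("filename") == filename
            and meta.get("file_size") == file_size
            and meta.get("strategy") == strategy
            and task["createdAt"] >= cutoff  # created within the time window
        ):
            return task
    return None
```

Passing `now` explicitly keeps the time window testable; a task created 3 minutes ago matches, while an otherwise identical task created 15 minutes ago does not.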
Task Status Lifecycle¶
stateDiagram-v2
[*] --> pending
pending --> running
running --> completed
running --> failed
Status Fields:
- pending - Task created, not started
- running - Task executing (has progress object)
- completed - Task finished successfully (has result object)
- failed - Task encountered error (has error string)
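The lifecycle in the diagram can be enforced with a small transition table. This is a sketch under assumptions: the project's actual `TaskStatus` definition is not shown in this guide, so the enum below is a stand-in with the four documented values, and `can_transition` is a hypothetical helper.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Stand-in enum mirroring the four documented status values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed moves, taken directly from the state diagram
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.RUNNING},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED},
    TaskStatus.COMPLETED: set(),  # terminal
    TaskStatus.FAILED: set(),     # terminal
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if the lifecycle permits moving from current to new."""
    return new in ALLOWED_TRANSITIONS[current]
```

A check like this could guard status updates so that, for example, a late progress write cannot move a completed task back to running.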
Progress Object:
{
    "current": 3,        # Current progress count
    "total": 5,          # Total items to process
    "percentage": 60.0,  # Completion percentage (0-100)
    "message": "Processing Electrical Equipment (3/5)"
}
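The `percentage` field follows from `current` and `total`. A minimal sketch of how such an object could be assembled is shown below; `build_progress` is a hypothetical helper, and the rounding to one decimal place and the zero-total guard are assumptions rather than documented behavior.

```python
def build_progress(current: int, total: int, message: str) -> dict:
    """Build a progress object with a derived completion percentage."""
    # Guard against division by zero when there is nothing to process
    percentage = round(current / total * 100, 1) if total > 0 else 0.0
    return {
        "current": current,
        "total": total,
        "percentage": min(percentage, 100.0),  # never report more than 100
        "message": message,
    }
```

Calling `build_progress(3, 5, "Processing Electrical Equipment (3/5)")` yields the object shown above.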
Batch Operations¶
Cosmos DB Batch API¶
BaseRepository implements batch operations using Cosmos DB's transactional batch API:
Features:
- Up to 100 items per transaction (Cosmos limit)
- Transactional guarantees (all-or-nothing)
- Automatic chunking for >100 items
- Progress callbacks for UI updates
- Exponential backoff retry for throttling (429 errors)
- 500ms delay between batches to avoid rate limiting
Batch Create Example:
# Imports needed by the repository module
import asyncio
from collections.abc import Callable
from azure.cosmos.exceptions import CosmosHttpResponseError

# Repository method
async def _execute_batch_create(
    self,
    items: list[dict],
    partition_key: str,
    progress_callback: Callable | None = None
) -> list[dict]:
    """
    Create up to 100 items in a single transaction.
    Args:
        items: List of items to create (max 100)
        partition_key: Partition key value (all items must have same PK)
        progress_callback: Optional async callback(current, total, message)
    Returns:
        List of created items
    """
    if len(items) > 100:
        raise ValueError("Batch size cannot exceed 100 items")
    # Create batch operations
    batch_operations = []
    for item in items:
        batch_operations.append(("create", (item,)))
    # Execute batch with retries
    for attempt in range(self.batch_retry_attempts):
        try:
            results = await self.container.execute_item_batch(
                batch_operations=batch_operations,
                partition_key=partition_key
            )
            # Update progress if callback provided
            if progress_callback:
                await progress_callback(len(items), len(items), "Batch completed")
            return results
        except CosmosHttpResponseError as e:
            if e.status_code == 429 and attempt < self.batch_retry_attempts - 1:
                # Throttling - exponential backoff
                wait_time = (2 ** attempt) * 0.5  # 0.5s, 1s, 2s
                await asyncio.sleep(wait_time)
                continue
            # Not a throttling error, or retries exhausted
            raise
Chunked Batch Create (>100 items):
async def batch_create_large(
    self,
    items: list[dict],
    progress_callback: Callable | None = None
) -> list[dict]:
    """
    Create any number of items by chunking into batches of 100.
    """
    # Group by partition key
    items_by_pk = {}
    for item in items:
        pk = self.get_partition_key(item)
        if pk not in items_by_pk:
            items_by_pk[pk] = []
        items_by_pk[pk].append(item)
    all_results = []
    total_processed = 0
    for pk, pk_items in items_by_pk.items():
        # Chunk into batches of 100
        chunks = [pk_items[i:i+100] for i in range(0, len(pk_items), 100)]
        for chunk_index, chunk in enumerate(chunks):
            # Execute batch
            results = await self._execute_batch_create(
                items=chunk,
                partition_key=pk,
                progress_callback=None  # Use overall callback instead
            )
            all_results.extend(results)
            total_processed += len(chunk)
            # Update overall progress
            if progress_callback:
                await progress_callback(
                    total_processed,
                    len(items),
                    f"Created {total_processed}/{len(items)} items"
                )
            # Rate limiting: wait between batches
            if chunk_index < len(chunks) - 1:
                await asyncio.sleep(0.5)  # 500ms delay
    return all_results
Progress Callback Pattern:
# In MTO import background task
async def progress_callback(current, total, message):
    await task_service.update_task_progress(
        task_id=task_id,
        project_id=project_id,
        current=current,
        total=total,
        message=message
    )
# Use in batch operation
await mto_repo.batch_create_large(
    items=mto_items,
    progress_callback=progress_callback
)
Best Practices¶
For Backend Development¶
- Always use async/await for I/O operations
- Use type hints throughout for IDE support
- Validate with Pydantic models, not manual checks
- Handle partition keys correctly in repositories
- Log errors before raising exceptions
- Use dependency injection for testability
- Keep routers thin - business logic in repositories
- Document with docstrings for auto-generated docs
For Frontend Integration¶
- Use the OpenAPI spec to generate TypeScript types
- Handle pagination consistently with skip/limit
- Use proper HTTP methods (GET, POST, PUT, DELETE)
- Check HTTP status codes for error handling
- Handle camelCase/snake_case conversion automatically
- Use the Swagger UI for API exploration during development
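The camelCase/snake_case mapping mentioned above is mechanical. In the backend it is presumably handled by Pydantic field aliases; the sketch below only illustrates the name transformation itself using the standard library. `to_camel` and `to_snake` are hypothetical helpers, not functions from the project.

```python
import re


def to_camel(name: str) -> str:
    """Convert a snake_case name to camelCase, e.g. project_id -> projectId."""
    head, *tail = name.split("_")
    return head + "".join(part.capitalize() for part in tail)


def to_snake(name: str) -> str:
    """Convert a camelCase name to snake_case, e.g. taskType -> task_type."""
    # Insert an underscore before every uppercase letter except at the start
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
```

This simple version does not treat acronyms specially, so a name such as `pollURL` would need its own rule.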
Deployment¶
Azure Functions¶
The backend can be deployed to Azure Functions.
Entry point: function_app.py
import azure.functions as func
from app.main import app as fastapi_app
app = func.AsgiFunctionApp(
    app=fastapi_app,
    http_auth_level=func.AuthLevel.ANONYMOUS  # no function key required
)
AsgiFunctionApp wraps the FastAPI app so that Azure Functions forwards incoming HTTP requests to it.
Environment Variables¶
Required in production:
- COSMOS_ENDPOINT - Cosmos DB endpoint URL
- COSMOS_KEY - Cosmos DB primary key
- DATABASE_NAME - Database name
- BACKEND_CORS_ORIGINS - Allowed frontend origins
Azure Services:
- AZURE_STORAGE_CONNECTION_STRING - Azure Blob Storage connection string
- AZURE_STORAGE_CONTAINER_NAME - Blob container for file imports (default: "imports")
- AZURE_SERVICE_BUS_CONNECTION_STRING - Azure Service Bus connection string
- AZURE_SERVICE_BUS_QUEUE_NAME - Queue for async processing (default: "import-queue")
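A missing variable is easier to diagnose at startup than at the first database call. The following sketch checks the four required variables listed above using only the standard library. `find_missing_env_vars` is a hypothetical helper; the project's `config.py` is described as using Pydantic Settings, which may already perform equivalent validation.

```python
import os
from typing import Mapping, Optional

# Variables listed as required in production
REQUIRED_ENV_VARS = (
    "COSMOS_ENDPOINT",
    "COSMOS_KEY",
    "DATABASE_NAME",
    "BACKEND_CORS_ORIGINS",
)


def find_missing_env_vars(environ: Optional[Mapping[str, str]] = None) -> list:
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

A deployment script or application startup hook could call this and fail fast with the list of missing names.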
Troubleshooting¶
Common Issues¶
1. "Cosmos DB not initialized"
- Ensure await cosmos_db.initialize() is called
- Check environment variables are set
- Verify Cosmos DB credentials
2. "Partition key mismatch"
- Verify get_partition_key() implementation
- Ensure partition key field exists in document
- Check container configuration
3. "CORS errors"
- Add frontend URL to BACKEND_CORS_ORIGINS
- Restart backend after changing CORS settings
- Check browser network tab for actual origin
4. "Validation errors"
- Check Pydantic model field names
- Verify aliases match frontend camelCase
- Use populate_by_name=True for flexibility
Next Steps¶
Recommended Enhancements¶
1. Authentication & Authorization
- Add JWT tokens
- Implement role-based access
- Secure sensitive endpoints
2. Service Layer
- Add business logic between routers and repositories
- Implement complex workflows
- Coordinate multi-repository operations
3. Caching
- Add Redis for frequently accessed data
- Implement cache invalidation
- Reduce Cosmos DB RU consumption
4. Testing
- Unit tests for repositories
- Integration tests for endpoints
- Mock Cosmos DB for testing
5. Monitoring
- Application Insights integration
- Custom metrics
- Performance monitoring
- Error tracking
6. API Versioning
- Plan for v2 endpoints
- Maintain backward compatibility
- Deprecation strategy
Resources¶
- FastAPI Documentation: https://fastapi.tiangolo.com
- Pydantic Documentation: https://docs.pydantic.dev
- Azure Cosmos DB SDK: https://learn.microsoft.com/python/api/azure-cosmos
- API Documentation: http://localhost:8000/api/docs (when running)
Summary¶
The LCO backend is a modern, well-architected FastAPI application that:
- ✅ Uses layered architecture for separation of concerns
- ✅ Implements repository pattern for data access
- ✅ Leverages Pydantic for type safety and validation
- ✅ Supports async/await for performance
- ✅ Provides dependency injection for testability
- ✅ Integrates seamlessly with Azure Cosmos DB
- ✅ Auto-generates API documentation
- ✅ Follows REST conventions for frontend integration
This architecture provides a solid foundation for building scalable, maintainable construction management applications.