LCO Backend Architecture Guide

Overview

This guide describes the FastAPI backend architecture for the LCO Construction Consulting application. The backend is organized into modular layers that separate HTTP handling, business logic, validation, and data access, which keeps each part independently testable and maintainable.

Table of Contents

  1. Architecture Principles
  2. Project Structure
  3. Core Components
  4. Data Flow
  5. Integration with Frontend
  6. Key Patterns

Architecture Principles

The backend is built on several key architectural principles:

1. Layered Architecture

The application follows a clear separation of concerns with distinct layers:

- API Layer (Routers) - HTTP endpoints and request handling
- Repository Layer - Data access and persistence logic
- Schema Layer - Data validation and serialization
- Database Layer - Connection management and configuration

2. Dependency Injection

FastAPI's dependency injection system is used throughout to:

- Manage database connections
- Share repository instances
- Handle cross-cutting concerns
- Facilitate testing

3. Async/Await First

Full async support for:

- Non-blocking database operations
- Concurrent request handling
- Improved scalability

4. Type Safety

Pydantic models provide:

- Runtime validation
- Type hints throughout
- Auto-generated API documentation
- Consistent serialization


Project Structure

backend/
├── app/
│   ├── __init__.py
│   ├── main.py                          # FastAPI application entry point
│   │
│   ├── core/                            # Core configuration
│   │   ├── __init__.py
│   │   └── config.py                   # Settings & environment variables (Pydantic Settings)
│   │
│   ├── db/                              # Database layer
│   │   ├── __init__.py
│   │   └── cosmos.py                   # Cosmos DB async client & initialization
│   │
│   ├── schemas/                         # Pydantic models (validation & serialization)
│   │   ├── __init__.py
│   │   ├── base.py                     # BaseSchema, BaseDocument, PaginatedResponse
│   │   ├── common.py                   # Common shared models
│   │   ├── client.py                   # Client schemas
│   │   ├── project.py                  # Project schemas
│   │   ├── crew.py                     # Crew, CrewTrade, CrewMember schemas
│   │   ├── service.py                  # Service schemas
│   │   ├── service_crew.py             # Service crew schemas (with indirect costs)
│   │   ├── equipment.py                # Equipment schemas
│   │   ├── equipment_tag.py            # Equipment tag schemas
│   │   ├── material.py                 # Material schemas
│   │   ├── wbs.py                      # WBS schemas
│   │   ├── task.py                     # Task tracking schemas
│   │   └── mto.py                      # Material Takeoff (MTO) schemas (1361 lines!)
│   │
│   ├── repositories/                    # Data access layer (Repository pattern)
│   │   ├── __init__.py
│   │   ├── base.py                     # Generic BaseRepository with CRUD + batch operations
│   │   ├── client.py                   # ClientRepository
│   │   ├── project.py                  # ProjectRepository
│   │   ├── crew.py                     # CrewRepository, CrewTradeRepository, CrewMemberRepository
│   │   ├── service.py                  # ServiceRepository
│   │   ├── service_crew.py             # ServiceCrewRepository
│   │   ├── equipment.py                # EquipmentRepository
│   │   ├── equipment_tag.py            # EquipmentTagRepository
│   │   ├── material.py                 # MaterialRepository
│   │   ├── wbs.py                      # WBSRepository
│   │   └── mto.py                      # MaterialTakeoffRepository
│   │
│   ├── services/                        # Business logic layer
│   │   ├── __init__.py
│   │   ├── mto/                        # MTO-specific services
│   │   │   ├── discipline_mapper.py   # Sheet name → discipline mapping
│   │   │   └── schema_introspector.py # Dynamic schema introspection
│   │   ├── exporters/                  # Export services
│   │   │   ├── base_export_service.py
│   │   │   └── mto_export_service.py  # Excel export
│   │   ├── parsers/                    # File parsing
│   │   │   ├── file_parser.py         # Excel/CSV parsing
│   │   │   └── excel_writer.py        # Excel writing utilities
│   │   ├── validators/                 # Data validation
│   │   │   ├── base_validator.py
│   │   │   ├── mto_validator.py       # MTO data validation
│   │   │   └── calculation_validator.py
│   │   ├── templates/
│   │   │   └── template_service.py    # Template file management
│   │   └── tasks/
│   │       └── task_service.py        # Background task management
│   │
│   ├── api/
│   │   ├── __init__.py
│   │   └── v1/                          # API version 1
│   │       ├── __init__.py
│   │       └── routers/                 # FastAPI routers (HTTP endpoints)
│   │           ├── clients.py
│   │           ├── projects.py
│   │           ├── services.py
│   │           ├── crews.py            # Crews, CrewTrades, CrewMembers (415 lines)
│   │           ├── service_crews.py
│   │           ├── equipment.py
│   │           ├── equipment_tag.py
│   │           ├── material.py
│   │           ├── wbs.py
│   │           └── mto.py              # Material Takeoff endpoints (1344 lines)
│   │
│   ├── utils/                           # Utility functions
│   │   ├── batch_utils.py              # Batch operation helpers
│   │   ├── data_utils.py
│   │   ├── datetime_utils.py
│   │   ├── file_utils.py
│   │   ├── string_utils.py
│   │   └── validation_utils.py
│   │
│   └── models/                          # Empty (using Pydantic schemas)
│       └── __init__.py
├── templates/mto/                       # Excel templates (10 files)
│   ├── mto_template.xlsx               # Unified (9 disciplines)
│   ├── electrical_equipment_template.xlsx
│   ├── mechanical_equipment_template.xlsx
│   ├── bulk_electrical_template.xlsx
│   ├── instrumentation_template.xlsx
│   ├── piping_template.xlsx
│   ├── civil_template.xlsx
│   ├── concrete_template.xlsx
│   ├── structural_template.xlsx
│   └── architectural_template.xlsx
├── tests/                               # Shell-based integration tests
│   ├── config.sh                       # Test configuration
│   ├── test_clients.sh
│   ├── test_projects.sh
│   ├── test_services.sh
│   ├── test_crews.sh
│   ├── test_service_crews.sh
│   ├── test_equipment.sh
│   ├── test_mto.sh
│   ├── test_mto_batch.sh
│   ├── test_mto_roundtrip.sh
│   ├── run_all_tests.sh
│   ├── helpers/
│   │   ├── create_test_excel.py
│   │   └── generate_mto_perf_data.py
│   └── test_data/
├── scripts/                             # Utility scripts
│   └── create_*_template.py
├── function_app.py                      # Azure Functions ASGI wrapper
├── host.json                            # Azure Functions host config
├── requirements.txt                     # Production dependencies
├── requirements-dev.txt                 # Development dependencies
├── .env.example                         # Environment template
├── local.settings.json.example          # Azure Functions local settings
├── .python-version                      # Python version specification
├── pyproject.toml                       # Project configuration (ruff, mypy, pytest)
└── README.md

Core Components

1. Application Entry Point (main.py)

Location: app/main.py

This is the heart of the FastAPI application. It:

- Creates the FastAPI app instance
- Manages application lifecycle (startup/shutdown)
- Configures middleware (CORS)
- Registers API routers
- Provides health check endpoints

Key Features:

# Lifespan management for database connections
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Initialize database
    await cosmos_db.initialize()
    yield
    # Shutdown: Close connections
    await cosmos_db.close()

# FastAPI app with configuration
app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    lifespan=lifespan
)

Router Registration:

# Example: Client router
app.include_router(
    clients.router,
    prefix=f"{settings.API_V1_STR}/clients",
    tags=["Clients"]
)

2. Configuration (core/config.py)

Location: app/core/config.py

Centralized configuration using Pydantic Settings:

- Loads from environment variables
- Type validation
- Default values
- Cached singleton pattern

Key Settings:

- API Configuration: Version, debugging, CORS origins
- Database: Cosmos DB endpoint, keys, container names
- Pagination: Default skip/limit values
- Business Logic: Rate calculation defaults

Usage:

from app.core.config import settings

# Access any setting
endpoint = settings.COSMOS_ENDPOINT
container_name = settings.CLIENTS_CONTAINER

3. Database Layer (db/cosmos.py)

Location: app/db/cosmos.py

Manages Azure Cosmos DB connections:

- Singleton instance pattern
- Lazy initialization
- Container management
- Health checks

Key Methods:

- initialize() - Connects to Cosmos DB and creates containers
- close() - Closes connections
- get_container(name) - Retrieves a container client
- health_check() - Verifies database connectivity

Container Configuration:

container_configs = {
    "clients": {"name": settings.CLIENTS_CONTAINER, "partition_key": "/clientId"},
    "projects": {"name": settings.PROJECTS_CONTAINER, "partition_key": "/clientId"},
    "services": {"name": settings.SERVICES_CONTAINER, "partition_key": "/projectId"},
    "service_crews": {"name": settings.SERVICE_CREWS_CONTAINER, "partition_key": "/serviceCrewId"},
    "crews": {"name": settings.CREWS_CONTAINER, "partition_key": "/crewId"},
    "crew_trades": {"name": settings.CREW_TRADES_CONTAINER, "partition_key": "/tradeCode"},
    "crew_members": {"name": settings.CREW_MEMBERS_CONTAINER, "partition_key": "/locationKey"},
    "equipment": {"name": settings.EQUIPMENT_CONTAINER, "partition_key": "/equipmentCode"},
    "equipment-tags": {"name": settings.EQUIPMENT_TAGS_CONTAINER, "partition_key": "/projectId"},
    "material": {"name": settings.MATERIAL_CONTAINER, "partition_key": "/materialCode"},
    "wbs": {"name": settings.WBS_CONTAINER, "partition_key": "/wbsCode"},
    "tasks": {"name": settings.TASKS_CONTAINER, "partition_key": "/projectId"},
}

Container Summary (24 total):

Container                Partition Key    Purpose
Clients                  /clientId        Client organizations
Projects                 /clientId        Construction projects
Services                 /projectId       Project services/work packages
ServiceCrews             /serviceCrewId   Service-specific crew configurations (includes indirect costs)
Crews                    /crewId          Reusable crew compositions
CrewTrades               /tradeCode       Universal trade definitions
CrewMembers              /locationKey     Location-specific labor rates
Equipment                /equipmentCode   Equipment catalog
equipment-tags           /projectId       Project equipment tags
Material                 /materialCode    Material catalog
MaterialTakeoff          /projectId       MTO items (9 disciplines)
WBS                      /wbsCode         Work Breakdown Structure
Tasks                    /projectId       Background task tracking
Subcontractors           /projectId       Project-specific subcontractor rate cards
ProjectMaterialCosts     /projectId       MTO-Material cost mappings
coa-kpis                 /group           COA KPI definitions
ScheduleActivities       /projectId       P6 schedule activities
ApiKeys                  /keyPrefix       API key storage
Packages                 /projectId       Work packages
SystemsSubsystems        /projectId       System hierarchy
FileUploads              /projectId       Centralized upload tracking
EstimationMasterUploads  /projectId       Estimation master uploads
DropdownOptions          /dropdownType    Global dropdown values

REMOVED: IndirectCosts container - Data migrated to ServiceCrews container.

NOTE: locationKey format for CrewMembers: country|region|province|year|quarter|projectType
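To make the locationKey format in the note concrete, here is a minimal sketch of building and parsing such a key. The LocationKey class, its method names, and the sample values are hypothetical and not taken from the codebase; only the component order comes from the note.

```python
from __future__ import annotations

from dataclasses import astuple, dataclass


@dataclass(frozen=True)
class LocationKey:
    """Hypothetical helper; component order follows the documented locationKey format."""
    country: str
    region: str
    province: str
    year: str
    quarter: str
    project_type: str

    def encode(self) -> str:
        parts = astuple(self)
        # Reject empty parts and embedded separators so the key stays parseable
        if any(not part or "|" in part for part in parts):
            raise ValueError("components must be non-empty and must not contain '|'")
        return "|".join(parts)

    @classmethod
    def decode(cls, key: str) -> LocationKey:
        parts = key.split("|")
        if len(parts) != 6:
            raise ValueError(f"expected 6 components, got {len(parts)}")
        return cls(*parts)


# Sample values are illustrative only
key = LocationKey("CA", "West", "AB", "2025", "Q1", "industrial").encode()
```

Because every component is part of the partition key, two rate cards that differ only in quarter or project type land in different logical partitions.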

4. Schema Layer (schemas/)

Location: app/schemas/

Pydantic models for data validation and serialization.

Base Schemas (base.py):

- BaseSchema - Common Pydantic configuration
- BaseDocument - Adds id, timestamps, audit fields
- PaginatedResponse - Standard pagination wrapper
- ErrorResponse - Consistent error format

Entity Schemas (e.g., client.py):

- ClientBase - Shared properties
- ClientCreate - Properties for creation (request)
- ClientUpdate - Properties for updates (request)
- Client - Full database model (response)
- ClientDetailed - Extended model with relationships

Key Features:

class ClientBase(BaseSchema):
    client_code: str = Field(alias="clientCode")
    client_name: str = Field(alias="clientName")
    # Field aliases handle camelCase <-> snake_case

class Client(ClientBase, BaseDocument):
    # Inherits id, createdAt, updatedAt from BaseDocument
    client_id: str = Field(alias="clientId")
    type: Literal["client"] = "client"  # Discriminator field

5. Service Layer (services/)

Location: app/services/

Business logic layer that provides specialized functionality for complex operations.

Service Categories:

MTO Services (app/services/mto/):

- discipline_mapper.py - Maps sheet names to discipline types
- schema_introspector.py - Dynamic schema introspection for MTO fields

File Parsing (app/services/parsers/):

- file_parser.py - Parse Excel (.xlsx, .xls) and CSV files
- excel_writer.py - Excel file writing utilities with formatting

Export Services (app/services/exporters/):

- base_export_service.py - Base class for export functionality
- mto_export_service.py - MTO Excel export (multi-sheet and single-sheet)

Validation (app/services/validators/):

- base_validator.py - Base validation patterns
- mto_validator.py - MTO data validation rules
- calculation_validator.py - Calculation and formula validation

Templates (app/services/templates/):

- template_service.py - Template file management and generation

Background Tasks (app/services/tasks/):

- task_service.py - Task tracking for async operations (MTO import, etc.)

Messaging (app/services/messaging/):

- service_bus_service.py - Azure Service Bus integration for async message processing

Storage (app/services/storage/):

- blob_storage_service.py - Azure Blob Storage integration for file uploads/downloads

Business Logic (app/services/):

- crew_trade_service.py - Trade code deletion with cascade operations (referential integrity)

Key Service Methods:

FileParser - Parse Excel files for import:

async def parse_file(file_stream, filename) -> list[dict]:
    """Parse single sheet Excel/CSV"""
    pass

async def parse_excel_all_sheets(file_stream) -> dict[str, list[dict]]:
    """Parse all sheets in Excel file"""
    pass

MTOExportService - Export MTO data to Excel:

async def export_mtos(
    mtos: list[dict],
    filename: str,
    format: str = "multi-sheet"
) -> str:
    """Export MTOs to Excel file (multi-sheet or single-sheet)"""
    pass

TaskService - Manage background tasks:

async def create_task(task_type, project_id, metadata) -> dict:
    """Create task record for async operation"""
    pass

async def update_task_progress(task_id, project_id, current, total, message):
    """Update task progress for UI polling"""
    pass

async def update_task_status(task_id, project_id, status, result=None, error=None):
    """Mark task as completed or failed"""
    pass

async def find_recent_duplicate_task(project_id, task_type, filename, file_size, strategy):
    """Idempotency protection - find duplicate task within 10 minutes"""
    pass
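The duplicate check described by find_recent_duplicate_task can be sketched as follows. This is an in-memory illustration, not the service's implementation: the real method presumably queries the Tasks container, and the createdAt field and the find_recent_duplicate name are assumptions made for the example. The metadata keys follow the create_task example elsewhere in this guide.

```python
from datetime import datetime, timedelta, timezone

DUPLICATE_WINDOW = timedelta(minutes=10)  # window stated in the docstring above


def find_recent_duplicate(tasks, project_id, task_type, filename,
                          file_size, strategy, now=None):
    """Return the first task matching the same request within the window, else None."""
    now = now or datetime.now(timezone.utc)
    for task in tasks:
        meta = task.get("metadata", {})
        same_request = (
            task.get("projectId") == project_id
            and task.get("taskType") == task_type
            and meta.get("filename") == filename
            and meta.get("file_size") == file_size
            and meta.get("strategy") == strategy
        )
        # createdAt is assumed to be a timezone-aware datetime in this sketch
        if same_request and now - task["createdAt"] <= DUPLICATE_WINDOW:
            return task
    return None


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
recent = {
    "taskId": "task-1",
    "projectId": "proj-123",
    "taskType": "mto_import",
    "createdAt": now - timedelta(minutes=5),
    "metadata": {"filename": "mto_data.xlsx", "file_size": 1024000, "strategy": "merge"},
}
duplicate = find_recent_duplicate(
    [recent], "proj-123", "mto_import", "mto_data.xlsx", 1024000, "merge", now=now
)
```

Matching on filename, file size, and strategy together means a retried upload of the same file is caught, while a different file or a different import strategy still creates a new task.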

MTOValidator - Validate MTO data:

async def validate_mtos(discipline, data: list[dict]) -> ValidationResult:
    """
    Validate MTO items against discipline schema.
    Returns: ValidationResult with valid_data and errors lists.
    """
    pass

6. Repository Layer (repositories/)

Location: app/repositories/

Data access layer that abstracts database operations using the Repository pattern.

Base Repository (base.py): Generic repository using Python 3.12+ type parameter syntax:

class BaseRepository[T](ABC):
    def __init__(self, container: ContainerProxy):
        self.container = container

Provides generic CRUD operations:

- create(item) - Insert new document with auto-generated ID and timestamps
- get_by_id(id, partition_key) - Retrieve by ID and partition key
- update(item) - Update existing document (reads first, then replaces)
- upsert(item) - Insert or update based on existence
- delete(id, partition_key) - Remove document
- query(query, parameters, partition_key, max_item_count) - Execute SQL queries
- query_with_pagination(query, skip, limit) - Paginated queries with total count
- get_all(partition_key, skip, limit) - Retrieve all with pagination
- exists(item_id, partition_key) - Check if document exists
- batch_create(items) - Create multiple items

Entity Repositories (e.g., crew.py): Extend BaseRepository with entity-specific methods:

class CrewRepository(BaseRepository):
    def get_partition_key(self, item: dict[str, Any]) -> str:
        return item.get('crewId', item.get('id', ''))

    async def create_crew(self, crew_data: CrewCreate) -> Crew:
        # Business logic and validation
        pass

    async def get_crew_by_code(self, crew_code: str) -> Crew | None:
        # Custom query methods with proper typing
        pass

class CrewTradeRepository(BaseRepository):
    # Separate repository for crew trades
    pass

class CrewMemberRepository(BaseRepository):
    # Separate repository for crew members
    pass

Key Responsibilities:

- Transform Pydantic models to/from Cosmos DB documents using model_dump() and **dict
- Execute complex SQL queries with parameterization
- Handle partition keys correctly (abstract method enforced)
- Manage timestamps (createdAt, updatedAt) automatically
- Provide type-safe interfaces with modern Python typing
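The responsibilities above can be illustrated with a small, self-contained sketch. It assumes an in-memory stand-in (InMemoryContainer, hypothetical) instead of a real Cosmos DB container and works on plain dicts rather than Pydantic models, so it shows only the shape of the pattern: an enforced abstract get_partition_key, automatic IDs, and automatic timestamps.

```python
from __future__ import annotations

import asyncio
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any


class InMemoryContainer:
    """Hypothetical stand-in for a container; keyed by (partition key, id)."""

    def __init__(self) -> None:
        self.items: dict[tuple[str, str], dict[str, Any]] = {}


class BaseRepository(ABC):
    def __init__(self, container: InMemoryContainer) -> None:
        self.container = container

    @abstractmethod
    def get_partition_key(self, item: dict[str, Any]) -> str:
        """Each entity repository names the field that acts as its partition key."""

    async def create(self, item: dict[str, Any]) -> dict[str, Any]:
        now = datetime.now(timezone.utc).isoformat()
        doc = {**item, "createdAt": now, "updatedAt": now}
        doc.setdefault("id", str(uuid.uuid4()))  # auto-generate an ID if missing
        self.container.items[(self.get_partition_key(doc), doc["id"])] = doc
        return doc

    async def get_by_id(self, id: str, partition_key: str) -> dict[str, Any] | None:
        return self.container.items.get((partition_key, id))


class CrewRepository(BaseRepository):
    def get_partition_key(self, item: dict[str, Any]) -> str:
        return item.get("crewId", item.get("id", ""))


async def demo() -> dict[str, Any] | None:
    repo = CrewRepository(InMemoryContainer())
    created = await repo.create({"crewId": "crew-1", "crewCode": "C-001"})
    return await repo.get_by_id(created["id"], "crew-1")


crew = asyncio.run(demo())
```

Because get_partition_key is abstract, a repository subclass that forgets to define it cannot be instantiated, which is how the base class enforces correct partition key handling.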

7. API Layer (api/v1/routers/)

Location: app/api/v1/routers/

FastAPI routers define HTTP endpoints.

Structure (e.g., clients.py):

router = APIRouter()

# Dependency injection for repository
async def get_client_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))

# Endpoint definition
@router.get("", response_model=PaginatedResponse)
async def get_clients(
    search: Optional[str] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    repo: ClientRepository = Depends(get_client_repo)
):
    result = await repo.get_all_clients(search=search, skip=skip, limit=limit)
    return PaginatedResponse(**result)

Key Features:

- Dependency injection for repositories
- Request validation via Pydantic models
- Response models for serialization
- Query parameter validation
- Path parameter validation
- HTTP status codes
- Error handling with HTTPException


Data Flow

Understanding how data flows through the application:

1. Request Flow (Incoming)

flowchart TD
    Req["HTTP Request"]
    Router["FastAPI Router (API Layer)"]
    Schema["Pydantic Schema (Request Model)"]
    Repo["Repository (Data Access Layer)"]
    Container["Cosmos DB Container"]
    DB["Database (Cosmos DB)"]

    Req --> Router
    Router -->|"validates request"| Schema
    Schema -->|"dependency injection"| Repo
    Repo -->|"SQL query"| Container
    Container --> DB

Example: Creating a client

1. POST /api/v1/clients with JSON body
2. Router receives request, validates against ClientCreate schema
3. Router calls repo.create_client(client_data)
4. Repository transforms to database document
5. Repository executes container.create_item()
6. Cosmos DB stores document

2. Response Flow (Outgoing)

flowchart TD
    DB["Database (Cosmos DB)"]
    Container["Cosmos DB Container"]
    Repo["Repository (Data Access Layer)"]
    Schema["Pydantic Schema (Response Model)"]
    Router["FastAPI Router (API Layer)"]
    Resp["HTTP Response"]

    DB --> Container
    Container -->|"document"| Repo
    Repo -->|"transforms to Pydantic model"| Schema
    Schema -->|"serializes to JSON"| Router
    Router --> Resp

Example: Retrieving a client

1. Repository queries container.read_item(id, partition_key)
2. Cosmos DB returns document dict
3. Repository converts to Client(**result)
4. Pydantic validates and serializes
5. FastAPI returns JSON response

3. Dependency Injection Flow

# 1. Define dependency function
async def get_client_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))

# 2. Inject into endpoint
@router.get("/{client_id}")
async def get_client(
    client_id: str,
    repo: ClientRepository = Depends(get_client_repo)  # ← Injected
):
    return await repo.get_client(client_id)

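FastAPI automatically:

- Calls get_client_repo() before the endpoint runs
- Passes the result as the repo parameter
- Lets the same dependency function be reused across endpoints; within a single request the result is cached, so a dependency declared more than once is resolved only once
- Manages the dependency lifecycle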


Integration with Frontend

1. API Contract

The frontend communicates with the backend via RESTful JSON APIs.

Base URL: http://localhost:8000/api/v1 (development)

Common Patterns:

List Resources (Paginated):

GET /api/v1/clients?skip=0&limit=20&search=acme
Response: {
  "data": [...],
  "total": 45,
  "skip": 0,
  "limit": 20,
  "hasMore": true
}

Get Single Resource:

GET /api/v1/clients/abc-123
Response: {
  "id": "abc-123",
  "clientCode": "ACME",
  "clientName": "ACME Corp",
  ...
}

Create Resource:

POST /api/v1/clients
Body: {
  "clientCode": "ACME",
  "clientName": "ACME Corp",
  "clientType": "private",
  ...
}
Response: 201 Created
{
  "id": "generated-id",
  "clientCode": "ACME",
  "createdAt": "2025-10-09T...",
  ...
}

Update Resource:

PUT /api/v1/clients/abc-123
Body: {
  "clientName": "ACME Corporation"
}
Response: 200 OK
{
  "id": "abc-123",
  "clientName": "ACME Corporation",
  "updatedAt": "2025-10-09T...",
  ...
}

Delete Resource:

DELETE /api/v1/clients/abc-123
Response: 204 No Content

2. Field Naming Convention

Backend (Python): snake_case

client_name: str
created_at: datetime

Frontend (TypeScript): camelCase

clientName: string
createdAt: string

Pydantic handles conversion automatically via field aliases:

class Client(BaseSchema):
    client_name: str = Field(alias="clientName")
    created_at: datetime = Field(alias="createdAt")

    model_config = ConfigDict(populate_by_name=True)

3. CORS Configuration

Configured in app/main.py:

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.BACKEND_CORS_ORIGINS,  # Frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

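Default allowed origins (from config.py):

- http://localhost:3000 (React dev server)
- http://localhost:5173 (Vite dev server)
- https://*.azurestaticapps.net (Azure deployment)

Note that Starlette's CORSMiddleware compares allow_origins entries against the request origin exactly (apart from the catch-all "*"), so a subdomain wildcard such as the Azure entry only takes effect if it is supplied as a pattern through allow_origin_regex.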

4. Error Handling

Consistent error responses:

Response: 400 Bad Request
{
  "detail": "Client with this code already exists"
}

Response: 404 Not Found
{
  "detail": "Client not found"
}

Response: 500 Internal Server Error
{
  "detail": "Failed to retrieve clients"
}

5. API Documentation

Auto-generated from Pydantic models and FastAPI decorators:

Swagger UI: http://localhost:8000/api/docs

- Interactive API explorer
- Try out endpoints
- View request/response schemas

ReDoc: http://localhost:8000/api/redoc

- Readable, reference-style API documentation
- Organized by tags
- Downloadable OpenAPI spec


Key Patterns

1. Repository Pattern

Purpose: Separate data access logic from business logic

Implementation:

# Base repository provides common CRUD
class BaseRepository[T](ABC):  # Python 3.12+ type parameter syntax, as in base.py
    async def create(self, item): ...
    async def get_by_id(self, id, pk): ...
    async def update(self, item): ...
    async def delete(self, id, pk): ...

# Entity repositories add specific queries
class ClientRepository(BaseRepository):
    async def get_client_by_code(self, code): ...
    async def get_client_projects(self, client_id): ...

Benefits:

- Testable (mock repositories)
- Reusable query logic
- Consistent error handling
- Database abstraction

2. Pydantic Model Hierarchy

Purpose: Separate concerns for different use cases

Pattern:

# Base - shared fields
class ClientBase(BaseSchema):
    client_code: str
    client_name: str

# Create - what's needed to create
class ClientCreate(ClientBase):
    pass  # Requires all base fields

# Update - what can be updated
class ClientUpdate(BaseSchema):
    client_name: Optional[str] = None  # All fields optional

# Full model - what's stored/returned
class Client(ClientBase, BaseDocument):
    client_id: str
    created_at: datetime
    project_count: int = 0

Benefits:

- Type safety for different operations
- Clear validation rules
- Prevents over-posting
- Self-documenting API

3. Dependency Injection

Purpose: Manage shared resources and cross-cutting concerns

Pattern:

# Define dependency
async def get_repo() -> ClientRepository:
    return ClientRepository(cosmos_db.get_container("clients"))

# Inject into endpoints
@router.get("/{id}")
async def get_item(
    id: str,
    repo: ClientRepository = Depends(get_repo)
):
    return await repo.get_client(id)

Benefits:

- Testable (inject mocks)
- Reusable across endpoints
- Lifecycle management
- Clean separation

4. Async/Await Pattern

Purpose: Non-blocking I/O for better performance

Pattern:

# All database operations are async
async def create_client(self, client_data: ClientCreate):
    result = await self.container.create_item(body=item)
    return Client(**result)

# Endpoints are async
@router.post("")
async def create_client(
    client_data: ClientCreate,
    repo: ClientRepository = Depends(get_client_repo)
):
    return await repo.create_client(client_data)

Benefits:

- Concurrent request handling
- Non-blocking database calls
- Better resource utilization
- Scalability
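The effect of non-blocking calls can be seen in a small standard-library sketch. Here asyncio.sleep stands in for an awaited database call; fake_db_call and handle_requests are hypothetical names used only for this illustration.

```python
from __future__ import annotations

import asyncio


async def fake_db_call(name: str, delay: float, log: list[str]) -> str:
    log.append(f"start {name}")
    await asyncio.sleep(delay)  # yields control to the event loop, like an awaited DB call
    log.append(f"end {name}")
    return name


async def handle_requests() -> tuple[list[str], list[str]]:
    log: list[str] = []
    # Both calls are in flight at the same time; neither blocks the other
    results = await asyncio.gather(
        fake_db_call("clients", 0.05, log),
        fake_db_call("projects", 0.01, log),
    )
    return list(results), log


results, log = asyncio.run(handle_requests())
```

The log shows both calls starting before either finishes, and the shorter call completing first, while gather still returns results in the order the calls were passed in.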

5. Partition Key Strategy

Purpose: Optimize Cosmos DB queries and costs

Pattern:

# Each container has a partition key
container_configs = {
    "clients": {"partition_key": "/clientId"},
    "projects": {"partition_key": "/clientId"},  # Co-located with client
    "services": {"partition_key": "/projectId"},  # Co-located with project
}

# Repositories implement partition key logic
class ClientRepository(BaseRepository):
    def get_partition_key(self, item):
        return item.get('clientId', item.get('id', ''))

Benefits:

- Efficient queries
- Reduced RU costs
- Logical data grouping
- Better performance


Domain Models Overview

Core Entities

Clients (/api/v1/clients)

- Represents construction clients
- Partition key: clientId (self-referencing)
- Relationships: One-to-many with Projects

Projects (/api/v1/projects)

- Construction projects for clients
- Partition key: clientId (co-located with client)
- Relationships: Belongs to Client, has many Services

Services (/api/v1/projects/{projectId}/services)

- Services performed on projects (Estimation, Loan Monitoring, etc.)
- Partition key: projectId (co-located with project)
- Relationships: Belongs to Project, has many Service Crews

Crews (/api/v1/crews)

- Reusable crew compositions with three-tier system:
  - Crew Trades (/api/v1/crews/trades): Universal trade definitions (e.g., Carpenter, Electrician)
    - Partition key: tradeCode
    - Fields: tradeCode, tradeName, category, description
  - Crew Members (/api/v1/crews/members): Location/time-specific rate cards
    - Partition key: locationKey (format: country|province|tradeCode|laborDesignation)
    - Fields: tradeCode, laborDesignation, location, year, quarter, rates
  - Crews (/api/v1/crews): Crew templates with manpower and equipment composition
    - Partition key: crewId
    - Fields: crewCode, crewName, manpower[], equipment[], discipline

Service Crews (/api/v1/service-crews)

- Links crews to services with project-specific contextual data
- Partition key: serviceCrewId
- Fields: serviceId, crewId, quantity, rate, indirectCosts (labor/equipment breakdowns)
- Relationships: Links Service to Crew with indirect cost overrides

Equipment (/api/v1/equipment)

- Equipment catalog with unit prices
- Partition key: equipmentCode
- Fields: equipmentCode, equipmentName, equipmentType, category, unit, baseUnitPrice

Material Takeoff (MTO) (/api/v1/mto)

- MTO discipline sheets for estimation services (9 disciplines)
- Partition key: projectId
- Composite Key: (projectId, sourceFileName, discipline, itemNo)
- Fields: projectId, serviceId, discipline, sourceFileName, itemNo, description, quantity, etc.
- Disciplines: Electrical Equipment, Mechanical Equipment, Bulk Electrical, Instrumentation, Piping, Civil, Concrete, Structural, Architectural
- Import/Export: Async import with progress tracking, Excel export (multi-sheet and single-sheet)
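As an illustration of how the MTO composite key identifies a row, the sketch below merges incoming items into existing ones by that key. The helper names and the merge semantics (incoming fields overwrite matching existing fields) are assumptions for this example, not the import logic used by the backend.

```python
from __future__ import annotations

from typing import Any

# Field order follows the composite key listed above
KEY_FIELDS = ("projectId", "sourceFileName", "discipline", "itemNo")


def composite_key(item: dict[str, Any]) -> tuple:
    """Build the identity of an MTO item from its four key fields."""
    return tuple(item[field] for field in KEY_FIELDS)


def merge_by_key(existing: list[dict[str, Any]],
                 incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Hypothetical merge: update rows with a matching key, append the rest."""
    merged = {composite_key(item): item for item in existing}
    for item in incoming:
        key = composite_key(item)
        merged[key] = {**merged.get(key, {}), **item}
    return list(merged.values())


base = {"projectId": "proj-123", "sourceFileName": "mto_data.xlsx", "discipline": "Piping"}
existing = [{**base, "itemNo": "1", "quantity": 5, "description": "Pipe"}]
incoming = [{**base, "itemNo": "1", "quantity": 8}, {**base, "itemNo": "2", "quantity": 3}]
rows = merge_by_key(existing, incoming)
```

Item 1 is updated in place (quantity becomes 8, description is kept) and item 2 is added, so re-importing the same file does not create duplicate rows.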

Equipment Tags (/api/v1/equipment-tags)

- Project-specific equipment tags
- Partition key: projectId
- Fields: projectId, tagNo, equipmentCode, description, etc.

Material (/api/v1/material)

- Material catalog with unit prices
- Partition key: materialCode
- Fields: materialCode, materialName, category, unit, baseUnitPrice

WBS (/api/v1/wbs)

- Work Breakdown Structure hierarchy
- Partition key: wbsCode
- Fields: wbsCode, wbsText, parentCode, level, etc.

Tasks (Background Task Tracking)

- Async operation tracking (e.g., MTO import)
- Partition key: projectId
- Fields: taskId, taskType, status, progress, result, error
- Status: pending, running, completed, failed
- Task Types: mto_import, etc.
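The four task statuses can be modeled as a small state machine. The enum values come from the list above; the allowed transitions are an assumption made for this sketch and may not match the rules enforced by the task service.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transitions: a task moves forward only, and terminal states stay terminal
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.FAILED},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED},
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
}


def can_transition(current: TaskStatus, target: TaskStatus) -> bool:
    """Return True if moving from current to target is allowed in this sketch."""
    return target in ALLOWED_TRANSITIONS[current]


status = TaskStatus("running")  # parse the string stored in a task document
```

Subclassing str keeps the enum JSON-friendly, since each member compares equal to the plain string stored in the document.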

Additional Domain Models

Subcontractors (/api/v1/projects/{projectId}/subcontractors)

- Project-specific subcontractor management
- Partition key: projectId
- Fields: subcontractorCode, subcontractorName, allInRate, description, contact info
- allInRate - All-inclusive hourly/unit rate for cost calculations
- Supports batch import for bulk data loading

Labor Productivity Settings (/api/v1/services/{serviceId}/labor-settings)

- Per-discipline labor productivity factors for estimation services
- Stored in Services container (type: labor_productivity_settings)
- One document per service with factors for all 9 MTO disciplines
- Used in labor cost calculations:
  - totalManhours = quantity × manhourPerUnit × laborProdFactor
  - laborUnitCost = manhourPerUnit × laborProdFactor × hourlyCost
- Auto-creates default settings (factor=1.0) if none exist

Project Material Costs (/api/v1/project-material-costs)

- Links MTO items to Material catalog with cost overrides
- Partition key: projectId
- Foreign keys: projectId, itemNo (MTO), materialId (Material)
- Supports discipline-specific material fields (Piping, Concrete, Structural)
- Detailed response includes enriched MTO and Material data
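A worked example of the two labor formulas above. The function name and the input values are illustrative assumptions; only the formulas come from this guide.

```python
def labor_costs(quantity: float, manhour_per_unit: float,
                labor_prod_factor: float, hourly_cost: float) -> dict:
    """Apply the two labor formulas; the default productivity factor would be 1.0."""
    total_manhours = quantity * manhour_per_unit * labor_prod_factor
    labor_unit_cost = manhour_per_unit * labor_prod_factor * hourly_cost
    return {"totalManhours": total_manhours, "laborUnitCost": labor_unit_cost}


# 100 units at 0.5 manhours each, a factor of 1.25, and an hourly cost of 40
result = labor_costs(quantity=100, manhour_per_unit=0.5,
                     labor_prod_factor=1.25, hourly_cost=40.0)
# result == {"totalManhours": 62.5, "laborUnitCost": 25.0}
```

With the default factor of 1.0 the same inputs give 50 manhours and a unit cost of 20, so the factor scales both results linearly.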


Advanced Topics

1. Cosmos DB Queries

The repository layer uses parameterized queries:

query = "SELECT * FROM c WHERE c.type = @type AND c.clientId = @clientId"
parameters = [
    {"name": "@type", "value": "project"},
    {"name": "@clientId", "value": client_id}
]
results = await self.query(query, parameters)

2. Pagination Implementation

Uses SQL OFFSET/LIMIT:

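async def query_with_pagination(self, query, skip=0, limit=20):
    # Coerce to int so only numeric values are ever interpolated into the SQL text
    paginated_query = f"{query} OFFSET {int(skip)} LIMIT {int(limit)}"
    items = await self.query(paginated_query)

    # Get total count (assumes the query begins with "SELECT *" and has no ORDER BY)
    count_query = query.replace("SELECT *", "SELECT VALUE COUNT(1)", 1)
    total = (await self.query(count_query))[0]

    # Return the same envelope the API exposes: data, total, skip, limit, hasMore
    return {
        "data": items,
        "total": total,
        "skip": skip,
        "limit": limit,
        "hasMore": (skip + limit) < total
    }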
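The hasMore arithmetic is easy to check with an in-memory sketch. The paginate function below is hypothetical and slices a Python list instead of querying Cosmos DB, but it returns the same envelope shape as the list endpoints.

```python
def paginate(items: list, skip: int = 0, limit: int = 20) -> dict:
    """Slice a list the way OFFSET/LIMIT would and build the response envelope."""
    total = len(items)
    return {
        "data": items[skip:skip + limit],
        "total": total,
        "skip": skip,
        "limit": limit,
        "hasMore": (skip + limit) < total,
    }


clients = [f"client-{n}" for n in range(45)]
first_page = paginate(clients, skip=0, limit=20)   # 20 items, hasMore is True
last_page = paginate(clients, skip=40, limit=20)   # 5 items, hasMore is False
```

With 45 records and a limit of 20, hasMore stays true until skip reaches 40, because 40 + 20 is no longer less than 45.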

3. Error Handling Strategy

Layered error handling:

# Repository layer - logs and re-raises
try:
    result = await self.container.create_item(body=item)
except CosmosHttpResponseError as e:
    logger.error(f"Failed to create item: {str(e)}")
    raise

# Router layer - converts to HTTP errors
try:
    client = await repo.create_client(client_data)
except ValueError as e:
    raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
    logger.error(f"Failed to create client: {str(e)}")
    raise HTTPException(status_code=500, detail="Failed to create client")

4. Lifespan Management

Modern FastAPI lifespan pattern:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await cosmos_db.initialize()
    yield
    # Shutdown
    await cosmos_db.close()

app = FastAPI(lifespan=lifespan)
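The ordering guarantee of the lifespan pattern can be demonstrated without FastAPI. In this sketch FakeDB is a hypothetical stand-in for cosmos_db, and the try/finally is an addition that is not in the snippet above: it makes sure close() runs even if the application body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


class FakeDB:
    """Hypothetical stand-in for the Cosmos DB client wrapper."""

    async def initialize(self):
        events.append("initialize")

    async def close(self):
        events.append("close")


db = FakeDB()


@asynccontextmanager
async def lifespan(app):
    await db.initialize()      # startup
    try:
        yield                  # the application serves requests here
    finally:
        await db.close()       # shutdown, even after an error


async def run_app():
    async with lifespan(app=None):
        events.append("serving requests")


asyncio.run(run_app())
```

After the run, events records initialization first, request handling second, and cleanup last, which is the order FastAPI follows when it drives the lifespan context manager.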


Background Task Management

Task Service Pattern

For long-running operations (e.g., MTO import), use the Task Service pattern:

1. Create Task Record:

from app.services.tasks.task_service import TaskService

task_service = TaskService()
task = await task_service.create_task(
    task_type="mto_import",
    project_id="proj-123",
    metadata={
        "filename": "mto_data.xlsx",
        "strategy": "merge",
        "file_size": 1024000
    }
)
task_id = task["taskId"]

2. Queue Background Task:

from fastapi import BackgroundTasks

background_tasks.add_task(
    process_import_in_background,
    task_id=task_id,
    project_id=project_id,
    file_content=content,
    ...
)

# Return task ID immediately to frontend
return TaskSubmitResponse(
    taskId=task_id,
    status="pending",
    pollUrl=f"/api/v1/mto/import/status/{task_id}?projectId={project_id}"
)

3. Update Progress in Background Worker:

async def process_import_in_background(task_id, project_id, ...):
    # Mark as running
    await task_service.update_task_status(task_id, project_id, TaskStatus.RUNNING)

    # Process with progress updates
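    for i, sheet in enumerate(sheets):
        # Assumes each sheet exposes a name; falls back to its string form
        sheet_name = getattr(sheet, "name", str(sheet))
        # Update progress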
        await task_service.update_task_progress(
            task_id=task_id,
            project_id=project_id,
            current=i + 1,
            total=len(sheets),
            message=f"Processing {sheet_name} ({i+1}/{len(sheets)})"
        )

        # Process sheet...

    # Mark as completed
    await task_service.update_task_status(
        task_id=task_id,
        project_id=project_id,
        status=TaskStatus.COMPLETED,
        result={"success": True, "results": [...]}
    )

4. Poll Status from Frontend:

# GET /api/v1/mto/import/status/{taskId}?projectId={projectId}

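@router.get("/import/status/{task_id}")
async def get_import_task_status(
    task_id: str,
    # The poll URL sends ?projectId=..., so alias it (Query comes from fastapi)
    project_id: str = Query(..., alias="projectId"),
    task_service=Depends(...),
):
    task = await task_service.get_task(task_id, project_id)
    return TaskResponse(**task)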

Idempotency Protection

Prevent duplicate background tasks when a user retries the same upload:

# Check for recent duplicate task
existing_task = await task_service.find_recent_duplicate_task(
    project_id=project_id,
    task_type="mto_import",
    filename=filename,
    file_size=len(content),
    strategy=strategy,
    max_age_minutes=10  # Only consider tasks created in last 10 minutes
)

if existing_task:
    # Return existing task instead of creating new one
    return TaskSubmitResponse(
        taskId=existing_task["taskId"],
        status=existing_task["status"],
        message="Import already in progress. Returning existing task."
    )

Duplicate Detection Criteria:

  • Same projectId
  • Same taskType
  • Same filename
  • Same file_size (bytes)
  • Same strategy
  • Created within last 10 minutes
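The criteria above can be sketched as an in-memory check. This is not the actual find_recent_duplicate_task implementation, which presumably queries Cosmos DB; the function name find_recent_duplicate, the createdAt field, and the task document layout are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def find_recent_duplicate(tasks, *, project_id, task_type, filename,
                          file_size, strategy, max_age_minutes=10, now=None):
    """Return the first task matching every duplicate criterion, else None."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=max_age_minutes)
    for task in tasks:
        meta = task.get("metadata", {})
        if (
            task["projectId"] == project_id
            and task["taskType"] == task_type
            and meta.get("filename") == filename
            and meta.get("file_size") == file_size
            and meta.get("strategy") == strategy
            and task["createdAt"] >= cutoff  # "createdAt" is an assumed field name
        ):
            return task
    return None


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
tasks = [
    {
        "taskId": "task-1",
        "projectId": "proj-123",
        "taskType": "mto_import",
        "createdAt": now - timedelta(minutes=3),
        "metadata": {"filename": "mto_data.xlsx", "file_size": 1024000, "strategy": "merge"},
    }
]
criteria = dict(project_id="proj-123", task_type="mto_import",
                filename="mto_data.xlsx", file_size=1024000, strategy="merge")

retry = find_recent_duplicate(tasks, now=now, **criteria)
late_retry = find_recent_duplicate(tasks, now=now + timedelta(minutes=30), **criteria)
```

A retry three minutes after the first upload returns the existing task, while the same upload thirty minutes later falls outside the window and would start a new import.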

Task Status Lifecycle

stateDiagram-v2
    [*] --> pending
    pending --> running
    running --> completed
    running --> failed

Status Fields:

  • pending - Task created, not started
  • running - Task executing (has progress object)
  • completed - Task finished successfully (has result object)
  • failed - Task encountered error (has error string)
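One way to enforce this lifecycle is a small transition table. The sketch below is an assumption about how such a guard could look: TaskStatus.RUNNING and TaskStatus.COMPLETED appear in the examples above, but the enum definition, the ALLOWED_TRANSITIONS table, and the assert_transition helper are hypothetical.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Mirrors the state diagram: completed and failed are terminal states
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.RUNNING},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED},
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
}


def assert_transition(current: TaskStatus, new: TaskStatus) -> None:
    """Raise ValueError if the lifecycle does not allow current -> new."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {new.value}")


assert_transition(TaskStatus.PENDING, TaskStatus.RUNNING)  # allowed, returns None
```

Calling such a guard before each status update would reject, for example, moving a completed task back to running.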

Progress Object:

{
    "current": 3,           # Current progress count
    "total": 5,             # Total items to process
    "percentage": 60.0,     # Completion percentage (0-100)
    "message": "Processing Electrical Equipment (3/5)"
}
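The percentage field can be derived from current and total. The helper below is a hedged sketch of that calculation (build_progress is a hypothetical name); it guards against a zero total and caps the value at 100.

```python
def build_progress(current: int, total: int, message: str) -> dict:
    """Assemble a progress object with a derived completion percentage."""
    if total > 0:
        percentage = min(round(current / total * 100, 1), 100.0)
    else:
        percentage = 0.0  # nothing to process yet, avoid division by zero
    return {
        "current": current,
        "total": total,
        "percentage": percentage,
        "message": message,
    }


progress = build_progress(3, 5, "Processing Electrical Equipment (3/5)")
```

With the values from the example above, the derived percentage is 60.0.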


Batch Operations

Cosmos DB Batch API

BaseRepository implements batch operations using Cosmos DB's transactional batch API:

Features:

  • Up to 100 items per transaction (Cosmos limit)
  • Transactional guarantees (all-or-nothing)
  • Automatic chunking for >100 items
  • Progress callbacks for UI updates
  • Exponential backoff retry for throttling (429 errors)
  • 500ms delay between batches to avoid rate limiting

Batch Create Example:

# Repository method
async def _execute_batch_create(
    self,
    items: list[dict],
    partition_key: str,
    progress_callback: Callable | None = None
) -> list[dict]:
    """
    Create up to 100 items in a single transaction.

    Args:
        items: List of items to create (max 100)
        partition_key: Partition key value (all items must have same PK)
        progress_callback: Optional async callback(current, total, message)

    Returns:
        List of created items
    """
    if len(items) > 100:
        raise ValueError("Batch size cannot exceed 100 items")

    # Create batch operations
    batch_operations = []
    for item in items:
        batch_operations.append(("create", (item,)))

    # Execute batch with retries
    for attempt in range(self.batch_retry_attempts):
        try:
            results = await self.container.execute_item_batch(
                batch_operations=batch_operations,
                partition_key=partition_key
            )

            # Update progress if callback provided
            if progress_callback:
                await progress_callback(len(items), len(items), "Batch completed")

            return results

        except CosmosHttpResponseError as e:
            if e.status_code == 429 and attempt < self.batch_retry_attempts - 1:
                # Throttling - exponential backoff
                wait_time = (2 ** attempt) * 0.5  # 0.5s, 1s, 2s
                await asyncio.sleep(wait_time)
                continue
            raise
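The retry loop can be exercised in isolation with a simulated throttling error. In the sketch below, ThrottledError stands in for a Cosmos DB 429 response and retry_on_throttle is a hypothetical helper; the sleep function is injectable so the backoff schedule can be observed without actually waiting.

```python
import asyncio


class ThrottledError(Exception):
    """Stand-in for a Cosmos DB 429 (request rate too large) response."""
    status_code = 429


async def retry_on_throttle(operation, attempts=3, base_delay=0.5, sleep=asyncio.sleep):
    """Await operation(), backing off exponentially between throttled attempts."""
    for attempt in range(attempts):
        try:
            return await operation()
        except ThrottledError:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the error
            await sleep((2 ** attempt) * base_delay)


async def demo():
    calls = {"count": 0}
    delays = []

    async def flaky_batch():
        calls["count"] += 1
        if calls["count"] < 3:
            raise ThrottledError()
        return "ok"

    async def record_sleep(seconds):
        delays.append(seconds)  # record instead of waiting

    result = await retry_on_throttle(flaky_batch, sleep=record_sleep)
    return result, delays


result, delays = asyncio.run(demo())
```

With three attempts, only two waits can occur (0.5s and 1s) before the final attempt either succeeds or re-raises.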

Chunked Batch Create (>100 items):

async def batch_create_large(
    self,
    items: list[dict],
    progress_callback: Callable | None = None
) -> list[dict]:
    """
    Create any number of items by chunking into batches of 100.
    """
    # Group by partition key
    items_by_pk = {}
    for item in items:
        pk = self.get_partition_key(item)
        if pk not in items_by_pk:
            items_by_pk[pk] = []
        items_by_pk[pk].append(item)

    all_results = []
    total_processed = 0

    for pk, pk_items in items_by_pk.items():
        # Chunk into batches of 100
        chunks = [pk_items[i:i+100] for i in range(0, len(pk_items), 100)]

        for chunk_index, chunk in enumerate(chunks):
            # Execute batch
            results = await self._execute_batch_create(
                items=chunk,
                partition_key=pk,
                progress_callback=None  # Use overall callback instead
            )
            all_results.extend(results)
            total_processed += len(chunk)

            # Update overall progress
            if progress_callback:
                await progress_callback(
                    total_processed,
                    len(items),
                    f"Created {total_processed}/{len(items)} items"
                )

            # Rate limiting: wait between batches
            if chunk_index < len(chunks) - 1:
                await asyncio.sleep(0.5)  # 500ms delay

    return all_results
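The grouping and chunking step is easy to verify on its own. The sketch below is a simplified, synchronous version under stated assumptions: plan_batches is a hypothetical helper, and the partition key is read from a projectId field rather than through get_partition_key().

```python
from collections import defaultdict


def plan_batches(items, pk_field="projectId", batch_size=100):
    """Group items by partition key, then split each group into batches."""
    groups = defaultdict(list)
    for item in items:
        groups[item[pk_field]].append(item)

    plan = []
    for pk, pk_items in groups.items():
        for start in range(0, len(pk_items), batch_size):
            plan.append((pk, pk_items[start:start + batch_size]))
    return plan


items = (
    [{"id": f"a{i}", "projectId": "proj-a"} for i in range(250)]
    + [{"id": f"b{i}", "projectId": "proj-b"} for i in range(30)]
)
plan = plan_batches(items)
sizes = [(pk, len(chunk)) for pk, chunk in plan]
```

For 250 items in one partition and 30 in another, the plan contains batches of 100, 100, and 50 for the first key and a single batch of 30 for the second. Each batch stays within one partition key, which the transactional batch API requires.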

Progress Callback Pattern:

# In MTO import background task
async def progress_callback(current, total, message):
    await task_service.update_task_progress(
        task_id=task_id,
        project_id=project_id,
        current=current,
        total=total,
        message=message
    )

# Use in batch operation
await mto_repo.batch_create_large(
    items=mto_items,
    progress_callback=progress_callback
)


Best Practices

For Backend Development

  1. Always use async/await for I/O operations
  2. Use type hints throughout for IDE support
  3. Validate with Pydantic models, not manual checks
  4. Handle partition keys correctly in repositories
  5. Log errors before raising exceptions
  6. Use dependency injection for testability
  7. Keep routers thin - business logic in repositories
  8. Document with docstrings for auto-generated docs

For Frontend Integration

  1. Use the OpenAPI spec to generate TypeScript types
  2. Handle pagination consistently with skip/limit
  3. Use proper HTTP methods (GET, POST, PUT, DELETE)
  4. Check HTTP status codes for error handling
  5. Handle camelCase/snake_case conversion automatically
  6. Use the Swagger UI for API exploration during development
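For the camelCase/snake_case point, a conversion can be sketched with two small functions. This is an illustrative assumption about how key conversion might be automated on the client or in a test helper; the function names are hypothetical, and the backend itself may rely on Pydantic aliases instead.

```python
import re


def to_camel(name: str) -> str:
    """Convert snake_case to camelCase, e.g. project_id -> projectId."""
    head, *tail = name.split("_")
    return head + "".join(part.capitalize() for part in tail)


def to_snake(name: str) -> str:
    """Convert camelCase to snake_case, e.g. hasMore -> has_more."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def camelize_keys(payload: dict) -> dict:
    """Rename the top-level keys of a payload to camelCase."""
    return {to_camel(key): value for key, value in payload.items()}


response = camelize_keys({"task_id": "task-1", "project_id": "proj-123", "has_more": False})
```

Note that this simple version only handles top-level keys and does not round-trip acronyms such as "ID" cleanly, so nested payloads would need a recursive variant.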

Deployment

Azure Functions

The backend can be deployed to Azure Functions:

Entry point: function_app.py

import azure.functions as func
from app.main import app as fastapi_app

app = func.AsgiFunctionApp(
    app=fastapi_app,
    http_auth_level=func.AuthLevel.ANONYMOUS
)

This wraps the FastAPI app in an Azure Functions ASGI wrapper.

Environment Variables

Required in production:

  • COSMOS_ENDPOINT - Cosmos DB endpoint URL
  • COSMOS_KEY - Cosmos DB primary key
  • DATABASE_NAME - Database name
  • BACKEND_CORS_ORIGINS - Allowed frontend origins

Azure Services:

  • AZURE_STORAGE_CONNECTION_STRING - Azure Blob Storage connection string
  • AZURE_STORAGE_CONTAINER_NAME - Blob container for file imports (default: "imports")
  • AZURE_SERVICE_BUS_CONNECTION_STRING - Azure Service Bus connection string
  • AZURE_SERVICE_BUS_QUEUE_NAME - Queue for async processing (default: "import-queue")
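A startup check for these variables could look like the standard-library sketch below. The project itself loads settings through Pydantic Settings in config.py, so load_settings here is a hypothetical stand-in that only illustrates failing fast on missing values and applying the documented defaults.

```python
import os

REQUIRED = ("COSMOS_ENDPOINT", "COSMOS_KEY", "DATABASE_NAME", "BACKEND_CORS_ORIGINS")
OPTIONAL_DEFAULTS = {
    "AZURE_STORAGE_CONTAINER_NAME": "imports",
    "AZURE_SERVICE_BUS_QUEUE_NAME": "import-queue",
}


def load_settings(env=None) -> dict:
    """Read settings from a mapping (os.environ by default) and fail fast."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))

    settings = {name: env[name] for name in REQUIRED}
    for name, default in OPTIONAL_DEFAULTS.items():
        settings[name] = env.get(name, default)
    return settings


# Placeholder values for illustration only
example_env = {
    "COSMOS_ENDPOINT": "https://example.invalid:443/",
    "COSMOS_KEY": "placeholder-key",
    "DATABASE_NAME": "example-db",
    "BACKEND_CORS_ORIGINS": "http://localhost:3000",
}
settings = load_settings(example_env)
```

Passing the environment as a mapping keeps the check testable without touching the real process environment.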


Troubleshooting

Common Issues

1. "Cosmos DB not initialized"

  • Ensure await cosmos_db.initialize() is called
  • Check environment variables are set
  • Verify Cosmos DB credentials

2. "Partition key mismatch"

  • Verify get_partition_key() implementation
  • Ensure partition key field exists in document
  • Check container configuration

3. "CORS errors"

  • Add frontend URL to BACKEND_CORS_ORIGINS
  • Restart backend after changing CORS settings
  • Check browser network tab for actual origin

4. "Validation errors"

  • Check Pydantic model field names
  • Verify aliases match frontend camelCase
  • Use populate_by_name=True for flexibility
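For the partition key issue in particular, a pre-write check can surface the problem before the request reaches Cosmos DB. The helper below is a hypothetical sketch (require_partition_key is not part of the codebase) and assumes the partition key is a top-level field such as projectId.

```python
def require_partition_key(document: dict, pk_field: str) -> str:
    """Return the partition key value, or raise with a descriptive message."""
    if pk_field not in document:
        raise KeyError(f"Document {document.get('id')!r} has no {pk_field!r} field")
    value = document[pk_field]
    if value is None or value == "":
        raise ValueError(f"Document {document.get('id')!r} has an empty {pk_field!r}")
    return value


pk = require_partition_key({"id": "cost-1", "projectId": "proj-123"}, "projectId")
```

Running a check like this inside get_partition_key() would turn a vague service-side mismatch into an error that names the offending document and field.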


Next Steps

  1. Authentication & Authorization
     • Add JWT tokens
     • Implement role-based access
     • Secure sensitive endpoints

  2. Service Layer
     • Add business logic between routers and repositories
     • Implement complex workflows
     • Coordinate multi-repository operations

  3. Caching
     • Add Redis for frequently accessed data
     • Implement cache invalidation
     • Reduce Cosmos DB RU consumption

  4. Testing
     • Unit tests for repositories
     • Integration tests for endpoints
     • Mock Cosmos DB for testing

  5. Monitoring
     • Application Insights integration
     • Custom metrics
     • Performance monitoring
     • Error tracking

  6. API Versioning
     • Plan for v2 endpoints
     • Maintain backward compatibility
     • Deprecation strategy

Resources

  • FastAPI Documentation: https://fastapi.tiangolo.com
  • Pydantic Documentation: https://docs.pydantic.dev
  • Azure Cosmos DB SDK: https://learn.microsoft.com/python/api/azure-cosmos
  • API Documentation: http://localhost:8000/api/docs (when running)

Summary

The LCO backend is a modern, well-architected FastAPI application that:

  • ✅ Uses layered architecture for separation of concerns
  • ✅ Implements repository pattern for data access
  • ✅ Leverages Pydantic for type safety and validation
  • ✅ Supports async/await for performance
  • ✅ Provides dependency injection for testability
  • ✅ Integrates seamlessly with Azure Cosmos DB
  • ✅ Auto-generates API documentation
  • ✅ Follows REST conventions for frontend integration

This architecture provides a solid foundation for building scalable, maintainable construction management applications.