Import Processing Architecture¶
Version: 1.0.0 | Last Updated: 2025-01-13 | Status: Production Ready
Overview¶
The Import Processing system asynchronously processes document import messages delivered through an Azure Service Bus queue. It supports 14 import document types across MTO, schedule, and universal table imports, plus 2 delete operations.
Key Design Goal: Azure Functions and the Container Apps Job share a single processing module, so behavior is identical regardless of execution environment.
Architecture Diagram¶
┌──────────────────────────────────────────────────────────────────────┐
│                          Service Bus Queue                           │
│                       (import-queue messages)                        │
└──────────────────────────────────┬───────────────────────────────────┘
                                   │
                ┌──────────────────┴──────────────────┐
                ▼                                     ▼
     ┌─────────────────────┐               ┌─────────────────────┐
     │ function_app.py     │               │ worker.py           │
     │ (Azure Functions)   │               │ (Container Apps Job)│
     │ ~90 lines           │               │ ~200 lines          │
     │                     │               │                     │
     │ • Azure trigger     │               │ • Service Bus SDK   │
     │ • autoComplete=true │               │ • Batch receive     │
     │ • Re-raise on error │               │ • Lock renewal      │
     │                     │               │ • SIGTERM handling  │
     └──────────┬──────────┘               └──────────┬──────────┘
                │                                     │
                └──────────────────┬──────────────────┘
                                   ▼
              ┌─────────────────────────────────────────┐
              │ app/services/import_processing/         │
              │ (Shared Module)                         │
              │                                         │
              │ process_import_message()  ← SINGLE      │
              │                             ENTRY       │
              │                             POINT       │
              └────────────────────┬────────────────────┘
                                   │
             ┌─────────────────────┼───────────────────────┐
             │                     │                       │
             ▼                     ▼                       ▼
     ┌───────────────┐  ┌─────────────────────┐  ┌───────────────────┐
     │ schemas.py    │  │ registry.py         │  │ processor.py      │
     │               │  │                     │  │                   │
     │ ImportMessage │  │ IMPORT_HANDLERS     │  │ • Blob download   │
     │ DeleteMessage │  │ DELETE_HANDLERS     │  │ • Handler routing │
     │               │  │ get_handler()       │  │ • Cosmos init     │
     │               │  │ is_bulk_operation() │  │ • Blob cleanup    │
     └───────────────┘  └──────────┬──────────┘  └───────────────────┘
                                   │
                                   ▼
              ┌─────────────────────────────────────────┐
              │ app/import_handlers/                    │
              │ (Handlers for 14 document types)        │
              │                                         │
              │ mto, estimation_master,                 │
              │ schedule_extraction, schedule_upload,   │
              │ schedule_project_extraction,            │
              │ schedule_delete_all, schedule_fill_*,   │
              │ equipment, material, crew, etc.         │
              └─────────────────────────────────────────┘
Component Responsibilities¶
| Component | Location | Responsibility |
|---|---|---|
| function_app.py | functions/import-processor/ | Azure Functions trigger, auto-complete, exception re-raise for retry |
| worker.py | backend/ | Service Bus SDK, batch receive, lock renewal, SIGTERM graceful shutdown |
| schemas.py | app/services/import_processing/ | Pydantic validation for ImportMessage / DeleteMessage |
| registry.py | app/services/import_processing/ | Handler lookup, is_bulk_operation(), should_download_blob() |
| processor.py | app/services/import_processing/ | Core logic: parse → validate → download blob → route → execute → cleanup |
| import_handlers/ | app/import_handlers/ | Handlers for the 14 document types, implementing the import/delete business logic |
Execution Environments¶
Azure Functions (function_app.py)¶
Use Case: Production serverless deployment with automatic scaling.
Characteristics:
- Triggered by Service Bus queue binding
- autoComplete=true in host.json - message auto-completed on success
- Re-raises exceptions to trigger Service Bus retry
- Minimal code (~90 lines) - thin wrapper only
Message Handling:
@app.service_bus_queue_trigger(...)
async def import_processor(msg: func.ServiceBusMessage):
    raw_message = msg.get_body().decode("utf-8")
    await process_import_message(raw_message, msg.delivery_count, msg.message_id)
    # Success: auto-completes message
    # Exception: re-raises, Service Bus retries
Container Apps Job (worker.py)¶
Use Case: Development, testing, or hybrid deployment with Container Apps.
Characteristics:
- Uses Azure Service Bus SDK directly
- Batch message receive with configurable limits
- Explicit message lock renewal for long operations
- Graceful shutdown on SIGTERM/SIGINT
- Explicit complete/abandon for message handling
Message Handling:
raw_message = b"".join(msg.body).decode("utf-8")
success = await handle_message(msg)
if success:
    await receiver.complete_message(msg)
else:
    await receiver.abandon_message(msg)  # Retry or dead-letter
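The two wrappers differ only in how they settle a message after the shared function returns or raises. The sketch below illustrates the worker-side decision with a fake receiver so that it runs without the Service Bus SDK. `FakeReceiver`, `settle`, `ok`, and `boom` are hypothetical names introduced for illustration; the real `handle_message()` in `worker.py` may be structured differently.

```python
import asyncio


class FakeReceiver:
    """Hypothetical stand-in for the SDK receiver; records settlement calls."""

    def __init__(self):
        self.calls = []

    async def complete_message(self, msg):
        self.calls.append(("complete", msg))

    async def abandon_message(self, msg):
        self.calls.append(("abandon", msg))


async def settle(receiver, msg, process):
    """Run `process` and settle the message based on the outcome."""
    try:
        await process(msg)
    except Exception:
        # Abandoning releases the lock so Service Bus can redeliver the
        # message (or dead-letter it once the delivery limit is reached).
        await receiver.abandon_message(msg)
        return False
    await receiver.complete_message(msg)
    return True


async def ok(msg):
    """Stand-in for a processing call that succeeds."""
    return None


async def boom(msg):
    """Stand-in for a processing call that fails."""
    raise RuntimeError("handler failed")


async def demo():
    receiver = FakeReceiver()
    results = [
        await settle(receiver, "m1", ok),
        await settle(receiver, "m2", boom),
    ]
    return results, receiver.calls


results, calls = asyncio.run(demo())
print(results)
print(calls)
```

In the Azure Functions wrapper the same outcome is reached without explicit calls: returning normally lets the host complete the message, and a propagated exception leaves it for retry.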
Handler Registry¶
Import Handlers (14 types)¶
IMPORT_HANDLERS = {
    # MTO/estimation handlers
    "mto": MTOImportHandler,
    "estimation_master": EstimationMasterHandler,
    # Schedule handlers
    "schedule_extraction": ScheduleExtractionHandler,
    "schedule_project_extraction": ScheduleProjectExtractionHandler,
    "schedule_upload": ScheduleUploadHandler,
    "schedule_delete_all": ScheduleBulkHandler,
    "schedule_fill_level": ScheduleBulkHandler,
    "schedule_clear_level": ScheduleBulkHandler,
    "schedule_fill_column": ScheduleBulkHandler,
    # Universal table import handlers
    "equipment": EquipmentHandler,
    "material": MaterialHandler,
    "crew": CrewHandler,
    "crew_member": CrewMemberHandler,
    "crew_trade": CrewTradeHandler,
}
Delete Handlers (2 types)¶
DELETE_HANDLERS = {
    "estimation_master_delete": EstimationMasterHandler,
    "mto_delete": MTOImportHandler,
}
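A lookup over these two dictionaries could look like the sketch below. The signature and error type of `get_handler()` are assumptions (the source names the function but does not show it), the handler classes are empty stand-ins, and only a subset of the registry is reproduced.

```python
class MTOImportHandler:
    """Empty stand-in for the real handler class."""


class EstimationMasterHandler:
    """Empty stand-in for the real handler class."""


# Subset of the registry shown above, for illustration only.
IMPORT_HANDLERS = {
    "mto": MTOImportHandler,
    "estimation_master": EstimationMasterHandler,
}
DELETE_HANDLERS = {
    "estimation_master_delete": EstimationMasterHandler,
    "mto_delete": MTOImportHandler,
}


def is_delete_operation(document_type: str) -> bool:
    # Assumption: deletes are identified by the "_delete" suffix.
    return document_type.endswith("_delete")


def get_handler(document_type: str):
    """Return the handler class for a document type (assumed signature)."""
    table = DELETE_HANDLERS if is_delete_operation(document_type) else IMPORT_HANDLERS
    try:
        return table[document_type]
    except KeyError:
        raise ValueError(
            f"No handler registered for document_type={document_type!r}"
        ) from None


print(get_handler("mto").__name__)
print(get_handler("mto_delete").__name__)
```

Because the keys are lowercase strings matched exactly, a value such as `"MTO"` would not resolve in this sketch.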
Message Flow¶
1. Message arrives in Service Bus queue
2. Wrapper extracts raw message body:
   - function_app.py: msg.get_body().decode("utf-8")
   - worker.py: b"".join(msg.body).decode("utf-8")
3. Both call the SAME function:
   await process_import_message(raw_message, delivery_count, message_id)
4. Shared processor:
   ├─ Initialize Cosmos DB (idempotent)
   ├─ Parse JSON → ImportMessage/DeleteMessage
   ├─ Check if delete operation (ends with "_delete")
   │  └─ If delete: route to DELETE_HANDLERS
   ├─ Check if needs blob download:
   │  ├─ NOT for: bulk ops, deletes, self-download types
   │  └─ YES for: mto, estimation_master, schedule_extraction, etc.
   ├─ Preflight check (schedule_extraction already completed?)
   ├─ Download blob if needed
   ├─ Build handler kwargs (per document_type)
   ├─ Execute handler.process_import() or handler.process_delete()
   └─ Delete blob on success (cost savings)
5. Wrapper handles success/failure:
   - function_app.py: return normally (auto-complete) or re-raise (retry)
   - worker.py: complete_message() or abandon_message()
Message Schemas¶
ImportMessage¶
from pydantic import BaseModel


class ImportMessage(BaseModel):
    # Required fields for all operations
    document_type: str  # "mto", "schedule_extraction", etc.
    task_id: str  # Unique task identifier
    project_id: str  # Project ID (partition key)
    # Blob-related fields (not required for bulk ops)
    blob_name: str | None = None  # Blob storage file name
    filename: str | None = None  # Original filename
    strategy: str = "merge"  # Import strategy
    # MTO/estimation_master specific
    service_id: str | None = None
    # Schedule extraction specific
    extraction_id: str | None = None
    primavera_project_id: str | None = None
    schedule_config: dict | None = None
    # Schedule bulk operations
    operation: str | None = None
    activity_ids: list[str] | None = None
    filters: dict | None = None
    payload: dict | None = None
    # Schedule upload specific
    temp_blob_name: str | None = None
    target_blob_name: str | None = None
    original_filename: str | None = None
    direct_upload: bool = False
DeleteMessage¶
class DeleteMessage(BaseModel):
    document_type: str  # "estimation_master_delete", "mto_delete"
    task_id: str
    project_id: str
    service_id: str  # Service ID to delete data for
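To make the required-field difference between the two schemas concrete, the sketch below checks raw JSON payloads with the standard library only. It is a stand-in for the Pydantic validation, not the validation itself; `missing_fields` and all field values are hypothetical.

```python
import json

# Fields without defaults in the schemas above.
REQUIRED_IMPORT = {"document_type", "task_id", "project_id"}
REQUIRED_DELETE = REQUIRED_IMPORT | {"service_id"}


def missing_fields(raw_message: str) -> set[str]:
    """Return the required fields that are absent or null in a raw message."""
    payload = json.loads(raw_message)
    document_type = payload.get("document_type") or ""
    required = (
        REQUIRED_DELETE if document_type.endswith("_delete") else REQUIRED_IMPORT
    )
    return {name for name in required if payload.get(name) is None}


# Example payloads with made-up identifiers.
import_msg = json.dumps({
    "document_type": "mto",
    "task_id": "task-1",
    "project_id": "proj-1",
    "blob_name": "uploads/example.xlsx",
    "service_id": "svc-1",
})
delete_msg = json.dumps({
    "document_type": "mto_delete",
    "task_id": "task-2",
    "project_id": "proj-1",
})

print(missing_fields(import_msg))
print(missing_fields(delete_msg))
```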
Key Logic¶
Blob Download Decision¶
def should_download_blob(document_type: str) -> bool:
    # Delete operations don't need blob
    if is_delete_operation(document_type):
        return False
    # Bulk operations don't need blob
    if is_bulk_operation(document_type):
        return False
    # These handlers manage their own downloads
    SELF_DOWNLOAD_TYPES = {"schedule_upload", "schedule_project_extraction"}
    return document_type not in SELF_DOWNLOAD_TYPES
Bulk Operation Detection¶
def is_bulk_operation(document_type: str) -> bool:
    # schedule_extraction is NOT bulk - it needs blob content
    return document_type.startswith("schedule_") and document_type not in ("schedule_extraction",)
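Evaluating both predicates for every registered document type shows what they return in practice. The sketch below copies the two functions as documented; `is_delete_operation()` is an assumption based on the "_delete" suffix rule from the message flow, and `DOCUMENT_TYPES` and `decisions` are names introduced for illustration.

```python
def is_delete_operation(document_type: str) -> bool:
    # Assumption: deletes are identified by the "_delete" suffix.
    return document_type.endswith("_delete")


def is_bulk_operation(document_type: str) -> bool:
    return document_type.startswith("schedule_") and document_type not in (
        "schedule_extraction",
    )


SELF_DOWNLOAD_TYPES = {"schedule_upload", "schedule_project_extraction"}


def should_download_blob(document_type: str) -> bool:
    if is_delete_operation(document_type) or is_bulk_operation(document_type):
        return False
    return document_type not in SELF_DOWNLOAD_TYPES


DOCUMENT_TYPES = [
    "mto", "estimation_master",
    "schedule_extraction", "schedule_project_extraction", "schedule_upload",
    "schedule_delete_all", "schedule_fill_level",
    "schedule_clear_level", "schedule_fill_column",
    "equipment", "material", "crew", "crew_member", "crew_trade",
    "estimation_master_delete", "mto_delete",
]

# Map each type to (is_bulk_operation, should_download_blob).
decisions = {
    t: (is_bulk_operation(t), should_download_blob(t)) for t in DOCUMENT_TYPES
}

for doc_type, (bulk, download) in decisions.items():
    print(f"{doc_type:30} bulk={bulk!s:5} download={download}")
```

As written, the prefix test also reports `schedule_upload` and `schedule_project_extraction` as bulk. `should_download_blob()` returns False for them either way, so the download decision is unaffected, but any caller that uses `is_bulk_operation()` to choose kwargs would need to handle those two types first.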
Handler Kwargs Routing¶
Each document type receives specific kwargs:
| Document Type | Kwargs |
|---|---|
| mto, estimation_master | service_id |
| schedule_extraction | extraction_id, primavera_project_id, schedule_config |
| schedule_project_extraction | blob_name, primavera_project_id, schedule_config |
| schedule_upload | temp_blob_name, target_blob_name, original_filename, direct_upload, blob_name |
| Bulk schedule ops | operation, activity_ids, filters, payload |
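The table can be expressed as a small lookup. This is a sketch only: `build_handler_kwargs`, `KWARG_FIELDS`, `BULK_FIELDS`, and `BULK_TYPES` are hypothetical names, the message is modeled as a plain dict rather than an ImportMessage, and returning no extra kwargs for types not listed in the table (such as equipment) is an assumption.

```python
KWARG_FIELDS = {
    "mto": ("service_id",),
    "estimation_master": ("service_id",),
    "schedule_extraction": (
        "extraction_id", "primavera_project_id", "schedule_config",
    ),
    "schedule_project_extraction": (
        "blob_name", "primavera_project_id", "schedule_config",
    ),
    "schedule_upload": (
        "temp_blob_name", "target_blob_name", "original_filename",
        "direct_upload", "blob_name",
    ),
}
BULK_FIELDS = ("operation", "activity_ids", "filters", "payload")
BULK_TYPES = {
    "schedule_delete_all", "schedule_fill_level",
    "schedule_clear_level", "schedule_fill_column",
}


def build_handler_kwargs(document_type: str, message: dict) -> dict:
    """Select the message fields that the handler for this type receives."""
    if document_type in BULK_TYPES:
        fields = BULK_FIELDS
    else:
        # Assumption: types not listed in the table get no extra kwargs.
        fields = KWARG_FIELDS.get(document_type, ())
    return {name: message.get(name) for name in fields}


# Example message with made-up identifiers.
message = {"task_id": "task-1", "project_id": "proj-1", "service_id": "svc-1"}
print(build_handler_kwargs("mto", message))
print(build_handler_kwargs("schedule_fill_level", {"operation": "fill"}))
```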
Retry Behavior¶
Service Bus Configuration¶
| Setting | Value | Description |
|---|---|---|
| Max Delivery Count | 10 | Messages retry up to 10 times |
| Lock Duration | 5 minutes | Time before message unlocks |
| Dead-Letter | Enabled | Failed messages go to DLQ |
Retry Flow¶
Message Received (delivery_count=1)
        │
        ▼
 Process Message
        │
   ┌────┴────┐
   │         │
Success   Failure
   │         │
   ▼         ▼
Complete  Abandon/Re-raise
(removed)    │
             ▼
     delivery_count++
             │
             ▼
        retry < 10?
        ┌────┴────┐
        │         │
       Yes        No
        │         │
        ▼         ▼
      Retry  Dead-Letter
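The retry flow reduces to simple counting. The sketch below simulates it under two assumptions taken from the diagram and table: the first delivery has delivery_count=1, and a message is dead-lettered once its 10th delivery fails. `simulate` is a hypothetical helper, not part of the codebase.

```python
MAX_DELIVERY_COUNT = 10  # "Max Delivery Count" from the table above


def simulate(fail_first_n: int, max_delivery_count: int = MAX_DELIVERY_COUNT):
    """Return (outcome, deliveries) for a message whose first N attempts fail."""
    for delivery_count in range(1, max_delivery_count + 1):
        if delivery_count > fail_first_n:
            # This attempt succeeds: the wrapper completes the message.
            return "completed", delivery_count
        # This attempt fails: abandon/re-raise, then Service Bus redelivers.
    return "dead-lettered", max_delivery_count


print(simulate(0))
print(simulate(3))
print(simulate(9))
print(simulate(10))
```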
Preflight Check (schedule_extraction)¶
Before downloading the blob, the processor checks whether the extraction has already reached a terminal state:
async def preflight_check_schedule_extraction(document_type, message):
    if document_type != "schedule_extraction":
        return True  # Continue processing
    # Check task status in Cosmos DB
    existing_doc = await tasks_container.read_item(
        item=message.extraction_id,
        partition_key=message.project_id
    )
    if existing_doc.get("status") in ("completed", "failed"):
        return False  # Skip processing (already done)
    return True  # Continue processing
Skipping finished tasks prevents a retry loop when a message is redelivered after a successful extraction has already deleted the blob.
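The redelivery scenario can be exercised with an in-memory container. In this sketch `FakeTasksContainer` is a hypothetical stand-in for the Cosmos DB container, the container is passed as a parameter (which differs from the signature shown above), and the IDs are made up.

```python
import asyncio
from types import SimpleNamespace


class FakeTasksContainer:
    """Hypothetical in-memory stand-in for the Cosmos DB tasks container."""

    def __init__(self, docs):
        self._docs = docs

    async def read_item(self, item, partition_key):
        return self._docs[(item, partition_key)]


async def preflight_check_schedule_extraction(document_type, message, tasks_container):
    if document_type != "schedule_extraction":
        return True  # Other document types have no preflight check
    existing_doc = await tasks_container.read_item(
        item=message.extraction_id,
        partition_key=message.project_id,
    )
    # Terminal states mean a previous delivery already finished the work.
    return existing_doc.get("status") not in ("completed", "failed")


container = FakeTasksContainer({
    ("ext-1", "proj-1"): {"status": "completed"},
    ("ext-2", "proj-1"): {"status": "processing"},
})
done = SimpleNamespace(extraction_id="ext-1", project_id="proj-1")
running = SimpleNamespace(extraction_id="ext-2", project_id="proj-1")

skip_done = asyncio.run(
    preflight_check_schedule_extraction("schedule_extraction", done, container)
)
run_pending = asyncio.run(
    preflight_check_schedule_extraction("schedule_extraction", running, container)
)
run_other = asyncio.run(
    preflight_check_schedule_extraction("mto", done, container)
)
print(skip_done, run_pending, run_other)
```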
File Locations¶
Shared Module¶
backend/app/services/import_processing/
├── __init__.py # Public API exports
├── schemas.py # ImportMessage, DeleteMessage
├── registry.py # Handler mappings, routing logic
└── processor.py # Core processing logic
Import Handlers¶
backend/app/import_handlers/
├── __init__.py # Handler exports
├── base.py # ImportHandler base class
├── mto_handler.py
├── estimation_master_handler.py
├── schedule_handler.py
├── schedule_bulk_handler.py
├── schedule_upload_handler.py
├── schedule_project_handler.py
├── equipment_handler.py
├── material_handler.py
├── crew_handler.py
├── crew_member_handler.py
└── crew_trade_handler.py
Wrappers¶
backend/functions/import-processor/
└── function_app.py # Azure Functions wrapper (~90 lines)
backend/
└── worker.py # Container Apps Job wrapper (~200 lines)
Adding a New Document Type¶
1. Create a handler in app/import_handlers/.
2. Register it in registry.py.
3. Add kwargs routing in processor.py (if needed).
4. Update schemas.py if new message fields are needed.
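As a rough illustration of the first two steps, the sketch below defines and registers a handler for a made-up "subcontractor" document type. Everything here is hypothetical: the real `ImportHandler` base class in `base.py` may require constructor arguments or other methods, and only the `process_import()` method name is taken from the message flow.

```python
import asyncio


class ImportHandler:
    """Hypothetical stand-in for the base class in app/import_handlers/base.py."""

    async def process_import(self, **kwargs):
        raise NotImplementedError


class SubcontractorHandler(ImportHandler):
    """Hypothetical handler for a made-up "subcontractor" document type."""

    async def process_import(self, **kwargs):
        # Real handlers would parse the blob and write to the database here.
        return {"status": "completed", "received": sorted(kwargs)}


# Stand-in for the IMPORT_HANDLERS dict in registry.py.
IMPORT_HANDLERS = {}
IMPORT_HANDLERS["subcontractor"] = SubcontractorHandler

handler = IMPORT_HANDLERS["subcontractor"]()
result = asyncio.run(
    handler.process_import(project_id="proj-1", task_id="task-1")
)
print(result)
```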
Environment Variables¶
| Variable | Description | Default |
|---|---|---|
| AZURE_SERVICE_BUS_CONNECTION_STRING | Service Bus connection | Required |
| AZURE_SERVICE_BUS_QUEUE_NAME | Queue name | import-queue |
| AZURE_STORAGE_CONNECTION_STRING | Blob Storage connection | Required |
| AZURE_STORAGE_CONTAINER_NAME | Blob container | imports |
| MAX_MESSAGES_PER_RUN | Worker batch limit | 10 |
| MAX_WAIT_TIME_SECONDS | Worker wait time | 30 |
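One way to load these variables with the documented defaults is sketched below. `WorkerSettings` and `load_settings` are hypothetical names; the actual configuration code may differ, for example by using a settings library.

```python
import os
from dataclasses import dataclass

REQUIRED_VARS = (
    "AZURE_SERVICE_BUS_CONNECTION_STRING",
    "AZURE_STORAGE_CONNECTION_STRING",
)


@dataclass(frozen=True)
class WorkerSettings:
    service_bus_connection_string: str
    storage_connection_string: str
    queue_name: str
    container_name: str
    max_messages_per_run: int
    max_wait_time_seconds: int


def load_settings(env=os.environ) -> WorkerSettings:
    """Read settings from a mapping, applying the defaults from the table."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return WorkerSettings(
        service_bus_connection_string=env["AZURE_SERVICE_BUS_CONNECTION_STRING"],
        storage_connection_string=env["AZURE_STORAGE_CONNECTION_STRING"],
        queue_name=env.get("AZURE_SERVICE_BUS_QUEUE_NAME", "import-queue"),
        container_name=env.get("AZURE_STORAGE_CONTAINER_NAME", "imports"),
        max_messages_per_run=int(env.get("MAX_MESSAGES_PER_RUN", "10")),
        max_wait_time_seconds=int(env.get("MAX_WAIT_TIME_SECONDS", "30")),
    )


# Placeholder values; real connection strings come from the environment.
settings = load_settings({
    "AZURE_SERVICE_BUS_CONNECTION_STRING": "placeholder-service-bus",
    "AZURE_STORAGE_CONNECTION_STRING": "placeholder-storage",
    "MAX_MESSAGES_PER_RUN": "25",
})
print(settings.queue_name, settings.max_messages_per_run)
```

Failing at startup when a required variable is absent surfaces misconfiguration before any message is received.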
Troubleshooting¶
Message stuck in queue¶
- Check Service Bus metrics for active messages
- Verify worker/function is running
- Check dead-letter queue for failed messages
Handler not found¶
- Verify document_type is registered in registry.py
- Check for case sensitivity (lowercase expected)
Blob download fails¶
- Check blob_name is set in message
- Verify storage connection string
- Ensure blob exists in container
Duplicate processing¶
- Check preflight logic for your document type
- Ensure message completion/abandonment is correct
- Review dead-letter queue for patterns
Related Documentation¶
- MTO Import/Export Guide - MTO-specific import details
- Backend Architecture - Overall backend architecture
- System Architecture - High-level system diagram