
Import Processing Architecture

Version: 1.0.0
Last Updated: 2025-01-13
Status: Production Ready


Overview

The Import Processing system handles asynchronous document imports delivered through an Azure Service Bus queue. It supports 14 import document types across MTO/estimation, schedule, and universal table imports, plus 2 delete operations.

Key Design Goal: Azure Functions and the Container Apps Job share the same processing logic, so a message is processed identically in either execution environment.


Architecture Diagram

┌──────────────────────────────────────────────────────────────────────┐
│                        Service Bus Queue                              │
│                      (import-queue messages)                          │
└──────────────────────────────┬───────────────────────────────────────┘
           ┌───────────────────┴───────────────────┐
           ▼                                       ▼
┌─────────────────────┐                 ┌─────────────────────┐
│   function_app.py   │                 │     worker.py       │
│  (Azure Functions)  │                 │ (Container Apps Job)│
│     ~90 lines       │                 │    ~200 lines       │
│                     │                 │                     │
│ • Azure trigger     │                 │ • Service Bus SDK   │
│ • autoComplete=true │                 │ • Batch receive     │
│ • Re-raise on error │                 │ • Lock renewal      │
│                     │                 │ • SIGTERM handling  │
└─────────┬───────────┘                 └──────────┬──────────┘
          │                                        │
          └───────────────┬────────────────────────┘
                          ▼
        ┌─────────────────────────────────────────┐
        │    app/services/import_processing/      │
        │         (Shared Module)                 │
        │                                         │
        │  process_import_message()  ← SINGLE     │
        │                               ENTRY     │
        │                               POINT     │
        └─────────────────┬───────────────────────┘
        ┌─────────────────┼─────────────────────────┐
        │                 │                         │
        ▼                 ▼                         ▼
┌───────────────┐ ┌─────────────────────┐ ┌───────────────────────┐
│  schemas.py   │ │     registry.py     │ │    processor.py       │
│               │ │                     │ │                       │
│ ImportMessage │ │ IMPORT_HANDLERS     │ │ • Blob download       │
│ DeleteMessage │ │ DELETE_HANDLERS     │ │ • Handler routing     │
│               │ │ get_handler()       │ │ • Cosmos init         │
│               │ │ is_bulk_operation() │ │ • Blob cleanup        │
└───────────────┘ └──────────┬──────────┘ └───────────────────────┘
                             ▼
        ┌─────────────────────────────────────────┐
        │        app/import_handlers/             │
        │         (11 Handler Classes)            │
        │                                         │
        │ mto, estimation_master,                 │
        │ schedule_extraction, schedule_upload,   │
        │ schedule_project_extraction,            │
        │ schedule_delete_all, schedule_fill_*,   │
        │ equipment, material, crew, etc.         │
        └─────────────────────────────────────────┘

Component Responsibilities

Component          Location                          Responsibility
function_app.py    functions/import-processor/       Azure Functions trigger, auto-complete, exception re-raise for retry
worker.py          backend/                          Service Bus SDK, batch receive, lock renewal, SIGTERM graceful shutdown
schemas.py         app/services/import_processing/   Pydantic validation for ImportMessage / DeleteMessage
registry.py        app/services/import_processing/   Handler lookup, is_bulk_operation(), should_download_blob()
processor.py       app/services/import_processing/   Core logic: parse → validate → download blob → route → execute → cleanup
import_handlers/   app/import_handlers/              11 handler classes implementing the import/delete business logic (14 import types, 2 delete types)

Execution Environments

Azure Functions (function_app.py)

Use Case: Production serverless deployment with automatic scaling.

Characteristics:
- Triggered by a Service Bus queue binding
- autoComplete=true in host.json: the message is auto-completed on success
- Re-raises exceptions so that Service Bus retries the message
- Minimal code (~90 lines): a thin wrapper only

Message Handling:

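import azure.functions as func
from app.services.import_processing import process_import_message  # shared module

app = func.FunctionApp()

@app.service_bus_queue_trigger(...)
async def import_processor(msg: func.ServiceBusMessage):
    raw_message = msg.get_body().decode("utf-8")
    await process_import_message(raw_message, msg.delivery_count, msg.message_id)
    # Success: the runtime auto-completes the message (autoComplete=true)
    # Exception: propagates, the message is abandoned, and Service Bus redelivers it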

Container Apps Job (worker.py)

Use Case: Development, testing, or hybrid deployment with Container Apps.

Characteristics:
- Uses the Azure Service Bus SDK directly
- Receives messages in batches with configurable limits
- Explicitly renews message locks for long-running operations
- Shuts down gracefully on SIGTERM/SIGINT
- Explicitly completes or abandons each message

Message Handling:

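# In handle_message(msg): msg.body yields byte chunks, so join them before decoding,
# then pass raw_message to the shared process_import_message(); returns True on success
raw_message = b"".join(msg.body).decode("utf-8")

# In the receive loop: settle each message explicitly
success = await handle_message(msg)
if success:
    await receiver.complete_message(msg)
else:
    await receiver.abandon_message(msg)  # Redelivered, or dead-lettered once max delivery count is reached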
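The batch limit and SIGTERM/SIGINT handling described above can be sketched with plain asyncio. This is a minimal sketch, not the contents of worker.py: receive_batch, handle_message, and settle are hypothetical callables standing in for the Service Bus receiver (a real receive_batch might wrap ServiceBusReceiver.receive_messages(max_message_count=..., max_wait_time=...) from azure.servicebus.aio), and loop.add_signal_handler only works on Unix event loops. The stop flag is checked between batches, so every message that has been received is completed or abandoned before the job exits.

```python
import asyncio
import signal
from typing import Awaitable, Callable, Sequence


def install_shutdown_handlers(stop: asyncio.Event) -> None:
    # Unix event loops only: set the stop event on SIGTERM/SIGINT
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)


async def run_worker(
    receive_batch: Callable[[int], Awaitable[Sequence[object]]],
    handle_message: Callable[[object], Awaitable[bool]],
    settle: Callable[[object, bool], Awaitable[None]],
    stop: asyncio.Event,
    max_messages_per_run: int = 10,
) -> int:
    """Process up to max_messages_per_run messages; return how many were settled."""
    processed = 0
    while not stop.is_set() and processed < max_messages_per_run:
        # Never ask for more messages than the remaining per-run budget
        batch = await receive_batch(max_messages_per_run - processed)
        if not batch:
            break  # queue drained: end this job run
        for msg in batch:
            # Finish and settle every received message, even after a stop
            # request, so none is left holding its lock until it expires
            success = await handle_message(msg)
            await settle(msg, success)  # complete on True, abandon on False
            processed += 1
    return processed
```

Checking the flag only between batches trades a slightly slower shutdown for never leaving a received message locked; this assumes the job's termination grace period is long enough to finish one batch.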


Handler Registry

Import Handlers (14 types)

IMPORT_HANDLERS = {
    # MTO/estimation handlers
    "mto":                       MTOImportHandler,
    "estimation_master":         EstimationMasterHandler,

    # Schedule handlers
    "schedule_extraction":       ScheduleExtractionHandler,
    "schedule_project_extraction": ScheduleProjectExtractionHandler,
    "schedule_upload":           ScheduleUploadHandler,
    "schedule_delete_all":       ScheduleBulkHandler,
    "schedule_fill_level":       ScheduleBulkHandler,
    "schedule_clear_level":      ScheduleBulkHandler,
    "schedule_fill_column":      ScheduleBulkHandler,

    # Universal table import handlers
    "equipment":                 EquipmentHandler,
    "material":                  MaterialHandler,
    "crew":                      CrewHandler,
    "crew_member":               CrewMemberHandler,
    "crew_trade":                CrewTradeHandler,
}

Delete Handlers (2 types)

DELETE_HANDLERS = {
    "estimation_master_delete":  EstimationMasterHandler,
    "mto_delete":                MTOImportHandler,
}
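The registry diagram lists a get_handler() lookup, and Troubleshooting shows the error raised for an unregistered type. A minimal sketch of such a lookup, assuming it routes "_delete" types to DELETE_HANDLERS and returns the registered class (whether the real function returns a class or an instance is not shown here). The handler classes are empty stand-ins and the registries are trimmed copies of the ones above.

```python
class MTOImportHandler: ...
class EstimationMasterHandler: ...
class ScheduleBulkHandler: ...

# Trimmed copies of the registries above (stand-in classes only)
IMPORT_HANDLERS = {
    "mto": MTOImportHandler,
    "estimation_master": EstimationMasterHandler,
    "schedule_delete_all": ScheduleBulkHandler,
    "schedule_fill_level": ScheduleBulkHandler,
}
DELETE_HANDLERS = {
    "estimation_master_delete": EstimationMasterHandler,
    "mto_delete": MTOImportHandler,
}


def is_delete_operation(document_type: str) -> bool:
    return document_type.endswith("_delete")


def get_handler(document_type: str) -> type:
    # Delete types are looked up in DELETE_HANDLERS, everything else in IMPORT_HANDLERS
    registry = DELETE_HANDLERS if is_delete_operation(document_type) else IMPORT_HANDLERS
    try:
        return registry[document_type]
    except KeyError:
        # Keys are matched exactly, so "MTO" is as unknown as "xyz"
        raise ValueError(f"Unsupported document type: {document_type}") from None
```

Because MTOImportHandler and EstimationMasterHandler appear in both maps, the same class serves imports and deletes; the processor then calls process_import() or process_delete() as described in Message Flow.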

Message Flow

1. Message arrives in Service Bus queue

2. Wrapper extracts raw message body:
   - function_app.py: msg.get_body().decode("utf-8")
   - worker.py:       b"".join(msg.body).decode("utf-8")

3. Both call the SAME function:
   await process_import_message(raw_message, delivery_count, message_id)

4. Shared processor:
   ├─ Initialize Cosmos DB (idempotent)
   ├─ Parse JSON → ImportMessage/DeleteMessage
   ├─ Check if delete operation (ends with "_delete")
   │   └─ If delete: route to DELETE_HANDLERS
   ├─ Check if needs blob download:
   │   ├─ NOT for: bulk ops, deletes, self-download types
   │   └─ YES for: mto, estimation_master, schedule_extraction, etc.
   ├─ Preflight check (schedule_extraction already completed or failed?)
   ├─ Download blob if needed
   ├─ Build handler kwargs (per document_type)
   ├─ Execute handler.process_import() or handler.process_delete()
   └─ Delete blob on success (cost savings)

5. Wrapper handles success/failure:
   - function_app.py: return normally (auto-complete) or re-raise (retry)
   - worker.py:       complete_message() or abandon_message()
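The flow above can be condensed into one coroutine. This is a self-contained sketch, not processor.py: blob storage and the handler are in-memory stand-ins (FakeBlobStore and EchoHandler are hypothetical), a plain task_status dict replaces the Cosmos DB tasks container, the message is parsed with json instead of the Pydantic schemas, Cosmos initialization and registry lookup are omitted, and the dependencies are passed as arguments instead of delivery_count and message_id. It keeps the ordering that matters for retries: the preflight runs before any download, and the blob is deleted only after the handler returns.

```python
import asyncio
import json

# Handlers that download their own files (see Blob Download Decision)
SELF_DOWNLOAD = {"schedule_upload", "schedule_project_extraction"}


class FakeBlobStore:
    """In-memory stand-in for the blob container."""

    def __init__(self, blobs: dict[str, bytes]):
        self.blobs = blobs

    async def download(self, name: str) -> bytes:
        return self.blobs[name]  # KeyError stands in for a missing blob

    async def delete(self, name: str) -> None:
        self.blobs.pop(name, None)


class EchoHandler:
    """Stand-in handler that records each call instead of importing data."""

    def __init__(self):
        self.calls = []

    async def process_import(self, task_id, project_id, file_content, **kwargs):
        self.calls.append(("import", task_id, file_content, kwargs))

    async def process_delete(self, task_id, project_id, service_id):
        self.calls.append(("delete", task_id, service_id))


async def process_import_message(raw_message, blobs, handler, task_status):
    msg = json.loads(raw_message)  # real code validates into ImportMessage/DeleteMessage
    doc_type = msg["document_type"]

    # Deletes never touch blob storage
    if doc_type.endswith("_delete"):
        await handler.process_delete(msg["task_id"], msg["project_id"], msg["service_id"])
        return "deleted"

    # Preflight: a finished extraction is skipped, so the wrapper completes the message
    if doc_type == "schedule_extraction" and task_status.get(msg["extraction_id"]) in ("completed", "failed"):
        return "skipped"

    is_bulk = doc_type.startswith("schedule_") and doc_type != "schedule_extraction"
    needs_blob = not is_bulk and doc_type not in SELF_DOWNLOAD
    content = await blobs.download(msg["blob_name"]) if needs_blob else None

    # Only the mto/estimation_master kwargs are modeled here
    kwargs = {"service_id": msg.get("service_id")} if doc_type in ("mto", "estimation_master") else {}

    # If the handler raises, the exception propagates and the blob is kept for the retry
    await handler.process_import(msg["task_id"], msg["project_id"], content, **kwargs)

    if needs_blob:
        await blobs.delete(msg["blob_name"])  # cleanup only after success
    return "processed"


if __name__ == "__main__":
    store = FakeBlobStore({"mto.xlsx": b"rows"})
    handler = EchoHandler()
    raw = json.dumps({"document_type": "mto", "task_id": "t1", "project_id": "p1",
                      "blob_name": "mto.xlsx", "service_id": "s1"})
    print(asyncio.run(process_import_message(raw, store, handler, task_status={})))  # processed
    print(store.blobs)  # {}
```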

Message Schemas

ImportMessage

from pydantic import BaseModel

class ImportMessage(BaseModel):
    # Required fields for all operations
    document_type: str          # "mto", "schedule_extraction", etc.
    task_id: str                # Unique task identifier
    project_id: str             # Project ID (partition key)

    # Blob-related fields (not required for bulk ops). Optional fields need an
    # explicit "= None": in Pydantic v2, "str | None" without a default is still required.
    blob_name: str | None = None        # Blob storage file name
    filename: str | None = None         # Original filename
    strategy: str = "merge"             # Import strategy

    # MTO/estimation_master specific
    service_id: str | None = None

    # Schedule extraction specific
    extraction_id: str | None = None
    primavera_project_id: str | None = None
    schedule_config: dict | None = None

    # Schedule bulk operations
    operation: str | None = None
    activity_ids: list[str] | None = None
    filters: dict | None = None
    payload: dict | None = None

    # Schedule upload specific
    temp_blob_name: str | None = None
    target_blob_name: str | None = None
    original_filename: str | None = None
    direct_upload: bool = False

DeleteMessage

class DeleteMessage(BaseModel):
    document_type: str   # "estimation_master_delete", "mto_delete"
    task_id: str
    project_id: str
    service_id: str      # Service ID to delete data for

Key Logic

Blob Download Decision

# These handlers manage their own downloads
SELF_DOWNLOAD_TYPES = {"schedule_upload", "schedule_project_extraction"}

def is_delete_operation(document_type: str) -> bool:
    # Delete types end with "_delete" ("mto_delete", "estimation_master_delete")
    return document_type.endswith("_delete")

def should_download_blob(document_type: str) -> bool:
    # Delete operations don't need blob
    if is_delete_operation(document_type):
        return False

    # Bulk operations don't need blob
    if is_bulk_operation(document_type):
        return False

    return document_type not in SELF_DOWNLOAD_TYPES

Bulk Operation Detection

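def is_bulk_operation(document_type: str) -> bool:
    # schedule_extraction is NOT bulk - it needs blob content.
    # Every other "schedule_" type counts as bulk here, including
    # schedule_upload and schedule_project_extraction.
    return document_type.startswith("schedule_") and document_type not in ("schedule_extraction",)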
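Running both functions over every registered type makes the download decision explicit. This sketch copies is_bulk_operation() and should_download_blob() from above; is_delete_operation() is assumed here to be the "_delete" suffix check from Message Flow. As written, is_bulk_operation() also matches schedule_upload and schedule_project_extraction, so those two return False at the bulk check before SELF_DOWNLOAD_TYPES is consulted; the result (no download) is the same.

```python
SELF_DOWNLOAD_TYPES = {"schedule_upload", "schedule_project_extraction"}

# All 14 import types and 2 delete types from the registry
DOCUMENT_TYPES = [
    "mto", "estimation_master",
    "schedule_extraction", "schedule_project_extraction", "schedule_upload",
    "schedule_delete_all", "schedule_fill_level", "schedule_clear_level", "schedule_fill_column",
    "equipment", "material", "crew", "crew_member", "crew_trade",
    "estimation_master_delete", "mto_delete",
]


def is_delete_operation(document_type: str) -> bool:
    # Assumption: suffix rule taken from the Message Flow section
    return document_type.endswith("_delete")


def is_bulk_operation(document_type: str) -> bool:
    return document_type.startswith("schedule_") and document_type not in ("schedule_extraction",)


def should_download_blob(document_type: str) -> bool:
    if is_delete_operation(document_type):
        return False
    if is_bulk_operation(document_type):
        return False
    return document_type not in SELF_DOWNLOAD_TYPES


def download_plan() -> dict[str, bool]:
    # Map every document type to whether the processor downloads its blob
    return {t: should_download_blob(t) for t in DOCUMENT_TYPES}


if __name__ == "__main__":
    for doc_type, download in download_plan().items():
        print(f"{doc_type:28} {'download' if download else 'no blob'}")
```

Only eight types get a blob download: mto, estimation_master, schedule_extraction, and the five universal table types. Note that schedule_delete_all is not a delete operation under the suffix rule; it is skipped as a bulk operation.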

Handler Kwargs Routing

Each document type receives specific kwargs:

Document Type                 Kwargs
mto, estimation_master        service_id
schedule_extraction           extraction_id, primavera_project_id, schedule_config
schedule_project_extraction   blob_name, primavera_project_id, schedule_config
schedule_upload               temp_blob_name, target_blob_name, original_filename, direct_upload, blob_name
Bulk schedule ops             operation, activity_ids, filters, payload
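The table can be read as one routing function. build_handler_kwargs is the name used in "Adding a New Document Type"; this table-driven body is a sketch reconstructed from the table only, assuming the message exposes the ImportMessage fields as attributes and that task_id, project_id, and file content are passed to the handler separately. Bulk types are listed explicitly (the four registered to ScheduleBulkHandler) instead of detected with is_bulk_operation(), whose prefix rule would also match schedule_upload and schedule_project_extraction.

```python
from types import SimpleNamespace

# Per-type kwargs, copied from the Handler Kwargs Routing table
KWARG_FIELDS = {
    "mto": ("service_id",),
    "estimation_master": ("service_id",),
    "schedule_extraction": ("extraction_id", "primavera_project_id", "schedule_config"),
    "schedule_project_extraction": ("blob_name", "primavera_project_id", "schedule_config"),
    "schedule_upload": ("temp_blob_name", "target_blob_name", "original_filename",
                        "direct_upload", "blob_name"),
}

# The four types registered to ScheduleBulkHandler
BULK_SCHEDULE_TYPES = {
    "schedule_delete_all", "schedule_fill_level",
    "schedule_clear_level", "schedule_fill_column",
}
BULK_FIELDS = ("operation", "activity_ids", "filters", "payload")


def build_handler_kwargs(document_type: str, message) -> dict:
    """Return the extra keyword arguments the handler for document_type receives."""
    if document_type in KWARG_FIELDS:
        fields = KWARG_FIELDS[document_type]
    elif document_type in BULK_SCHEDULE_TYPES:
        fields = BULK_FIELDS
    else:
        fields = ()  # assumption: types missing from the table get no extra kwargs
    return {name: getattr(message, name) for name in fields}


if __name__ == "__main__":
    msg = SimpleNamespace(service_id="svc-1")
    print(build_handler_kwargs("mto", msg))  # {'service_id': 'svc-1'}
```

Keeping the mapping as data means a new document type only needs a new KWARG_FIELDS entry instead of another elif branch.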

Retry Behavior

Service Bus Configuration

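Setting              Value       Description
Max Delivery Count   10          A message is delivered at most 10 times, then moved to the dead-letter queue
Lock Duration        5 minutes   How long a received message stays locked; if it is not completed, abandoned, or renewed in time, it becomes available again and its delivery count increases
Dead-Letter          Enabled     Messages that reach the max delivery count go to the dead-letter queue (DLQ)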

Retry Flow

Message Received (delivery_count=1)
         │
         ▼
    Process Message
    ┌────┴────┐
    │         │
 Success    Failure
    │         │
    ▼         ▼
 Complete  Abandon/Re-raise
 (removed)    │
              ▼
         delivery_count++
         max delivery count (10) reached?
         ┌────┴────┐
         │         │
        No        Yes
         │         │
         ▼         ▼
      Retry    Dead-Letter
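The retry flow reduces to counting deliveries. The sketch below is a toy model, not the Service Bus SDK, and simulate_delivery is a hypothetical name: a message that always fails is delivered 10 times in total and then dead-lettered, while one that fails twice completes on its third delivery. In the real service an expired lock also counts as a failed delivery, which is why worker.py renews locks during long operations.

```python
MAX_DELIVERY_COUNT = 10


def simulate_delivery(failures_before_success: int,
                      max_delivery_count: int = MAX_DELIVERY_COUNT) -> tuple[str, int]:
    """Return (outcome, deliveries) for a message that fails N times, then succeeds."""
    delivery_count = 0
    while True:
        delivery_count += 1  # value the receiver sees: 1 on the first delivery
        if delivery_count > failures_before_success:
            return "completed", delivery_count
        # Abandon / re-raise: redeliver unless the limit has been reached
        if delivery_count >= max_delivery_count:
            return "dead-lettered", delivery_count


if __name__ == "__main__":
    print(simulate_delivery(2))    # ('completed', 3)
    print(simulate_delivery(100))  # ('dead-lettered', 10)
```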

Preflight Check (schedule_extraction)

Before downloading the blob, the processor checks whether the extraction task has already completed or failed:

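from azure.cosmos.exceptions import CosmosResourceNotFoundError

async def preflight_check_schedule_extraction(document_type, message):
    if document_type != "schedule_extraction":
        return True  # Continue processing

    # Check task status in Cosmos DB (tasks_container: async container client
    # for task documents, initialized elsewhere)
    try:
        existing_doc = await tasks_container.read_item(
            item=message.extraction_id,
            partition_key=message.project_id
        )
    except CosmosResourceNotFoundError:
        return True  # Assumption: no task document yet, so process normally

    if existing_doc.get("status") in ("completed", "failed"):
        return False  # Skip processing (already done)

    return True  # Continue processing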

This prevents retry loops when a redelivered message refers to a blob that was already deleted after a successful extraction.


File Locations

Shared Module

backend/app/services/import_processing/
├── __init__.py          # Public API exports
├── schemas.py           # ImportMessage, DeleteMessage
├── registry.py          # Handler mappings, routing logic
└── processor.py         # Core processing logic

Import Handlers

backend/app/import_handlers/
├── __init__.py          # Handler exports
├── base.py              # ImportHandler base class
├── mto_handler.py
├── estimation_master_handler.py
├── schedule_handler.py
├── schedule_bulk_handler.py
├── schedule_upload_handler.py
├── schedule_project_handler.py
├── equipment_handler.py
├── material_handler.py
├── crew_handler.py
├── crew_member_handler.py
└── crew_trade_handler.py

Wrappers

backend/functions/import-processor/
└── function_app.py      # Azure Functions wrapper (~90 lines)

backend/
└── worker.py            # Container Apps Job wrapper (~200 lines)

Adding a New Document Type

  1. Create a handler class in app/import_handlers/ and export it from app/import_handlers/__init__.py:

    from .base import ImportHandler
    
    class NewTypeHandler(ImportHandler):
        async def process_import(self, task_id, project_id, file_content, ...):
            # Implementation
            pass
    

  2. Register in registry.py:

    from app.import_handlers import NewTypeHandler
    
    IMPORT_HANDLERS["new_type"] = NewTypeHandler
    

  3. Add kwargs routing in processor.py (if needed):

    def build_handler_kwargs(document_type, message):
        # ...
        elif document_type == "new_type":
            kwargs["custom_field"] = message.custom_field
        return kwargs
    

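  4. Update schemas.py if the new type needs message fields that ImportMessage does not have yet (for example custom_field above); add them as optional fields with a default of None.

  5. Check the blob download decision in registry.py: should_download_blob() returns True for any type that is not a delete, a bulk operation, or listed in SELF_DOWNLOAD_TYPES. Because is_bulk_operation() treats every type starting with "schedule_" (except schedule_extraction) as bulk, a new type with that prefix will not get a blob download.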


Environment Variables

Variable                              Description                  Default
AZURE_SERVICE_BUS_CONNECTION_STRING   Service Bus connection       Required
AZURE_SERVICE_BUS_QUEUE_NAME          Queue name                   import-queue
AZURE_STORAGE_CONNECTION_STRING       Blob Storage connection      Required
AZURE_STORAGE_CONTAINER_NAME          Blob container               imports
MAX_MESSAGES_PER_RUN                  Worker batch limit           10
MAX_WAIT_TIME_SECONDS                 Worker wait time (seconds)   30
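A sketch of reading these variables at startup with only the standard library. WorkerSettings and load_settings are hypothetical names, not taken from worker.py; the defaults match the table, and the function fails fast when a Required variable is missing or empty.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerSettings:
    service_bus_connection: str
    queue_name: str
    storage_connection: str
    container_name: str
    max_messages_per_run: int
    max_wait_time_seconds: int


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    def required(name: str) -> str:
        value = env.get(name)
        if not value:
            raise RuntimeError(f"Missing required environment variable: {name}")
        return value

    return WorkerSettings(
        service_bus_connection=required("AZURE_SERVICE_BUS_CONNECTION_STRING"),
        queue_name=env.get("AZURE_SERVICE_BUS_QUEUE_NAME", "import-queue"),
        storage_connection=required("AZURE_STORAGE_CONNECTION_STRING"),
        container_name=env.get("AZURE_STORAGE_CONTAINER_NAME", "imports"),
        max_messages_per_run=int(env.get("MAX_MESSAGES_PER_RUN", "10")),
        max_wait_time_seconds=int(env.get("MAX_WAIT_TIME_SECONDS", "30")),
    )
```

Passing a plain dict as env makes the loader easy to test without touching the process environment.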

Troubleshooting

Message stuck in queue

  1. Check Service Bus metrics for active messages
  2. Verify worker/function is running
  3. Check dead-letter queue for failed messages

Handler not found

ValueError: Unsupported document type: xyz
  1. Verify document_type is registered in registry.py
  2. Check the casing: registry keys are lowercase and matched exactly

Blob download fails

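  1. Check that blob_name is set in the message
  2. Verify the storage connection string (AZURE_STORAGE_CONNECTION_STRING)
  3. Ensure the blob exists in the configured container (AZURE_STORAGE_CONTAINER_NAME); blobs are deleted after successful processing, so a redelivered message may reference a blob that is already gone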

Duplicate processing

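  1. Check whether your document type has a preflight check (the preflight shown above covers only schedule_extraction)
  2. Ensure messages are completed only after successful processing, and that the worker renews locks on long-running messages; an expired lock makes the message available again and causes a second delivery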
  3. Review dead-letter queue for patterns