# ContentFlow Worker

A multi-processing worker engine for processing content through ContentFlow pipelines.
- Overview
- Architecture
- Components
- Configuration
- Installation
- Usage
- Task Types
- Workflow
- Sending Tasks to the Queue
- Monitoring
- Features
- Error Handling
- Performance Tuning
- Requirements
## Overview

The ContentFlow Worker is a distributed, multi-processing engine designed to:
- Process content items through ContentFlow pipelines
- Discover content from input sources (Azure Blob Storage, etc.)
- Scale processing across multiple worker processes
- Handle task queuing, retries, and fault tolerance
## Architecture

The worker engine consists of two types of worker processes:

### Processing Workers

- Listen to the Azure Storage Queue for processing tasks
- Execute ContentFlow pipelines on content items (excluding already-executed input executors)
- Report execution status to Cosmos DB
- Handle task retries and error recovery

### Source Workers

- Poll Cosmos DB for enabled pipelines associated with vaults
- Parse pipeline configuration to identify input executors
- Execute input executors to discover content from sources
- Create processing tasks for discovered content items
- Queue tasks for processing workers
## Components

```
contentflow-worker/
├── app/
│   ├── __init__.py
│   ├── api.py                    # FastAPI health and status monitoring
│   ├── engine.py                 # Main worker engine (manages processes)
│   ├── models.py                 # Task models (Pydantic)
│   ├── queue_client.py           # Azure Storage Queue wrapper
│   ├── settings.py               # Configuration and settings
│   ├── startup.py                # Application startup
│   ├── utils.py                  # Utility functions
│   └── worker/
│       ├── __init__.py
│       ├── processing_worker.py  # Content processing worker
│       └── source_worker.py      # Input source loading worker
├── main.py                       # Entry point
├── Dockerfile                    # Docker container configuration
├── requirements.txt              # Python dependencies
├── .env                          # Environment variables (local)
├── .env.example                  # Environment variables template
└── README.md                     # This file
```
## Configuration

Configuration is loaded from:

- Environment variables (`.env` file)
- Azure App Configuration (if available)
```bash
# Azure App Configuration
AZURE_APP_CONFIG_ENDPOINT=https://your-app-config-resource.azconfig.io
APP_CONFIG_KEY_FILTERS=contentflow.worker.*
# Worker Settings
WORKER_NAME=contentflow-worker
NUM_PROCESSING_WORKERS=2
NUM_SOURCE_WORKERS=2
# Azure Storage Queue
STORAGE_ACCOUNT_WORKER_QUEUE_URL=https://your-storage-account.queue.core.windows.net
STORAGE_WORKER_QUEUE_NAME=contentflow-execution-requests
# Queue Polling Settings
QUEUE_POLL_INTERVAL_SECONDS=5
QUEUE_VISIBILITY_TIMEOUT_SECONDS=300
QUEUE_MAX_MESSAGES=32
# Processing Settings
MAX_TASK_RETRIES=3
TASK_TIMEOUT_SECONDS=600
# Cosmos DB
COSMOS_DB_ENDPOINT=https://your-cosmos.documents.azure.com:443/
COSMOS_DB_NAME=contentflow
COSMOS_DB_CONTAINER_PIPELINES=pipelines
COSMOS_DB_CONTAINER_VAULT_EXECUTIONS=vault_executions
COSMOS_DB_CONTAINER_VAULTS=vaults
COSMOS_DB_CONTAINER_LOCKS=vault_exec_locks
COSMOS_DB_CONTAINER_CRAWL_CHECKPOINTS=vault_crawl_checkpoints
# Azure Blob Storage
BLOB_STORAGE_ACCOUNT_NAME=your-storage-account
BLOB_STORAGE_CONTAINER_NAME=content
# Source Worker Settings
DEFAULT_POLLING_INTERVAL_SECONDS=300 # Default 5 minutes if executor doesn't specify
SCHEDULER_SLEEP_INTERVAL_SECONDS=5 # How often scheduler checks for ready pipelines
LOCK_TTL_SECONDS=300 # Distributed lock TTL (5 minutes)
# API Settings
API_ENABLED=true
API_HOST=0.0.0.0
API_PORT=8099
# Logging
LOG_LEVEL=DEBUG
DEBUG=true
```

**Per-Executor Polling Intervals**: Configure polling intervals directly in executor settings:
```yaml
executors:
  - id: blob_input
    type: azure_blob_input
    settings:
      polling_interval_seconds: 300  # Check every 5 minutes
      blob_storage_account: "mystorageaccount"
```

See `.env.example` for complete configuration options.
## Installation

- Install dependencies:

  ```bash
  pip install -r requirements.txt
  ```

- Configure environment:

  ```bash
  cp .env.example .env
  # Edit .env with your configuration
  ```

- Ensure contentflow-lib is available:

  ```bash
  # Worker expects contentflow-lib in parent directory
  # ../contentflow-lib
  ```

## Usage

Run the worker:

```bash
python main.py
```

The worker will:
- Start configured number of processing and source workers
- Start the FastAPI health and status monitoring service
- Connect to Azure Storage Queue
- Begin processing tasks
- Monitor worker health and restart failed workers
- Handle graceful shutdown on SIGINT/SIGTERM (see the sketch below)
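The supervision behaviour described in the last two points can be pictured with this minimal multiprocessing sketch; the worker functions and counts are placeholders, not the actual `engine.py`:

```python
# Minimal sketch of process supervision: spawn both worker types, restart any
# process that dies, and stop everything on SIGINT/SIGTERM. The worker
# functions below are placeholders, not the engine's real entry points.
import signal
import time
from multiprocessing import Event, Process


def run_processing_worker(worker_id, stop):
    while not stop.is_set():
        time.sleep(1)  # placeholder: poll queue, execute pipelines, ...


def run_source_worker(worker_id, stop):
    while not stop.is_set():
        time.sleep(1)  # placeholder: poll Cosmos DB, discover content, ...


def main(num_processing=2, num_source=2):
    stop = Event()
    signal.signal(signal.SIGINT, lambda *_: stop.set())
    signal.signal(signal.SIGTERM, lambda *_: stop.set())

    def spawn(target, worker_id):
        proc = Process(target=target, args=(worker_id, stop))
        proc.start()
        return proc

    workers = [[run_processing_worker, i, spawn(run_processing_worker, i)] for i in range(num_processing)]
    workers += [[run_source_worker, i, spawn(run_source_worker, i)] for i in range(num_source)]

    while not stop.is_set():
        for entry in workers:
            target, worker_id, proc = entry
            if not proc.is_alive():  # restart crashed workers
                entry[2] = spawn(target, worker_id)
        time.sleep(5)

    for _, _, proc in workers:
        proc.join()


if __name__ == "__main__":
    main()
```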
When `API_ENABLED=true`, the following health and status endpoints are available:

- **Health Check**: `GET /health` - Returns worker health status
- **Worker Status**: `GET /status` - Returns detailed worker process status
- **Metrics**: `GET /metrics` - Returns worker performance metrics
Example health check:

```bash
curl http://localhost:8099/health
```

Response:

```json
{
  "status": "healthy",
  "timestamp": "2026-01-02T10:30:00Z",
  "worker_name": "contentflow-worker"
}
```

Press Ctrl+C or send SIGTERM to gracefully stop all workers:
```bash
kill -TERM <worker-pid>
```

## Task Types

### ContentProcessingTask

Executes a pipeline on a content item.
```json
{
  "task_id": "task_abc123",
  "task_type": "content_processing",
  "pipeline_id": "pipeline_xyz",
  "pipeline_name": "Document Processing",
  "execution_id": "exec_123",
  "content_id": "content_456",
  "content_data": {...},
  "executed_input_executor": "blob_input",  # Already-executed input executor
  "priority": "normal",
  "max_retries": 3
}
```
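`app/models.py` defines the task models with Pydantic; the following is only an illustrative sketch of what such a model could look like (Pydantic v2 assumed), with field names taken from the example above:

```python
# Illustrative Pydantic sketch of the task payload shown above; the real model
# in app/models.py may differ in field types, defaults, and validation.
from typing import Any, Optional

from pydantic import BaseModel


class ContentProcessingTask(BaseModel):
    task_id: str
    task_type: str = "content_processing"
    pipeline_id: str
    pipeline_name: str
    execution_id: str
    content_id: str
    content_data: dict[str, Any]
    executed_input_executor: Optional[str] = None  # input executor already run by the source worker
    priority: str = "normal"
    max_retries: int = 3


# Tasks are serialized to JSON before being placed on the queue
task = ContentProcessingTask(
    task_id="task_abc123",
    pipeline_id="pipeline_xyz",
    pipeline_name="Document Processing",
    execution_id="exec_123",
    content_id="content_456",
    content_data={"text": "..."},
    executed_input_executor="blob_input",
)
print(task.model_dump_json())
```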
## Workflow

### Source Worker Workflow

- **Continuous Scheduling Loop**:
  - Runs continuously, checking the pipeline schedule every 5 seconds (configurable via `SCHEDULER_SLEEP_INTERVAL_SECONDS`)
  - Maintains the next execution time for each pipeline based on per-executor polling intervals

- **Pipeline Discovery**:
  - Queries Cosmos DB for enabled pipelines with associated vaults
  - Extracts `polling_interval_seconds` from input executor settings (default: 300s)
  - Schedules pipelines for execution at the appropriate intervals

- **Distributed Locking**:
  - Before executing a pipeline, attempts to acquire a distributed lock in Cosmos DB (see the sketch after this list)
  - Prevents multiple workers from processing the same pipeline simultaneously
  - Locks auto-expire after 5 minutes (configurable via `LOCK_TTL_SECONDS`)

- **Content Discovery**:
  - Parse pipeline YAML to find the input executor
  - Execute the input executor to discover content
  - Create a ContentProcessingTask for each discovered content item
  - Mark the input executor as already executed (`executed_input_executor` field)
  - Send tasks to the queue

- **Schedule Update**:
  - After successful execution, update the next execution time
  - Release the distributed lock
  - The pipeline will execute again after its polling interval expires
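As an illustration of the distributed-locking step above, here is a minimal sketch using the `azure-cosmos` SDK. The lock item shape, the helper names, and the assumptions that the `vault_exec_locks` container is partitioned on `/id` and has TTL enabled are all assumptions, not the worker's actual implementation:

```python
# Hypothetical sketch of lease-style locking against the vault_exec_locks
# container. Assumes per-item TTL is enabled so stale locks expire on their own.
from azure.cosmos import CosmosClient
from azure.cosmos.exceptions import CosmosResourceExistsError
from azure.identity import DefaultAzureCredential

client = CosmosClient(
    "https://your-cosmos.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
)
locks = client.get_database_client("contentflow").get_container_client("vault_exec_locks")


def try_acquire_lock(pipeline_id: str, worker_name: str, ttl_seconds: int = 300) -> bool:
    """Return True if this worker acquired the lock, False if another worker holds it."""
    try:
        locks.create_item({
            "id": f"lock_{pipeline_id}",  # one lock document per pipeline
            "pipeline_id": pipeline_id,
            "owner": worker_name,
            "ttl": ttl_seconds,           # Cosmos DB deletes the item when the TTL elapses
        })
        return True
    except CosmosResourceExistsError:
        return False                       # lock already held by another worker


def release_lock(pipeline_id: str) -> None:
    # Assumes the container is partitioned on /id
    locks.delete_item(item=f"lock_{pipeline_id}", partition_key=f"lock_{pipeline_id}")
```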
Example: Pipeline with 1-hour polling:

```yaml
executors:
  - id: blob_input
    type: azure_blob_input
    settings:
      polling_interval_seconds: 3600  # Check every hour
      blob_storage_account: "mystorageaccount"
      blob_container_name: "documents"
```

### Processing Worker Workflow

- Poll queue for ContentProcessingTask messages (see the sketch after this list)
- For each task:
  - Load pipeline configuration
  - Exclude the already-executed input executor
  - Execute remaining pipeline on content
  - Update execution status
  - Delete message on success
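A rough sketch of that polling loop, using the `azure-storage-queue` SDK; `execute_pipeline()` and the hard-coded values are placeholders for the worker's actual configuration and pipeline execution:

```python
# Rough sketch of the processing worker loop. execute_pipeline() is a
# placeholder for running the remaining ContentFlow pipeline on the content item.
import json
import time

from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueClient

queue = QueueClient(
    account_url="https://your-storage-account.queue.core.windows.net",
    queue_name="contentflow-execution-requests",
    credential=DefaultAzureCredential(),
)


def execute_pipeline(task: dict) -> None:
    """Placeholder: load the pipeline, skip task['executed_input_executor'], run the rest."""


while True:
    messages = queue.receive_messages(messages_per_page=32, visibility_timeout=300)
    for message in messages:
        task = json.loads(message.content)
        try:
            execute_pipeline(task)
            queue.delete_message(message)  # only delete on success
        except Exception:
            # Leave the message in the queue; it becomes visible again after the
            # visibility timeout and will be retried.
            pass
    time.sleep(5)  # QUEUE_POLL_INTERVAL_SECONDS
```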
## Sending Tasks to the Queue

You can send tasks from the API or other services:

```python
from contentflow_worker import TaskQueueClient, ContentProcessingTask

# Initialize queue client
queue_client = TaskQueueClient(
    queue_url="https://your-storage.queue.core.windows.net",
    queue_name="contentflow-execution-requests"
)

# Create task
task = ContentProcessingTask(
    task_id="task_123",
    pipeline_id="pipeline_xyz",
    pipeline_name="My Pipeline",
    execution_id="exec_456",
    content_id="content_789",
    content_data={"text": "Sample content"}
)

# Send to queue
queue_client.send_content_processing_task(task)
```

## Monitoring

When the API is enabled, you can monitor worker status via HTTP:
```bash
# Check health
curl http://localhost:8099/health

# Get detailed status
curl http://localhost:8099/status
```

The engine also provides status information programmatically:
```python
from contentflow_worker import WorkerEngine

engine = WorkerEngine()
status = engine.get_status()
print(status)
```

Output:
```json
{
  "running": true,
  "processing_workers": {
    "configured": 2,
    "active": 2,
    "workers": [
      {"id": 0, "pid": 12345, "alive": true},
      {"id": 1, "pid": 12346, "alive": true}
    ]
  },
  "source_workers": {
    "configured": 2,
    "active": 2,
    "workers": [
      {"id": 0, "pid": 12347, "alive": true},
      {"id": 1, "pid": 12348, "alive": true}
    ]
  }
}
```

Workers log to:
- Console (stdout)
- File (configured via LOG_LEVEL)
## Features

- ✅ Multi-processing parallelism
- ✅ Azure Storage Queue integration
- ✅ Automatic worker restart on failure
- ✅ Task retry handling
- ✅ Graceful shutdown
- ✅ Configurable worker counts
- ✅ Pipeline execution tracking
- ✅ Content source discovery
- ✅ Cosmos DB integration
## Error Handling

- **Task Failures**: Tasks are retried up to `max_retries` times (see the sketch below)
- **Worker Crashes**: Engine automatically restarts crashed workers
- **Queue Visibility**: Failed tasks become visible again after the visibility timeout
- **Execution Tracking**: All execution status updates are recorded in Cosmos DB
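To make the retry behaviour concrete, here is a small illustrative sketch based on a queue message's dequeue count; the helper functions and the decision to drop a message after the limit are assumptions, not necessarily the worker's exact logic:

```python
# Illustrative sketch of retry handling using the queue message's dequeue_count.
# MAX_TASK_RETRIES mirrors the setting above; process_task() and
# record_failed_execution() are hypothetical placeholders.
MAX_TASK_RETRIES = 3


def process_task(message):
    """Placeholder: run the pipeline for the task carried by this message."""


def record_failed_execution(message):
    """Placeholder: mark the execution as failed in Cosmos DB."""


def handle_message(queue, message):
    if message.dequeue_count > MAX_TASK_RETRIES:
        # Too many attempts: record the failure and remove the message so it
        # does not reappear after every visibility timeout.
        record_failed_execution(message)
        queue.delete_message(message)
        return
    try:
        process_task(message)
        queue.delete_message(message)  # success: remove from queue
    except Exception:
        # Leave the message in the queue; it becomes visible again after
        # QUEUE_VISIBILITY_TIMEOUT_SECONDS and counts as another attempt.
        pass
```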
## Performance Tuning

Adjust these settings for your workload:

```bash
# More workers = higher throughput
NUM_PROCESSING_WORKERS=4
NUM_SOURCE_WORKERS=2

# Faster polling = lower latency
QUEUE_POLL_INTERVAL_SECONDS=2

# More messages per poll = higher throughput
QUEUE_MAX_MESSAGES=32

# Longer timeout = support longer-running pipelines
TASK_TIMEOUT_SECONDS=1200

# Queue visibility timeout for retries
QUEUE_VISIBILITY_TIMEOUT_SECONDS=300
```

- **Throughput**: Increase `NUM_PROCESSING_WORKERS` and `QUEUE_MAX_MESSAGES`
- **Latency**: Decrease `QUEUE_POLL_INTERVAL_SECONDS` and `SCHEDULER_SLEEP_INTERVAL_SECONDS`
- **Memory**: Reduce worker counts to limit concurrent task processing
- **Reliability**: Increase `MAX_TASK_RETRIES` and `TASK_TIMEOUT_SECONDS` for complex pipelines
## Requirements

- Python 3.12+
- Azure Storage Account (Queue)
- Azure Cosmos DB
- contentflow-lib package
- Azure credentials (DefaultAzureCredential)