Processor Flow is a dynamic workflow processor that leverages the Forge library to execute configurable flows. It provides the same interface as the QuestionAnswering processor but with the flexibility to run any Forge flow.
- Compatible with QuestionAnswering: Accepts the same input/output format as the QuestionAnswering processor
- Dynamic Flow Selection: Choose flows at runtime via flow name or inline JSON
- Hot-Reloadable Flows: Add new flows without redeploying by mounting JSON files
- Default QA Flow: Includes a default flow that mimics QuestionAnswering behavior
- Extensible: Easy to add new flows and customize behavior
The processor accepts:
- files: List of FileMetaData (documents to process)
- questions: List of Questions to answer
- flow_name: Optional name of a flow resource to load (e.g., "qa_default")
- flow_json: Optional inline JSON definition of a Forge flow
- flow_params: Optional generic parameters for flows
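As a rough illustration of how these inputs fit together, the request can be modelled as a Python dataclass. This is a hypothetical stand-in, not the processor's protobuf message: FileMetaData and Question are reduced to plain strings, ProcessorRequest and inline_flow are illustrative names, and flow_json is assumed to arrive as a JSON string.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ProcessorRequest:
    """Hypothetical stand-in for the processor's input message."""

    files: list[str] = field(default_factory=list)       # FileMetaData placeholders
    questions: list[str] = field(default_factory=list)   # Question placeholders
    flow_name: Optional[str] = None                      # e.g. "qa_default"
    flow_json: Optional[str] = None                      # inline flow definition (assumed to be a JSON string)
    flow_params: dict[str, Any] = field(default_factory=dict)

    def inline_flow(self) -> Optional[dict[str, Any]]:
        """Decode flow_json, rejecting anything that is not a JSON object."""
        if self.flow_json is None:
            return None
        flow = json.loads(self.flow_json)
        if not isinstance(flow, dict):
            raise ValueError("flow_json must decode to a JSON object")
        return flow


req = ProcessorRequest(
    files=["report.pdf"],
    questions=["What is the primary endpoint?"],
    flow_json='{"name": "my_flow"}',  # placeholder, not a real Forge flow schema
)
print(req.inline_flow())  # {'name': 'my_flow'}
```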
Flows are loaded from:
- Inline JSON (flow_json parameter): highest precedence
- Named flows (flow_name parameter): loaded from FORGE_FLOWS_DIR
- Default flow: qa_default, used if nothing is specified
Flow directory is configurable via FORGE_FLOWS_DIR environment variable (default: /opt/forge_flows).
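The precedence rules above can be sketched as a small resolver. resolve_flow_source is a hypothetical name, and the sketch assumes that the default qa_default flow is read from the same directory as named flows. It only decides where a flow comes from; it does not build or run a Forge flow.

```python
import json
import os
from pathlib import Path
from typing import Any, Optional, Union

DEFAULT_FLOW_NAME = "qa_default"        # documented default flow
DEFAULT_FLOWS_DIR = "/opt/forge_flows"  # documented FORGE_FLOWS_DIR default


def resolve_flow_source(
    flow_json: Optional[str],
    flow_name: Optional[str],
    flows_dir: Optional[str] = None,
) -> Union[dict[str, Any], Path]:
    """Return the inline flow as a dict, or the path of the JSON file to load."""
    # 1. Inline JSON has the highest precedence (an empty string counts as unset).
    if flow_json:
        return json.loads(flow_json)
    # 2. Named flows live in FORGE_FLOWS_DIR (an explicit argument wins over the env var).
    directory = Path(flows_dir or os.environ.get("FORGE_FLOWS_DIR", DEFAULT_FLOWS_DIR))
    # 3. With neither parameter set, fall back to the default QA flow.
    name = flow_name or DEFAULT_FLOW_NAME
    return directory / f"{name}.json"


print(resolve_flow_source(None, "my_flow", "/tmp/flows"))  # /tmp/flows/my_flow.json on POSIX
```

Callers can branch on isinstance(result, dict) to tell an inline definition from a file that still has to be read.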
Main components:
- FlowLoader: Loads and caches Forge flows from JSON files or inline definitions
- InputMapper: Converts protobuf messages to Forge flow inputs
- OutputMapper: Converts Forge flow outputs back to QuestionAnswer protos
- ForgeActivity: Temporal activity that executes Forge flows
- TemporalForgeWorkflow: Temporal workflow for the processor
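To show the kind of translation InputMapper and OutputMapper perform, here is a sketch that uses plain dataclasses in place of the Question and QuestionAnswer protos. The function names and dictionary keys (files, questions, params, answers, question_id, answer) are assumptions made for illustration; the real mapping depends on the inputs and outputs each Forge flow declares.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Question:  # stand-in for the Question proto
    id: str
    text: str


@dataclass
class QuestionAnswer:  # stand-in for the QuestionAnswer proto
    question_id: str
    answer: str


def to_flow_inputs(files: list[str], questions: list[Question], params: dict[str, Any]) -> dict[str, Any]:
    """InputMapper sketch: flatten request data into a plain dict of flow inputs."""
    return {
        "files": list(files),
        "questions": [{"id": q.id, "text": q.text} for q in questions],
        "params": dict(params),
    }


def from_flow_outputs(outputs: dict[str, Any]) -> list[QuestionAnswer]:
    """OutputMapper sketch: expects outputs["answers"] as a list of {"question_id", "answer"} dicts."""
    return [
        QuestionAnswer(question_id=item["question_id"], answer=item["answer"])
        for item in outputs.get("answers", [])
    ]


inputs = to_flow_inputs(["doc.pdf"], [Question("q1", "What is X?")], {})
# A real run would pass `inputs` to a Forge flow; here its output is faked.
print(from_flow_outputs({"answers": [{"question_id": "q1", "answer": "42"}]}))
# [QuestionAnswer(question_id='q1', answer='42')]
```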
Prerequisites:
- Python 3.13+
- uv (recommended) or pip
Installation:
```bash
uv sync
source .venv/bin/activate
```
Run unit tests:
```bash
poe test
```
Run the processor locally:
```bash
poe serve
```
Environment variables:
- FORGE_FLOWS_DIR: Directory containing flow JSON files (default: /opt/forge_flows)
- TEMPORAL_ADDRESS: Temporal server address
- EXCHANGE_URL: Token exchange service URL
- EXCHANGE_CLIENT_ID: Token exchange client ID
- EXCHANGE_CLIENT_SECRET: Token exchange client secret
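One way to gather these settings at startup is a frozen dataclass filled from the environment. ProcessorSettings and load_settings are hypothetical names; only FORGE_FLOWS_DIR has a documented default, so the other values are left as None when unset rather than guessed.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class ProcessorSettings:
    forge_flows_dir: str
    temporal_address: Optional[str]
    exchange_url: Optional[str]
    exchange_client_id: Optional[str]
    # Excluded from repr so the secret does not end up in logs.
    exchange_client_secret: Optional[str] = field(repr=False)


def load_settings(env: Mapping[str, str] = os.environ) -> ProcessorSettings:
    """Read the documented environment variables into a settings object."""
    return ProcessorSettings(
        forge_flows_dir=env.get("FORGE_FLOWS_DIR", "/opt/forge_flows"),
        temporal_address=env.get("TEMPORAL_ADDRESS"),
        exchange_url=env.get("EXCHANGE_URL"),
        exchange_client_id=env.get("EXCHANGE_CLIENT_ID"),
        exchange_client_secret=env.get("EXCHANGE_CLIENT_SECRET"),
    )


settings = load_settings()  # reads os.environ by default
```

Passing a plain dict instead of os.environ makes the function easy to exercise in unit tests.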
Place flow JSON files with a .json extension in the FORGE_FLOWS_DIR directory. The flow name is the filename without the extension.
Example: /opt/forge_flows/qa_default.json → flow name "qa_default"
To add a new flow:
- Create a new JSON flow definition (see resources/forge_flows/qa_default.json for an example)
- Mount or copy it to FORGE_FLOWS_DIR in the container
- Use the flow by setting flow_name in the processor config
No code changes or redeployment needed!
Deployment prerequisites:
- Helm 3+
- aws-cli 2+

Log in to the ECR Helm registry:
```bash
aws sso login --profile <my-profile>
aws ecr get-login-password \
    --profile <my-profile> \
    --region eu-west-1 | helm registry login \
    --username AWS \
    --password-stdin 396802430222.dkr.ecr.eu-west-1.amazonaws.com/processor-forge
```
Install or upgrade the release:
```bash
export platform_environment=<platform_environment>
helm upgrade --install processor-forge oci://396802430222.dkr.ecr.eu-west-1.amazonaws.com/processor-forge \
    --namespace ${platform_environment}-biolevate-apps \
    --set application.environment=${platform_environment} \
    --set application.name=processor-forge \
    --version <helm_version>
```
helm_version can be found in the Chart.yaml file (key: version).
Uninstall:
```bash
export platform_environment=<platform_environment>
helm delete processor-forge --namespace ${platform_environment}-biolevate-apps
```
The processor is structured as follows:
```text
src/forge/
├── __init__.py      # Module exports
├── activity.py      # Forge activity implementation
├── workflow.py      # Temporal workflow definition
├── main.py          # Worker entry point
├── flow_loader.py   # Flow loading logic
└── io_mapping.py    # Input/output mapping utilities
resources/
└── forge_flows/
    └── qa_default.json  # Default QA flow
```
Possible future improvements:
- Temporal Runtime: Switch from LocalRuntime to TemporalRuntime for distributed execution
- Flow Validation: Add schema validation for flow definitions
- Flow Monitoring: Enhanced observability for flow execution
- Flow Versioning: Support for versioned flows