Workflow Context Manager
Overview
The Workflow Context Manager (ContextManager) is a core component of the workflow engine. It manages context state for the full lifetime of a workflow execution: resolving environment variables, supporting sub-workflows, tracking node results, and passing data between parent and child workflows.
Core Features
- Unified Context Management: Centralized management of all state information during workflow execution
- Environment Variable Support: Automatic resolution and substitution of environment variable references ({{varName}} format)
- Sub-Workflow Support: Multi-level workflow nesting with data inheritance
- Node State Tracking: Real-time tracking of node execution status and results
- Data Inheritance: Data passing and isolation between parent and child workflows
- High Performance: Processed nodes are tracked in a Set, so state checks stay fast in large-scale workflows
Context Structure
Basic Context Structure
{
// Environment variables
env: {
API_URL: "https://api.example.com",
DATABASE_URL: "mongodb://localhost:27017",
DEBUG_MODE: "true"
},
// Execution information
execution: {
teamId: "team_123",
userId: "user_456",
workflowId: "workflow_789",
executionId: "exec_abc"
},
// Team and user information
teamId: "team_123",
userId: "user_456",
// Node execution results
"node-1": { output: "result1" },
"node-2": { output: "result2" },
// Internal state management
_processedNodes: new Set(), // Set of processed node IDs
_isSubflow: false, // Whether this is a sub-workflow
_parentContext: null, // Reference to parent context
_level: 0 // Nesting level
}
Special Property Reference
| Property | Type | Description |
|---|---|---|
| _processedNodes | Set | Set of processed nodes, used to prevent duplicate processing |
| _isSubflow | boolean | Identifies whether the current context belongs to a sub-workflow |
| _parentContext | Object | Reference to the parent context, used for data inheritance |
| _level | number | Nesting level: 0 for the root workflow, incremented by one for each sub-workflow level |
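Because node results and user-defined data share the top level of the context with the reserved fields, code that iterates over the context has to tell them apart. The following is a minimal sketch, assuming the reserved names are exactly those shown in the structure above; `RESERVED_KEYS` and `listDataKeys` are hypothetical helpers for illustration and are not part of ContextManager.

```javascript
// Hypothetical helper: reserved names are taken from the context structure above.
const RESERVED_KEYS = new Set(['env', 'execution', 'teamId', 'userId']);

// Returns the context keys that hold node results or user-defined data,
// skipping reserved fields and internal properties (names starting with "_").
function listDataKeys(context) {
  return Object.keys(context).filter(
    (key) => !key.startsWith('_') && !RESERVED_KEYS.has(key)
  );
}

const sampleContext = {
  env: { API_URL: 'https://api.example.com' },
  execution: { teamId: 'team_123', userId: 'user_456' },
  teamId: 'team_123',
  userId: 'user_456',
  'node-1': { output: 'result1' },
  'node-2': { output: 'result2' },
  _processedNodes: new Set(['node-1', 'node-2']),
  _isSubflow: false,
  _parentContext: null,
  _level: 0,
};

console.log(listDataKeys(sampleContext)); // [ 'node-1', 'node-2' ]
```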
API Reference
Constructor
new ContextManager(options)
Creates a new context manager instance.
Parameters:
{
  execution: {              // Required - Execution object
    teamId: string,         // Team ID
    userId: string,         // User ID
    workflowId: string,     // Workflow ID
    executionId: string     // Execution ID
  },
  envVars: Object,          // Optional - Environment variable dictionary
  parentContext: Object     // Optional - Parent context (used for sub-workflows)
}
Example:
const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'workflow_789',
executionId: 'exec_abc'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017'
}
});
Core Methods
createExecutionContext(initialContext)
Creates a complete execution context by merging the base context, the data inherited from the parent context, and the initial input data.
Parameters:
- initialContext (Object): Initial context data
Returns:
- (Object): The complete execution context
Example:
const executionContext = contextManager.createExecutionContext({
'input-node': { data: 'initial data' },
'config-node': { settings: { timeout: 30 } }
});
updateNodeResult(nodeId, result)
Updates the execution result of the specified node and marks it as processed.
Parameters:
- nodeId (string): Node identifier
- result (any): Node execution result
Example:
contextManager.updateNodeResult('data-processor', {
processedData: 'transformed data',
status: 'success'
});
getNodeResult(nodeId)
Retrieves the execution result of the specified node.
Parameters:
- nodeId (string): Node identifier
Returns:
- (any): The node execution result, or undefined if the node has not been executed
Example:
const result = contextManager.getNodeResult('data-processor');
// Compare against undefined so that falsy results (0, '', false) are not skipped
if (result !== undefined) {
  console.log('Node result:', result);
}
isNodeProcessed(nodeId)
Checks whether the specified node has already been processed.
Parameters:
- nodeId (string): Node identifier
Returns:
- (boolean): Returns true if the node has been processed, false otherwise
Example:
if (!contextManager.isNodeProcessed('data-processor')) {
// Node has not been processed yet, safe to execute
await executeNode('data-processor');
}
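The relationship between the three node-state methods can be illustrated with a small stand-in. This is a sketch only, assuming results are stored under the node ID and processed IDs are kept in a Set as the context structure suggests; `NodeStateSketch` is a hypothetical class and not the real ContextManager.

```javascript
// Illustrative stand-in for the three node-state methods; not the real class.
class NodeStateSketch {
  constructor() {
    this.context = { _processedNodes: new Set() };
  }

  // Store the result under the node ID and mark the node as processed.
  updateNodeResult(nodeId, result) {
    this.context[nodeId] = result;
    this.context._processedNodes.add(nodeId);
  }

  // Returns undefined when no result has been stored for the node.
  getNodeResult(nodeId) {
    return this.context[nodeId];
  }

  // Set membership check, independent of the stored value.
  isNodeProcessed(nodeId) {
    return this.context._processedNodes.has(nodeId);
  }
}

const state = new NodeStateSketch();
state.updateNodeResult('counter', 0); // a falsy but valid result

console.log(state.isNodeProcessed('counter')); // true
console.log(state.getNodeResult('counter'));   // 0
console.log(state.isNodeProcessed('missing')); // false
```

The example stores a falsy result on purpose: a truthiness test on getNodeResult would treat that node as unprocessed, which is why isNodeProcessed is the safer check.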
Environment Variable Management
Environment Variable Syntax
The context manager supports referencing environment variables in any string value using the {{variableName}} syntax.
Supported formats:
// Simple variable substitution
"{{API_URL}}/users"
// Used within complex strings
"Connecting to database: {{DATABASE_URL}}"
// Used within configuration objects
{
url: "{{API_URL}}",
timeout: "{{REQUEST_TIMEOUT}}"
}
resolveEnvironmentVariables(input)
Recursively resolves all environment variable references in the input data.
Parameters:
- input (any): The input data to resolve
Returns:
- (any): The resolved data
Supported data types:
- Strings: Variable references are substituted directly
- Arrays: Each element is processed recursively
- Objects: All property values are processed recursively
- Other types: Returned unchanged
Example:
// Set up environment variables
const contextManager = new ContextManager({
execution: { /* ... */ },
envVars: {
API_URL: 'https://api.example.com',
DATABASE_NAME: 'production_db',
TIMEOUT: '5000'
}
});
// Resolve configuration containing environment variables
const config = {
endpoint: "{{API_URL}}/data",
database: {
name: "{{DATABASE_NAME}}",
timeout: "{{TIMEOUT}}"
},
urls: [
"{{API_URL}}/users",
"{{API_URL}}/orders"
]
};
const resolved = contextManager.resolveEnvironmentVariables(config);
console.log(resolved);
// Output:
// {
// endpoint: "https://api.example.com/data",
// database: {
// name: "production_db",
// timeout: "5000"
// },
// urls: [
// "https://api.example.com/users",
// "https://api.example.com/orders"
// ]
// }
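The recursive behavior described above can be sketched in a few lines. This is an illustration under stated assumptions rather than the engine's actual implementation: `resolveEnvSketch` is a hypothetical function, variable names are assumed to consist of word characters, and references to undefined variables are assumed to be left in place unchanged.

```javascript
// Sketch of recursive {{name}} substitution over strings, arrays, and plain objects.
// Assumption: references to variables that are not defined are left as-is.
function resolveEnvSketch(input, envVars) {
  if (typeof input === 'string') {
    return input.replace(/\{\{(\w+)\}\}/g, (match, name) =>
      Object.prototype.hasOwnProperty.call(envVars, name)
        ? String(envVars[name])
        : match
    );
  }
  if (Array.isArray(input)) {
    return input.map((item) => resolveEnvSketch(item, envVars));
  }
  if (input !== null && typeof input === 'object') {
    // Plain objects only: rebuild the object with resolved property values.
    return Object.fromEntries(
      Object.entries(input).map(([key, value]) => [
        key,
        resolveEnvSketch(value, envVars),
      ])
    );
  }
  return input; // numbers, booleans, null, and undefined pass through
}

const env = { API_URL: 'https://api.example.com', TIMEOUT: '5000' };
const out = resolveEnvSketch(
  { url: '{{API_URL}}/users', retries: 3, auth: 'Bearer {{API_TOKEN}}' },
  env
);
console.log(out);
// { url: 'https://api.example.com/users', retries: 3, auth: 'Bearer {{API_TOKEN}}' }
```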
Sub-Workflow Support
Multi-Level Workflow Architecture
Root Workflow (Level 0)
├── Node A
├── Control Node B (triggers sub-workflow)
│   └── Sub-Workflow (Level 1)
│       ├── Sub-Node C
│       ├── Sub-Control Node D (triggers sub-sub-workflow)
│       │   └── Sub-Sub-Workflow (Level 2)
│       │       └── Sub-Sub-Node E
│       └── Sub-Node F
└── Node G
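The level numbers in the diagram correspond to the `_level` property of each context. The sketch below shows how a child context's internal fields could be derived from its parent, assuming the level increases by one per nesting step; `childMetaSketch` is a hypothetical helper, not part of ContextManager.

```javascript
// Hypothetical helper: derives the internal fields of a sub-workflow context.
function childMetaSketch(parentContext) {
  return {
    _processedNodes: new Set(),
    _isSubflow: true,
    _parentContext: parentContext,
    _level: parentContext._level + 1,
  };
}

const root = {
  _processedNodes: new Set(),
  _isSubflow: false,
  _parentContext: null,
  _level: 0,
};
const sub = childMetaSketch(root);   // Sub-Workflow (Level 1)
const subSub = childMetaSketch(sub); // Sub-Sub-Workflow (Level 2)

console.log(sub._level, subSub._level); // 1 2
console.log(subSub._parentContext._parentContext === root); // true
```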
Data Inheritance Mechanism
inheritFromParent()
Inherits data from the parent context, automatically filtering out internal state properties.
Inheritance rules:
- Inherits all node execution results
- Inherits user-defined data
- Does not inherit internal properties whose names start with _
- Does not inherit the env and execution objects
Example:
// Parent workflow context
const parentContext = {
'node-1': { output: 'parent result' },
'node-2': { data: 'shared data' },
customData: { shared: true },
_processedNodes: new Set(['node-1', 'node-2']),
env: { /* environment variables */ },
execution: { /* execution info */ }
};
// Create sub-workflow context manager
const childContextManager = new ContextManager({
execution: { /* sub-workflow execution info */ },
parentContext: parentContext
});
// The sub-workflow will inherit:
// - 'node-1': { output: 'parent result' }
// - 'node-2': { data: 'shared data' }
// - customData: { shared: true }
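The inheritance rules amount to a key filter over the parent context. The following sketch applies them directly; `inheritSketch` is a hypothetical function, and it copies values by reference, which is an assumption, since whether the real implementation clones inherited data is not stated here.

```javascript
// Sketch of the inheritance rules: keep node results and user-defined data,
// drop internal properties and the env/execution objects.
function inheritSketch(parentContext) {
  const inherited = {};
  for (const [key, value] of Object.entries(parentContext || {})) {
    if (key.startsWith('_')) continue;                  // internal state
    if (key === 'env' || key === 'execution') continue; // per-workflow objects
    inherited[key] = value; // shallow copy (assumption)
  }
  return inherited;
}

const parentCtx = {
  'node-1': { output: 'parent result' },
  'node-2': { data: 'shared data' },
  customData: { shared: true },
  _processedNodes: new Set(['node-1', 'node-2']),
  env: { API_URL: 'https://api.example.com' },
  execution: { workflowId: 'workflow_789' },
};

console.log(Object.keys(inheritSketch(parentCtx)));
// [ 'node-1', 'node-2', 'customData' ]
```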
mergeSubflowResults(subflowResult, controlNodeId)
Merges sub-workflow execution results back into the parent workflow context.
Parameters:
- subflowResult (Object): Sub-workflow execution result
- controlNodeId (string): ID of the control node that triggered the sub-workflow
Merge rules:
- Merges all node execution results
- Merges the processed nodes set
- Does not merge other internal state properties (such as _level and _isSubflow)
Example:
// Sub-workflow execution result
const subflowResult = {
'sub-node-1': { output: 'sub result 1' },
'sub-node-2': { output: 'sub result 2' },
_processedNodes: new Set(['sub-node-1', 'sub-node-2']),
_level: 1,
_isSubflow: true
};
// Merge into parent workflow
contextManager.mergeSubflowResults(subflowResult, 'if-else-control');
// The parent workflow context now contains:
// - Original node results
// - 'sub-node-1': { output: 'sub result 1' }
// - 'sub-node-2': { output: 'sub result 2' }
// - Updated _processedNodes set
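A merge that follows these rules could look roughly like the sketch below. `mergeSketch` is a hypothetical function; skipping the env and execution objects is an assumption made here so the parent's own values are not overwritten, and the role of controlNodeId is left out because the text above does not describe how it is used.

```javascript
// Sketch of the merge rules: copy node results, union the processed sets,
// and leave the parent's other internal properties untouched.
function mergeSketch(parentContext, subflowResult) {
  for (const [key, value] of Object.entries(subflowResult)) {
    if (key.startsWith('_')) continue;                  // _level, _isSubflow, ...
    if (key === 'env' || key === 'execution') continue; // assumption
    parentContext[key] = value;
  }
  if (subflowResult._processedNodes instanceof Set) {
    for (const nodeId of subflowResult._processedNodes) {
      parentContext._processedNodes.add(nodeId);
    }
  }
  return parentContext;
}

const parentState = {
  'parent-node': { output: 'parent result' },
  _processedNodes: new Set(['parent-node']),
  _level: 0,
  _isSubflow: false,
};
const childState = {
  'sub-node-1': { output: 'sub result 1' },
  _processedNodes: new Set(['sub-node-1']),
  _level: 1,
  _isSubflow: true,
};

mergeSketch(parentState, childState);
console.log(parentState['sub-node-1']);        // { output: 'sub result 1' }
console.log(parentState._level);               // 0
console.log([...parentState._processedNodes]); // [ 'parent-node', 'sub-node-1' ]
```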
Node State Management
Node Lifecycle
Pending       →  Executing   →  Completed
   ↓                ↓               ↓
Not recorded     Processing     Result recorded
State Tracking Best Practices
// 1. Check if the node has been processed (avoid duplicate execution)
if (contextManager.isNodeProcessed('data-processor')) {
console.log('Node already processed, skipping execution');
return contextManager.getNodeResult('data-processor');
}
// 2. Execute the node logic
const result = await executeNodeLogic();
// 3. Update the node result (automatically marks it as processed)
contextManager.updateNodeResult('data-processor', result);
// 4. Subsequent nodes can retrieve the result
const processedData = contextManager.getNodeResult('data-processor');
Utility Methods
getFinalOutputs(nodes)
Extracts the final output results of the workflow.
Parameters:
- nodes (Object): Workflow node definitions
Returns:
- (Object): An object containing the results of all executed nodes
Example:
const nodes = {
'input-processor': { /* node definition */ },
'data-transformer': { /* node definition */ },
'output-formatter': { /* node definition */ }
};
const outputs = contextManager.getFinalOutputs(nodes);
// Example output:
// {
// 'input-processor': { processedInput: '...' },
// 'data-transformer': { transformedData: '...' },
// 'output-formatter': { formattedOutput: '...' }
// }
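One plausible reading of this method is that it walks the node definitions and collects the result of every node that has been processed. The sketch below implements that reading; `finalOutputsSketch` is a hypothetical function, and the choice to omit nodes that never ran is an assumption.

```javascript
// Sketch: collect results for the defined nodes that have been processed.
function finalOutputsSketch(context, nodes) {
  const outputs = {};
  for (const nodeId of Object.keys(nodes)) {
    if (context._processedNodes.has(nodeId)) {
      outputs[nodeId] = context[nodeId];
    }
  }
  return outputs;
}

const runContext = {
  'input-processor': { processedInput: 'a' },
  'data-transformer': { transformedData: 'b' },
  _processedNodes: new Set(['input-processor', 'data-transformer']),
};
const nodeDefs = {
  'input-processor': {},
  'data-transformer': {},
  'output-formatter': {}, // defined but never executed
};

console.log(finalOutputsSketch(runContext, nodeDefs));
// {
//   'input-processor': { processedInput: 'a' },
//   'data-transformer': { transformedData: 'b' }
// }
```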
getStatus()
Returns the current status information of the context manager.
Returns:
{
level: number, // Nesting level
isSubflow: boolean, // Whether this is a sub-workflow
processedNodesCount: number, // Number of processed nodes
envVarsCount: number, // Number of environment variables
hasParent: boolean, // Whether it has a parent context
contextKeysCount: number // Number of context keys
}
Example:
const status = contextManager.getStatus();
console.log('Context status:', status);
// Output:
// {
// level: 0,
// isSubflow: false,
// processedNodesCount: 3,
// envVarsCount: 5,
// hasParent: false,
// contextKeysCount: 12
// }
Complete Usage Examples
Example 1: Basic Workflow Execution
const ContextManager = require('./src/core/ContextManager');
// 1. Create the context manager
const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'data-processing-workflow',
executionId: 'exec_789'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017',
TIMEOUT: '5000'
}
});
// 2. Create the execution context
const context = contextManager.createExecutionContext({
'input-node': {
data: 'raw data to process'
}
});
// 3. Simulate node execution
async function executeWorkflow() {
// Execute the data processing node
if (!contextManager.isNodeProcessed('data-processor')) {
const inputData = contextManager.getNodeResult('input-node');
const processedData = await processData(inputData);
contextManager.updateNodeResult('data-processor', processedData);
}
// Execute the data transformation node
if (!contextManager.isNodeProcessed('data-transformer')) {
const processedData = contextManager.getNodeResult('data-processor');
const transformedData = await transformData(processedData);
contextManager.updateNodeResult('data-transformer', transformedData);
}
// Get the final results
const finalOutputs = contextManager.getFinalOutputs({
'input-node': {},
'data-processor': {},
'data-transformer': {}
});
return finalOutputs;
}
Example 2: Environment Variable Usage
// Node configuration containing environment variable references
const nodeConfig = {
apiEndpoint: "{{API_URL}}/data",
database: {
connectionString: "{{DATABASE_URL}}",
timeout: "{{TIMEOUT}}"
},
headers: {
authorization: "Bearer {{API_TOKEN}}"
}
};
// Resolve environment variables
const resolvedConfig = contextManager.resolveEnvironmentVariables(nodeConfig);
// resolvedConfig contains the resolved values (the output below assumes API_TOKEN is also defined in envVars)
console.log(resolvedConfig);
// {
// apiEndpoint: "https://api.example.com/data",
// database: {
// connectionString: "mongodb://localhost:27017",
// timeout: "5000"
// },
// headers: {
// authorization: "Bearer your-actual-token"
// }
// }
Example 3: Sub-Workflow Handling
// Parent workflow context manager
const parentContextManager = new ContextManager({
execution: { /* parent workflow execution info */ },
envVars: { /* environment variables */ }
});
// Parent workflow executes some nodes
parentContextManager.updateNodeResult('parent-node-1', { data: 'parent result' });
// Create sub-workflow context manager
const childContextManager = new ContextManager({
execution: { /* sub-workflow execution info */ },
parentContext: parentContextManager.getContext()
});
// Sub-workflow can access parent workflow results
const childContext = childContextManager.createExecutionContext();
console.log(childContext['parent-node-1']); // { data: 'parent result' }
// Sub-workflow execution
childContextManager.updateNodeResult('child-node-1', { data: 'child result' });
// Merge sub-workflow results back into parent workflow
const childResult = childContextManager.getContext();
parentContextManager.mergeSubflowResults(childResult, 'control-node');
// Parent workflow can now access sub-workflow results
console.log(parentContextManager.getNodeResult('child-node-1')); // { data: 'child result' }
Performance Optimization
Large-Scale Workflow Optimization
1. Node State Check Optimization
// Recommended: isNodeProcessed is backed by a Set, so the lookup is fast
if (contextManager.isNodeProcessed(nodeId)) {
  return contextManager.getNodeResult(nodeId);
}
// Avoid: fetching the result only to test whether the node has run
const result = contextManager.getNodeResult(nodeId);
if (result !== undefined) {
  return result;
}
2. Environment Variable Resolution Optimization
// Recommended: resolve all configurations in one call
const resolvedConfigs = contextManager.resolveEnvironmentVariables(allNodeConfigs);
// Avoid: resolving configurations one at a time
Object.keys(nodeConfigs).forEach(nodeId => {
  nodeConfigs[nodeId] = contextManager.resolveEnvironmentVariables(nodeConfigs[nodeId]);
});
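3. Memory Management
// For long-running workflows, periodically clean up node results that are no longer needed.
// Reserved top-level fields (env, execution, teamId, userId) and internal
// properties (prefixed with _) are always kept.
const RESERVED_KEYS = ['env', 'execution', 'teamId', 'userId'];
function cleanupOldResults(contextManager, keepNodeIds) {
  const context = contextManager.getContext();
  Object.keys(context).forEach(key => {
    const isProtected = key.startsWith('_') || RESERVED_KEYS.includes(key);
    if (!isProtected && !keepNodeIds.includes(key)) {
      delete context[key];
    }
  });
}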
Debugging and Monitoring
Enabling Verbose Logging
// Enable debug mode via environment variable
process.env.LOG_LEVEL = 'debug';
const contextManager = new ContextManager({
// ... configuration
});
// The context manager will output detailed execution logs
// Including: node result updates, environment variable resolution, sub-workflow merging, etc.
Status Monitoring
// Periodically check context status
setInterval(() => {
const status = contextManager.getStatus();
console.log('Context status monitor:', {
processedNodes: status.processedNodesCount,
level: status.level,
memoryUsage: process.memoryUsage()
});
}, 5000);
Common Troubleshooting
1. Environment Variables Not Resolved
Symptom: Output still contains strings in the {{variableName}} format
Troubleshooting steps:
// Check if environment variables are correctly set
console.log('Environment variables:', contextManager.getContext().env);
// Check if variable names match
const testResolve = contextManager.resolveEnvironmentVariables("{{API_URL}}");
console.log('Resolution result:', testResolve);
2. Sub-Workflow Data Not Inherited
Symptom: Sub-workflow cannot access parent workflow node results
Troubleshooting steps:
// Check if parent context was correctly passed
console.log('Is sub-workflow:', contextManager.getContext()._isSubflow);
console.log('Parent context:', contextManager.getContext()._parentContext);
// Check inherited data
const inherited = contextManager.inheritFromParent();
console.log('Inherited data:', inherited);
3. Duplicate Node Execution
Symptom: The same node is executed multiple times
Troubleshooting steps:
// Check status before node execution
console.log('Node processed:', contextManager.isNodeProcessed(nodeId));
console.log('Processed nodes list:', Array.from(contextManager.getContext()._processedNodes));
Best Practices
1. Context Design Principles
- Single Responsibility: Each context manager handles only one workflow instance
- State Isolation: Contexts of different workflow instances are fully isolated from each other
- Data Minimization: Store only the data that later nodes need, so the context does not grow without bound
2. Environment Variable Management
// Recommended: Centralized environment variable management
const envVars = {
// API configuration
API_URL: process.env.API_URL || 'https://api.default.com',
API_TOKEN: process.env.API_TOKEN,
// Database configuration
DATABASE_URL: process.env.DATABASE_URL,
DATABASE_TIMEOUT: process.env.DATABASE_TIMEOUT || '5000',
// Feature flags
ENABLE_CACHE: process.env.ENABLE_CACHE || 'false',
DEBUG_MODE: process.env.DEBUG_MODE || 'false'
};
const contextManager = new ContextManager({
execution: executionInfo,
envVars: envVars
});
3. Error Handling
// Node execution error handling
try {
const result = await executeNode(nodeId, inputs);
contextManager.updateNodeResult(nodeId, result);
} catch (error) {
// Record error state
contextManager.updateNodeResult(nodeId, {
error: error.message,
status: 'failed',
timestamp: new Date().toISOString()
});
throw error;
}
4. Resource Cleanup
// Clean up resources after workflow execution completes
function cleanupContext(contextManager) {
const context = contextManager.getContext();
// Clean up large object references
Object.keys(context).forEach(key => {
if (typeof context[key] === 'object' && context[key] !== null) {
if (context[key].largeData) {
delete context[key].largeData;
}
}
});
}
Related Documentation
- Workflow Overview: Workflow engine fundamentals
- Environment Variables: Workflow environment variable configuration
- If-Else Condition Node: Using context in conditional branches
- ForEach Loop Node: Context passing in loops