Workflow Operator Nodes
Operator nodes are the components in a workflow that perform specific tasks — they process data, perform calculations, call services, send notifications, and more. GeniSpace provides a rich set of predefined operator nodes and also supports custom operator nodes to meet a wide range of complex business requirements.
Operator Node Overview
Each operator node has:
- Input Data: The data source that the node processes
- Configuration Parameters: Settings that control the node's behavior
- Output Data: The results produced after processing
- Error Handling: How exceptions are handled
Data Processing Nodes
Data Transformation
Convert data from one format to another:
{
"type": "transform",
"config": {
"mappings": [
{
"source": "$.input.customer.name",
"target": "$.output.userName",
"transform": "uppercase"
},
{
"source": "$.input.order.items",
"target": "$.output.productList",
"transform": {
"type": "map",
"expression": "item => ({ id: item.productId, quantity: item.qty })"
}
}
],
"outputSchema": {
"type": "object",
"properties": {
"userName": { "type": "string" },
"productList": { "type": "array" }
}
}
}
}
Data Filtering
Filter and refine data:
{
"type": "filter",
"config": {
"source": "$.input.products",
"condition": "item => item.price > 100 && item.stock > 0",
"limit": 10,
"sort": {
"field": "popularity",
"order": "desc"
}
}
}
Data Merging
Merge multiple data sources:
{
"type": "merge",
"config": {
"sources": [
{ "name": "customerData", "path": "$.input.customer" },
{ "name": "orderData", "path": "$.input.order" },
{ "name": "productData", "path": "$.context.products" }
],
"mergeStrategy": "deep",
"conflictResolution": "lastWins"
}
}
Data Validation
Validate whether data meets requirements:
{
"type": "validate",
"config": {
"schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "number",
"minimum": 18
}
},
"required": ["email", "age"]
},
"customValidators": [
{
"name": "domainCheck",
"expression": "data.email.endsWith('@company.com')",
"message": "Must use a company email address"
}
],
"onValidationError": "throw" // throw, continue, branch
}
}
Database Operation Nodes
Data Query
Query data from a database:
{
"type": "dbQuery",
"config": {
"connection": "main_db",
"queryType": "select",
"query": "SELECT * FROM customers WHERE status = :status AND created_at > :date",
"parameters": {
"status": "$.input.status",
"date": "$.input.startDate"
},
"pagination": {
"enabled": true,
"pageSize": 100,
"maxItems": 1000
}
}
}
Data Update
Update database records:
{
"type": "dbUpdate",
"config": {
"connection": "main_db",
"table": "orders",
"updates": {
"status": "$.input.newStatus",
"updated_at": "${ new Date().toISOString() }"
},
"condition": "id = :orderId",
"parameters": {
"orderId": "$.input.orderId"
},
"returnUpdated": true
}
}
Transaction Operations
Execute multiple database operations within a transaction:
{
"type": "dbTransaction",
"config": {
"connection": "main_db",
"operations": [
{
"type": "insert",
"table": "orders",
"data": "$.input.order",
"returnId": true,
"idField": "orderId"
},
{
"type": "insert",
"table": "order_items",
"data": "$.input.items.map(item => ({ ...item, order_id: $.context.orderId }))"
},
{
"type": "update",
"table": "inventory",
"updates": {
"quantity": "inventory.quantity - item.quantity"
},
"condition": "product_id = :productId",
"multipleUpdates": {
"source": "$.input.items",
"parameterField": "productId"
}
}
],
"isolation": "serializable"
}
}
HTTP and API Nodes
HTTP Request
Call an external API:
{
"type": "httpRequest",
"config": {
"url": "https://api.example.com/v1/orders",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${$.context.apiToken}"
},
"body": "$.input.orderData",
"timeout": 5000,
"retry": {
"maxAttempts": 3,
"initialDelay": 1000,
"backoffMultiplier": 2,
"retryOn": [500, 502, 503]
},
"responseMapping": {
"orderId": "$.response.data.id",
"status": "$.response.data.status"
}
}
}
GraphQL Request
Execute a GraphQL query:
{
"type": "graphqlRequest",
"config": {
"endpoint": "https://api.example.com/graphql",
"query": `
query GetProductDetails($id: ID!) {
product(id: $id) {
id
name
price
availability {
inStock
leadTime
}
}
}
`,
"variables": {
"id": "$.input.productId"
},
"authentication": {
"type": "header",
"headerName": "Authorization",
"value": "Bearer ${$.context.apiToken}"
}
}
}
API Composition
Sequentially call multiple APIs and process the results:
{
"type": "apiComposite",
"config": {
"operations": [
{
"name": "getUserProfile",
"type": "httpRequest",
"config": {
"url": "https://api.example.com/users/${$.input.userId}",
"method": "GET"
}
},
{
"name": "getUserOrders",
"type": "httpRequest",
"dependsOn": ["getUserProfile"],
"config": {
"url": "https://api.example.com/users/${$.input.userId}/orders",
"method": "GET"
}
},
{
"name": "getProductDetails",
"type": "httpRequest",
"dependsOn": ["getUserOrders"],
"config": {
"url": "https://api.example.com/products/details",
"method": "POST",
"body": {
"productIds": "$.context.getUserOrders.data.map(order => order.productId)"
}
}
}
],
"outputMapping": {
"user": "$.context.getUserProfile.data",
"orders": "$.context.getUserOrders.data",
"products": "$.context.getProductDetails.data"
}
}
}
AI and Machine Learning Nodes
Text Analysis
Analyze text content:
{
"type": "textAnalysis",
"config": {
"input": "$.input.customerMessage",
"operations": [
{
"type": "sentiment",
"field": "sentiment"
},
{
"type": "entityExtraction",
"field": "entities",
"entityTypes": ["PERSON", "LOCATION", "ORGANIZATION"]
},
{
"type": "languageDetection",
"field": "language"
},
{
"type": "categorization",
"field": "category",
"categories": ["Complaint", "Inquiry", "Feedback", "Request"]
}
],
"modelSettings": {
"provider": "openai",
"model": "gpt-4"
}
}
}
Intelligent Summarization
Generate content summaries:
{
"type": "contentSummary",
"config": {
"content": "$.input.document",
"maxLength": 200,
"format": "bullet",
"targetLanguage": "zh-CN",
"focusAreas": ["Key Points", "Critical Data", "Action Items"],
"modelSettings": {
"provider": "geniTask",
"model": "summarizer-v2"
}
}
}
Content Generation
Generate new content:
{
"type": "contentGeneration",
"config": {
"prompt": "Create an engaging marketing description for the following product:\nProduct Name: ${$.input.product.name}\nFeatures: ${$.input.product.features.join(', ')}\nTarget Audience: ${$.input.audience}\n",
"parameters": {
"maxTokens": 500,
"temperature": 0.7,
"topP": 0.9
},
"outputFormat": "html",
"systemContext": "You are a creative marketing copywriter skilled at writing compelling product descriptions.",
"modelSettings": {
"provider": "openai",
"model": "gpt-4-turbo"
}
}
}
Image Analysis
Analyze image content:
{
"type": "imageAnalysis",
"config": {
"image": "$.input.productImage",
"operations": [
{
"type": "objectDetection",
"field": "objects",
"minConfidence": 0.7
},
{
"type": "sceneClassification",
"field": "scene"
},
{
"type": "colorAnalysis",
"field": "dominantColors",
"maxColors": 5
},
{
"type": "textExtraction",
"field": "textContent"
}
],
"modelSettings": {
"provider": "geniTask",
"model": "vision-analyzer-v1"
}
}
}
Notification and Messaging Nodes
Email
Send an email:
{
"type": "email",
"config": {
"connection": "company_smtp",
"to": ["${$.input.customer.email}"],
"cc": ["support@company.com"],
"subject": "Your Order #${$.input.order.id} Has Been Confirmed",
"template": "order_confirmation",
"templateData": {
"customer": "$.input.customer",
"order": "$.input.order",
"products": "$.context.productDetails"
},
"attachments": [
{
"name": "receipt.pdf",
"content": "$.context.receiptPdf",
"contentType": "application/pdf"
}
],
"trackOpens": true,
"priority": "high"
}
}
SMS
Send an SMS notification:
{
"type": "sms",
"config": {
"provider": "twilio",
"to": "${$.input.customer.phone}",
"message": "Hello ${$.input.customer.firstName}, your order #${$.input.order.id} has been shipped. Estimated delivery: ${$.input.shipment.estimatedDelivery}",
"messageType": "transactional"
}
}
Push Notification
Send a push notification:
{
"type": "pushNotification",
"config": {
"provider": "firebase",
"recipients": {
"type": "tokens",
"tokens": "$.input.user.deviceTokens"
},
"notification": {
"title": "Order Status Update",
"body": "Your order #${$.input.order.id} status has been updated to: ${$.input.order.status}",
"icon": "order_icon"
},
"data": {
"orderId": "$.input.order.id",
"status": "$.input.order.status",
"deepLink": "app://orders/${$.input.order.id}"
},
"priority": "high",
"ttl": 3600
}
}
Message Queue
Send a message to a message queue:
{
"type": "queueMessage",
"config": {
"provider": "rabbitmq",
"connection": "mq_main",
"queue": "order_processing",
"message": "$.input.orderData",
"properties": {
"contentType": "application/json",
"messageType": "order.created",
"priority": 5
},
"persistent": true
}
}
File Operation Nodes
File Generation
Generate a file:
{
"type": "generateFile",
"config": {
"fileType": "pdf",
"template": "invoice_template",
"data": {
"invoice": "$.input.invoice",
"company": "$.context.companyDetails",
"customer": "$.input.customer"
},
"options": {
"paperSize": "A4",
"orientation": "portrait",
"headerTemplate": "<div>Invoice #{{invoiceNumber}}</div>",
"footerTemplate": "<div>Page <span class='pageNumber'></span> of <span class='totalPages'></span></div>"
},
"output": {
"filename": "Invoice_${$.input.invoice.number}.pdf",
"saveToStorage": true,
"storagePath": "invoices/${$.input.customer.id}/"
}
}
}
File Processing
Process uploaded files:
{
"type": "processFile",
"config": {
"input": "$.input.file",
"operations": [
{
"type": "extract",
"fileType": "csv",
"options": {
"delimiter": ",",
"header": true,
"skipEmptyLines": true
},
"output": "extractedData"
},
{
"type": "transform",
"source": "$.context.extractedData",
"transformation": "item => ({
customerId: item.customer_id,
total: parseFloat(item.amount)
})",
"output": "transformedData"
},
{
"type": "validate",
"source": "$.context.transformedData",
"validation": "item => item.total > 0",
"output": "validData"
}
]
}
}
File Storage
Store and manage files:
{
"type": "fileStorage",
"config": {
"operation": "store",
"file": "$.input.document",
"storage": "aws_s3",
"path": "customers/${$.input.customerId}/documents/${$.input.documentType}/",
"filename": "${$.input.documentName}_${Date.now()}.pdf",
"metadata": {
"documentType": "$.input.documentType",
"uploadedBy": "$.context.currentUser.id",
"tags": "$.input.tags"
},
"accessControl": {
"visibility": "private",
"expiryTime": 86400
}
}
}
Integration Nodes
Third-Party Service Integration
Integrate with external services:
{
"type": "serviceIntegration",
"config": {
"service": "salesforce",
"operation": "createLead",
"authentication": {
"type": "oauth2",
"credential": "salesforce_oauth"
},
"input": {
"firstName": "$.input.customer.firstName",
"lastName": "$.input.customer.lastName",
"email": "$.input.customer.email",
"company": "$.input.customer.company",
"source": "Website Form",
"status": "New"
},
"outputMapping": {
"leadId": "$.result.id",
"success": "$.result.success"
}
}
}
Payment Processing
Process payment transactions:
{
"type": "payment",
"config": {
"provider": "stripe",
"operation": "createCharge",
"authentication": {
"secretKey": "${$.context.secrets.stripeApiKey}"
},
"input": {
"amount": "$.input.order.total * 100", // Stripe uses cents as the unit
"currency": "$.input.order.currency",
"source": "$.input.paymentMethod.id",
"description": "Order #${$.input.order.id}",
"metadata": {
"orderId": "$.input.order.id",
"customerId": "$.input.customer.id"
}
},
"onSuccess": {
"updateOrder": {
"status": "paid",
"paymentId": "$.result.id",
"paidAt": "${new Date().toISOString()}"
}
},
"onError": {
"updateOrder": {
"status": "payment_failed",
"failureReason": "$.error.message"
}
}
}
}
Custom Nodes
Custom Code
Execute custom code logic:
{
"type": "customCode",
"config": {
"runtime": "nodejs16",
"code": `
module.exports = async function(input, context) {
const { customer, order } = input;
// Custom business logic
let discountRate = 0;
if (customer.vipLevel === 'gold') {
discountRate = 0.1;
} else if (customer.vipLevel === 'platinum') {
discountRate = 0.15;
}
// Calculate discount
const originalTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const discount = originalTotal * discountRate;
const finalTotal = originalTotal - discount;
return {
originalTotal,
discount,
finalTotal,
discountRate,
customerLevel: customer.vipLevel
};
}
`,
"inputMapping": {
"customer": "$.input.customer",
"order": "$.input.order"
},
"timeout": 3000,
"memoryLimit": 128
}
}
External Service Functions
Call external function services:
{
"type": "function",
"config": {
"provider": "aws_lambda",
"function": "order-processor",
"input": "$.input",
"authentication": {
"type": "iam",
"role": "workflow-execution-role"
},
"timeout": 10000,
"asyncExecution": false
}
}
Error Handling and Retries
Error Handler
Handle errors during execution:
{
"type": "errorHandler",
"config": {
"strategies": [
{
"errorType": "validation",
"action": "resolve",
"value": {
"status": "invalid",
"errors": "$.error.details"
}
},
{
"errorType": "http4xx",
"action": "retry",
"maxRetries": 3,
"delay": 1000,
"backoffMultiplier": 2,
"conditions": [
{
"field": "$.error.status",
"operator": "equals",
"value": 429
}
]
},
{
"errorType": "timeout",
"action": "fallback",
"fallbackNode": "timeoutFallback"
},
{
"errorType": "*",
"action": "throw"
}
],
"onExhaustedRetries": "fallback",
"fallbackNode": "defaultFallback",
"logLevel": "warning"
}
}
Timeout Control
Control node execution time:
{
"type": "timeout",
"config": {
"node": "longRunningOperation",
"timeout": 5000,
"onTimeout": "abort", // abort, continue, fallback
"fallbackNode": "timeoutFallback"
}
}
Node Composition and Reuse
Sub-Workflows
Create reusable sub-workflows:
{
"type": "subworkflow",
"config": {
"workflowId": "payment-processor",
"version": "latest",
"input": {
"paymentMethod": "$.input.paymentMethod",
"amount": "$.input.order.total",
"currency": "$.input.order.currency",
"description": "Order #${$.input.order.id}"
},
"waitForCompletion": true,
"timeout": 30000,
"outputMapping": {
"paymentResult": "$.result"
}
}
}
Node Groups
Group multiple nodes together:
{
"type": "nodeGroup",
"config": {
"name": "orderProcessing",
"nodes": [
{
"name": "validateOrder",
"type": "validate",
"config": { /* ... */ }
},
{
"name": "calculateTax",
"type": "customCode",
"config": { /* ... */ }
},
{
"name": "processPayment",
"type": "payment",
"config": { /* ... */ }
}
],
"inputMapping": {
"order": "$.input.order",
"customer": "$.input.customer"
},
"outputMapping": {
"result": {
"orderValid": "$.context.validateOrder.valid",
"taxAmount": "$.context.calculateTax.taxAmount",
"paymentStatus": "$.context.processPayment.status"
}
}
}
}
Debugging and Monitoring
Log Node
Record debugging and audit information:
{
"type": "log",
"config": {
"level": "info",
"message": "Processing order: ${$.input.order.id} - Amount: ${$.input.order.total} ${$.input.order.currency}",
"data": {
"order": "$.input.order",
"customer": {
"id": "$.input.customer.id",
"email": "$.input.customer.email"
},
"processingTime": "$.context.metrics.processingTime"
},
"tags": ["order", "payment"]
}
}
Metrics Collection
Collect execution metrics:
{
"type": "metrics",
"config": {
"measurements": [
{
"name": "order_processing_time",
"value": "$.context.metrics.processingTime",
"unit": "milliseconds",
"dimensions": {
"orderId": "$.input.order.id",
"orderType": "$.input.order.type",
"customerId": "$.input.customer.id"
}
},
{
"name": "order_value",
"value": "$.input.order.total",
"unit": "currency",
"dimensions": {
"currency": "$.input.order.currency",
"orderType": "$.input.order.type"
}
}
],
"destination": "cloudwatch"
}
}
Best Practices
Node Naming and Organization
- Use clear, descriptive node names
- Organize related nodes into node groups
- Add descriptions and comments for complex nodes
- Follow consistent naming conventions
Node Configuration Optimization
- Process only necessary data to avoid redundant operations
- Use data mappings to reduce the amount of data transferred
- Configure appropriate timeout and retry strategies
- Add error handling for critical nodes
Security Best Practices
- Use environment variables and secret storage to manage sensitive information
- Follow the principle of least privilege when configuring integration permissions
- Validate and sanitize all user input
- Use audit logs to record sensitive operations
FAQ
Details
How do I create a custom operator node?
There are two main ways to create a custom operator node:-
Use the Custom Code Node — For simple custom logic, you can use the built-in custom code node and write JavaScript/Python code.
-
Develop a Standalone Node Component — For more complex functionality or reusable components:
- Use the GeniSpace SDK to create a custom node package
- Implement the node interface and define input/output schemas
- Package and register the node
- Publish it to your organization's node repository
Details
How do I handle high-volume data workflows?
When processing large volumes of data:- Use pagination and batch processing nodes
- Configure streaming processing instead of loading all data at once
- Use data filtering nodes to reduce the processing volume
- Consider using dedicated big data processing nodes
- For particularly large datasets, configure asynchronous execution and implement result notification mechanisms
Details
How do I optimize node performance?
Key methods for optimizing node performance:- Select only the necessary fields for processing
- For database operations, optimize queries and add appropriate indexes
- Use caching to reduce redundant computations and API calls
- Configure concurrent execution for nodes without mutual dependencies
- Use the correct data structures and algorithms
- Monitor node execution time to identify performance bottlenecks
Next Steps
- Learn how to configure Workflow Triggers
- Learn to visually design workflows with the Workflow Builder
- Explore advanced usage of Environment Variables
- See the Workflow Overview for real-world application inspiration
- Dive deeper into the Operator Overview and Custom Operator Development
- Learn how to Import Operators from OpenAPI Docs