Skip to main content

Workflow Operator Nodes

Operator nodes are the components in a workflow that perform specific tasks — they process data, perform calculations, call services, send notifications, and more. GeniSpace provides a rich set of predefined operator nodes and also supports custom operator nodes to meet a wide range of complex business requirements.

Operator Node Overview

Each operator node has:

  • Input Data: The data source that the node processes
  • Configuration Parameters: Settings that control the node's behavior
  • Output Data: The results produced after processing
  • Error Handling: How exceptions are handled

Data Processing Nodes

Data Transformation

Convert data from one format to another:

{
"type": "transform",
"config": {
"mappings": [
{
"source": "$.input.customer.name",
"target": "$.output.userName",
"transform": "uppercase"
},
{
"source": "$.input.order.items",
"target": "$.output.productList",
"transform": {
"type": "map",
"expression": "item => ({ id: item.productId, quantity: item.qty })"
}
}
],
"outputSchema": {
"type": "object",
"properties": {
"userName": { "type": "string" },
"productList": { "type": "array" }
}
}
}
}

Data Filtering

Filter and refine data:

{
"type": "filter",
"config": {
"source": "$.input.products",
"condition": "item => item.price > 100 && item.stock > 0",
"limit": 10,
"sort": {
"field": "popularity",
"order": "desc"
}
}
}

Data Merging

Merge multiple data sources:

{
"type": "merge",
"config": {
"sources": [
{ "name": "customerData", "path": "$.input.customer" },
{ "name": "orderData", "path": "$.input.order" },
{ "name": "productData", "path": "$.context.products" }
],
"mergeStrategy": "deep",
"conflictResolution": "lastWins"
}
}

Data Validation

Validate whether data meets requirements:

{
"type": "validate",
"config": {
"schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "number",
"minimum": 18
}
},
"required": ["email", "age"]
},
"customValidators": [
{
"name": "domainCheck",
"expression": "data.email.endsWith('@company.com')",
"message": "Must use a company email address"
}
],
"onValidationError": "throw" // throw, continue, branch
}
}

Database Operation Nodes

Data Query

Query data from a database:

{
"type": "dbQuery",
"config": {
"connection": "main_db",
"queryType": "select",
"query": "SELECT * FROM customers WHERE status = :status AND created_at > :date",
"parameters": {
"status": "$.input.status",
"date": "$.input.startDate"
},
"pagination": {
"enabled": true,
"pageSize": 100,
"maxItems": 1000
}
}
}

Data Update

Update database records:

{
"type": "dbUpdate",
"config": {
"connection": "main_db",
"table": "orders",
"updates": {
"status": "$.input.newStatus",
"updated_at": "${ new Date().toISOString() }"
},
"condition": "id = :orderId",
"parameters": {
"orderId": "$.input.orderId"
},
"returnUpdated": true
}
}

Transaction Operations

Execute multiple database operations within a transaction:

{
"type": "dbTransaction",
"config": {
"connection": "main_db",
"operations": [
{
"type": "insert",
"table": "orders",
"data": "$.input.order",
"returnId": true,
"idField": "orderId"
},
{
"type": "insert",
"table": "order_items",
"data": "$.input.items.map(item => ({ ...item, order_id: $.context.orderId }))"
},
{
"type": "update",
"table": "inventory",
"updates": {
"quantity": "inventory.quantity - item.quantity"
},
"condition": "product_id = :productId",
"multipleUpdates": {
"source": "$.input.items",
"parameterField": "productId"
}
}
],
"isolation": "serializable"
}
}

HTTP and API Nodes

HTTP Request

Call an external API:

{
"type": "httpRequest",
"config": {
"url": "https://api.example.com/v1/orders",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${$.context.apiToken}"
},
"body": "$.input.orderData",
"timeout": 5000,
"retry": {
"maxAttempts": 3,
"initialDelay": 1000,
"backoffMultiplier": 2,
"retryOn": [500, 502, 503]
},
"responseMapping": {
"orderId": "$.response.data.id",
"status": "$.response.data.status"
}
}
}

GraphQL Request

Execute a GraphQL query:

{
"type": "graphqlRequest",
"config": {
"endpoint": "https://api.example.com/graphql",
"query": `
query GetProductDetails($id: ID!) {
product(id: $id) {
id
name
price
availability {
inStock
leadTime
}
}
}
`,
"variables": {
"id": "$.input.productId"
},
"authentication": {
"type": "header",
"headerName": "Authorization",
"value": "Bearer ${$.context.apiToken}"
}
}
}

API Composition

Sequentially call multiple APIs and process the results:

{
"type": "apiComposite",
"config": {
"operations": [
{
"name": "getUserProfile",
"type": "httpRequest",
"config": {
"url": "https://api.example.com/users/${$.input.userId}",
"method": "GET"
}
},
{
"name": "getUserOrders",
"type": "httpRequest",
"dependsOn": ["getUserProfile"],
"config": {
"url": "https://api.example.com/users/${$.input.userId}/orders",
"method": "GET"
}
},
{
"name": "getProductDetails",
"type": "httpRequest",
"dependsOn": ["getUserOrders"],
"config": {
"url": "https://api.example.com/products/details",
"method": "POST",
"body": {
"productIds": "$.context.getUserOrders.data.map(order => order.productId)"
}
}
}
],
"outputMapping": {
"user": "$.context.getUserProfile.data",
"orders": "$.context.getUserOrders.data",
"products": "$.context.getProductDetails.data"
}
}
}

AI and Machine Learning Nodes

Text Analysis

Analyze text content:

{
"type": "textAnalysis",
"config": {
"input": "$.input.customerMessage",
"operations": [
{
"type": "sentiment",
"field": "sentiment"
},
{
"type": "entityExtraction",
"field": "entities",
"entityTypes": ["PERSON", "LOCATION", "ORGANIZATION"]
},
{
"type": "languageDetection",
"field": "language"
},
{
"type": "categorization",
"field": "category",
"categories": ["Complaint", "Inquiry", "Feedback", "Request"]
}
],
"modelSettings": {
"provider": "openai",
"model": "gpt-4"
}
}
}

Intelligent Summarization

Generate content summaries:

{
"type": "contentSummary",
"config": {
"content": "$.input.document",
"maxLength": 200,
"format": "bullet",
"targetLanguage": "zh-CN",
"focusAreas": ["Key Points", "Critical Data", "Action Items"],
"modelSettings": {
"provider": "geniTask",
"model": "summarizer-v2"
}
}
}

Content Generation

Generate new content:

{
"type": "contentGeneration",
"config": {
"prompt": "Create an engaging marketing description for the following product:\nProduct Name: ${$.input.product.name}\nFeatures: ${$.input.product.features.join(', ')}\nTarget Audience: ${$.input.audience}\n",
"parameters": {
"maxTokens": 500,
"temperature": 0.7,
"topP": 0.9
},
"outputFormat": "html",
"systemContext": "You are a creative marketing copywriter skilled at writing compelling product descriptions.",
"modelSettings": {
"provider": "openai",
"model": "gpt-4-turbo"
}
}
}

Image Analysis

Analyze image content:

{
"type": "imageAnalysis",
"config": {
"image": "$.input.productImage",
"operations": [
{
"type": "objectDetection",
"field": "objects",
"minConfidence": 0.7
},
{
"type": "sceneClassification",
"field": "scene"
},
{
"type": "colorAnalysis",
"field": "dominantColors",
"maxColors": 5
},
{
"type": "textExtraction",
"field": "textContent"
}
],
"modelSettings": {
"provider": "geniTask",
"model": "vision-analyzer-v1"
}
}
}

Notification and Messaging Nodes

Email

Send an email:

{
"type": "email",
"config": {
"connection": "company_smtp",
"to": ["${$.input.customer.email}"],
"cc": ["support@company.com"],
"subject": "Your Order #${$.input.order.id} Has Been Confirmed",
"template": "order_confirmation",
"templateData": {
"customer": "$.input.customer",
"order": "$.input.order",
"products": "$.context.productDetails"
},
"attachments": [
{
"name": "receipt.pdf",
"content": "$.context.receiptPdf",
"contentType": "application/pdf"
}
],
"trackOpens": true,
"priority": "high"
}
}

SMS

Send an SMS notification:

{
"type": "sms",
"config": {
"provider": "twilio",
"to": "${$.input.customer.phone}",
"message": "Hello ${$.input.customer.firstName}, your order #${$.input.order.id} has been shipped. Estimated delivery: ${$.input.shipment.estimatedDelivery}",
"messageType": "transactional"
}
}

Push Notification

Send a push notification:

{
"type": "pushNotification",
"config": {
"provider": "firebase",
"recipients": {
"type": "tokens",
"tokens": "$.input.user.deviceTokens"
},
"notification": {
"title": "Order Status Update",
"body": "Your order #${$.input.order.id} status has been updated to: ${$.input.order.status}",
"icon": "order_icon"
},
"data": {
"orderId": "$.input.order.id",
"status": "$.input.order.status",
"deepLink": "app://orders/${$.input.order.id}"
},
"priority": "high",
"ttl": 3600
}
}

Message Queue

Send a message to a message queue:

{
"type": "queueMessage",
"config": {
"provider": "rabbitmq",
"connection": "mq_main",
"queue": "order_processing",
"message": "$.input.orderData",
"properties": {
"contentType": "application/json",
"messageType": "order.created",
"priority": 5
},
"persistent": true
}
}

File Operation Nodes

File Generation

Generate a file:

{
"type": "generateFile",
"config": {
"fileType": "pdf",
"template": "invoice_template",
"data": {
"invoice": "$.input.invoice",
"company": "$.context.companyDetails",
"customer": "$.input.customer"
},
"options": {
"paperSize": "A4",
"orientation": "portrait",
"headerTemplate": "<div>Invoice #{{invoiceNumber}}</div>",
"footerTemplate": "<div>Page <span class='pageNumber'></span> of <span class='totalPages'></span></div>"
},
"output": {
"filename": "Invoice_${$.input.invoice.number}.pdf",
"saveToStorage": true,
"storagePath": "invoices/${$.input.customer.id}/"
}
}
}

File Processing

Process uploaded files:

{
"type": "processFile",
"config": {
"input": "$.input.file",
"operations": [
{
"type": "extract",
"fileType": "csv",
"options": {
"delimiter": ",",
"header": true,
"skipEmptyLines": true
},
"output": "extractedData"
},
{
"type": "transform",
"source": "$.context.extractedData",
"transformation": "item => ({
customerId: item.customer_id,
total: parseFloat(item.amount)
})",
"output": "transformedData"
},
{
"type": "validate",
"source": "$.context.transformedData",
"validation": "item => item.total > 0",
"output": "validData"
}
]
}
}

File Storage

Store and manage files:

{
"type": "fileStorage",
"config": {
"operation": "store",
"file": "$.input.document",
"storage": "aws_s3",
"path": "customers/${$.input.customerId}/documents/${$.input.documentType}/",
"filename": "${$.input.documentName}_${Date.now()}.pdf",
"metadata": {
"documentType": "$.input.documentType",
"uploadedBy": "$.context.currentUser.id",
"tags": "$.input.tags"
},
"accessControl": {
"visibility": "private",
"expiryTime": 86400
}
}
}

Integration Nodes

Third-Party Service Integration

Integrate with external services:

{
"type": "serviceIntegration",
"config": {
"service": "salesforce",
"operation": "createLead",
"authentication": {
"type": "oauth2",
"credential": "salesforce_oauth"
},
"input": {
"firstName": "$.input.customer.firstName",
"lastName": "$.input.customer.lastName",
"email": "$.input.customer.email",
"company": "$.input.customer.company",
"source": "Website Form",
"status": "New"
},
"outputMapping": {
"leadId": "$.result.id",
"success": "$.result.success"
}
}
}

Payment Processing

Process payment transactions:

{
"type": "payment",
"config": {
"provider": "stripe",
"operation": "createCharge",
"authentication": {
"secretKey": "${$.context.secrets.stripeApiKey}"
},
"input": {
"amount": "$.input.order.total * 100", // Stripe uses cents as the unit
"currency": "$.input.order.currency",
"source": "$.input.paymentMethod.id",
"description": "Order #${$.input.order.id}",
"metadata": {
"orderId": "$.input.order.id",
"customerId": "$.input.customer.id"
}
},
"onSuccess": {
"updateOrder": {
"status": "paid",
"paymentId": "$.result.id",
"paidAt": "${new Date().toISOString()}"
}
},
"onError": {
"updateOrder": {
"status": "payment_failed",
"failureReason": "$.error.message"
}
}
}
}

Custom Nodes

Custom Code

Execute custom code logic:

{
"type": "customCode",
"config": {
"runtime": "nodejs16",
"code": `
module.exports = async function(input, context) {
const { customer, order } = input;

// Custom business logic
let discountRate = 0;

if (customer.vipLevel === 'gold') {
discountRate = 0.1;
} else if (customer.vipLevel === 'platinum') {
discountRate = 0.15;
}

// Calculate discount
const originalTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const discount = originalTotal * discountRate;
const finalTotal = originalTotal - discount;

return {
originalTotal,
discount,
finalTotal,
discountRate,
customerLevel: customer.vipLevel
};
}
`,
"inputMapping": {
"customer": "$.input.customer",
"order": "$.input.order"
},
"timeout": 3000,
"memoryLimit": 128
}
}

External Service Functions

Call external function services:

{
"type": "function",
"config": {
"provider": "aws_lambda",
"function": "order-processor",
"input": "$.input",
"authentication": {
"type": "iam",
"role": "workflow-execution-role"
},
"timeout": 10000,
"asyncExecution": false
}
}

Error Handling and Retries

Error Handler

Handle errors during execution:

{
"type": "errorHandler",
"config": {
"strategies": [
{
"errorType": "validation",
"action": "resolve",
"value": {
"status": "invalid",
"errors": "$.error.details"
}
},
{
"errorType": "http4xx",
"action": "retry",
"maxRetries": 3,
"delay": 1000,
"backoffMultiplier": 2,
"conditions": [
{
"field": "$.error.status",
"operator": "equals",
"value": 429
}
]
},
{
"errorType": "timeout",
"action": "fallback",
"fallbackNode": "timeoutFallback"
},
{
"errorType": "*",
"action": "throw"
}
],
"onExhaustedRetries": "fallback",
"fallbackNode": "defaultFallback",
"logLevel": "warning"
}
}

Timeout Control

Control node execution time:

{
"type": "timeout",
"config": {
"node": "longRunningOperation",
"timeout": 5000,
"onTimeout": "abort", // abort, continue, fallback
"fallbackNode": "timeoutFallback"
}
}

Node Composition and Reuse

Sub-Workflows

Create reusable sub-workflows:

{
"type": "subworkflow",
"config": {
"workflowId": "payment-processor",
"version": "latest",
"input": {
"paymentMethod": "$.input.paymentMethod",
"amount": "$.input.order.total",
"currency": "$.input.order.currency",
"description": "Order #${$.input.order.id}"
},
"waitForCompletion": true,
"timeout": 30000,
"outputMapping": {
"paymentResult": "$.result"
}
}
}

Node Groups

Group multiple nodes together:

{
"type": "nodeGroup",
"config": {
"name": "orderProcessing",
"nodes": [
{
"name": "validateOrder",
"type": "validate",
"config": { /* ... */ }
},
{
"name": "calculateTax",
"type": "customCode",
"config": { /* ... */ }
},
{
"name": "processPayment",
"type": "payment",
"config": { /* ... */ }
}
],
"inputMapping": {
"order": "$.input.order",
"customer": "$.input.customer"
},
"outputMapping": {
"result": {
"orderValid": "$.context.validateOrder.valid",
"taxAmount": "$.context.calculateTax.taxAmount",
"paymentStatus": "$.context.processPayment.status"
}
}
}
}

Debugging and Monitoring

Log Node

Record debugging and audit information:

{
"type": "log",
"config": {
"level": "info",
"message": "Processing order: ${$.input.order.id} - Amount: ${$.input.order.total} ${$.input.order.currency}",
"data": {
"order": "$.input.order",
"customer": {
"id": "$.input.customer.id",
"email": "$.input.customer.email"
},
"processingTime": "$.context.metrics.processingTime"
},
"tags": ["order", "payment"]
}
}

Metrics Collection

Collect execution metrics:

{
"type": "metrics",
"config": {
"measurements": [
{
"name": "order_processing_time",
"value": "$.context.metrics.processingTime",
"unit": "milliseconds",
"dimensions": {
"orderId": "$.input.order.id",
"orderType": "$.input.order.type",
"customerId": "$.input.customer.id"
}
},
{
"name": "order_value",
"value": "$.input.order.total",
"unit": "currency",
"dimensions": {
"currency": "$.input.order.currency",
"orderType": "$.input.order.type"
}
}
],
"destination": "cloudwatch"
}
}

Best Practices

Node Naming and Organization

  • Use clear, descriptive node names
  • Organize related nodes into node groups
  • Add descriptions and comments for complex nodes
  • Follow consistent naming conventions

Node Configuration Optimization

  • Process only necessary data to avoid redundant operations
  • Use data mappings to reduce the amount of data transferred
  • Configure appropriate timeout and retry strategies
  • Add error handling for critical nodes

Security Best Practices

  • Use environment variables and secret storage to manage sensitive information
  • Follow the principle of least privilege when configuring integration permissions
  • Validate and sanitize all user input
  • Use audit logs to record sensitive operations

FAQ

Details

How do I create a custom operator node? There are two main ways to create a custom operator node:

  1. Use the Custom Code Node — For simple custom logic, you can use the built-in custom code node and write JavaScript/Python code.

  2. Develop a Standalone Node Component — For more complex functionality or reusable components:

    • Use the GeniSpace SDK to create a custom node package
    • Implement the node interface and define input/output schemas
    • Package and register the node
    • Publish it to your organization's node repository
Details

How do I handle high-volume data workflows? When processing large volumes of data:

  1. Use pagination and batch processing nodes
  2. Configure streaming processing instead of loading all data at once
  3. Use data filtering nodes to reduce the processing volume
  4. Consider using dedicated big data processing nodes
  5. For particularly large datasets, configure asynchronous execution and implement result notification mechanisms
Details

How do I optimize node performance? Key methods for optimizing node performance:

  1. Select only the necessary fields for processing
  2. For database operations, optimize queries and add appropriate indexes
  3. Use caching to reduce redundant computations and API calls
  4. Configure concurrent execution for nodes without mutual dependencies
  5. Use the correct data structures and algorithms
  6. Monitor node execution time to identify performance bottlenecks

Next Steps