Data Processing Tools

Tools steps provide file operations, text utilities, and external service integrations for Jetty workflows. They cover text manipulation (reading, saving, splitting, and concatenating), image downloading and metadata tagging, and webhook notifications.

Available Steps (7)

text_concatenate

Combines multiple text values from trajectory paths into a single concatenated output.

Activity Name: text_concatenate

Use Cases: Document merging, report aggregation, log file consolidation, multi-file text processing

read_text_file

Reads text content from the storage backend with trajectory storage context integration.

Activity Name: read_text_file

Use Cases: Loading configuration files, reading data inputs, text file processing, content extraction

split_text

Splits text into a list based on a delimiter.

Activity Name: split_text

Use Cases: Parsing CSV data, breaking text into lines, tokenizing input, list generation from text

download_image

Downloads images from URLs with metadata extraction and storage integration.

Activity Name: download_image

Use Cases: Asset collection, image pipeline inputs, web scraping, media processing workflows

save_text_file

Saves text content to the storage backend with configurable paths and formats.

Activity Name: save_text_file

Use Cases: Result persistence, report generation, data export, workflow output storage

add_image_metadata

Adds EXIF metadata to image files from previous workflow steps using PIL.

Activity Name: add_image_metadata

Use Cases: Image cataloging, metadata enrichment, asset management, photography workflows

webhook_notify

Sends HTTP notifications to external services with trajectory data.

Activity Name: webhook_notify

Use Cases: Workflow notifications, system integration, event triggering, status updates

Step Documentation

text_concatenate

Combines multiple text files into a single output with configurable separators.

Configuration

{
  "activity": "text_concatenate",
  "input_files": ["file1.txt", "file2.txt", "file3.txt"],
  "separator": "\n---\n"
}

Parameters

  • input_files (array, required) - List of file paths to concatenate
  • input_files_path (string) - Alternative: Path to array from previous step
  • separator (string, default: "\n") - Text to insert between files
  • output_path (string, default: "concatenated.txt") - Output file path
  • include_filenames (boolean, default: false) - Add filename headers

Input Patterns

{
  "input_files": ["report1.txt", "report2.txt", "report3.txt"]
}
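
When the file list is produced by an earlier step, input_files_path can point at that step's output instead of a literal array. A minimal sketch, assuming a hypothetical upstream step named list_reports that exposes an array of file paths as outputs.files:

```json
{
  "activity": "text_concatenate",
  "input_files_path": "list_reports.outputs.files",
  "separator": "\n---\n"
}
```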

Example

{
  "name": "merge_reports",
  "activity": "text_concatenate",
  "config": {
    "input_files": [
      "daily_summary.txt",
      "metrics_report.txt",
      "error_log.txt"
    ],
    "separator": "\n\n========== NEXT REPORT ==========\n\n",
    "output_path": "combined_report_{{date}}.txt",
    "include_filenames": true
  }
}

read_text_file

Reads text content from storage with full trajectory integration.

Configuration

{
  "activity": "read_text_file",
  "text_path": "init_params.file_path"
}

Parameters

  • text_path (string, required) - Path expression to the file path to read

Output

  • text (string) - The decoded file content
  • text_content_path (string) - The resolved storage path

Example

{
  "name": "load_configuration",
  "activity": "read_text_file",
  "config": {
    "text_path": "init_params.config_file"
  }
}
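
A later step can consume the text output of this step by path. A sketch, assuming the step_name.outputs.field form used elsewhere on this page (for example reader.outputs.text) applies here as well; the step name split_config_lines is illustrative:

```json
{
  "name": "split_config_lines",
  "activity": "split_text",
  "config": {
    "text": "load_configuration.outputs.text",
    "delimiter": "\n",
    "filter_empty": true
  }
}
```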

split_text

Splits text into a list based on a delimiter.

Configuration

{
  "activity": "split_text",
  "text": "init_params.input_text",
  "delimiter": "\n"
}

Parameters

  • text (string, required) - Text to split (or path expression)
  • delimiter (string, default: "\n") - Delimiter to split on
  • strip_items (boolean, default: true) - Strip whitespace from each item
  • filter_empty (boolean, default: false) - Remove empty strings from result

Output

  • items (array) - List of split strings
  • count (int) - Number of items in the list
  • json (array) - Same as items (for compatibility)
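
To illustrate how the options and outputs fit together, the sketch below assumes a hypothetical init parameter named tags that holds the string a, b,,c. The configuration enables both strip_items and filter_empty:

```json
{
  "activity": "split_text",
  "text": "init_params.tags",
  "delimiter": ",",
  "strip_items": true,
  "filter_empty": true
}
```

Going by the parameter and output descriptions above, the step output would be expected to have the following shape. This is inferred from the descriptions, not captured from a real run:

```json
{
  "items": ["a", "b", "c"],
  "count": 3,
  "json": ["a", "b", "c"]
}
```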

Examples

Split Lines

{
  "name": "parse_lines",
  "activity": "split_text",
  "config": {
    "text": "init_params.multi_line_text",
    "delimiter": "\n",
    "filter_empty": true
  }
}

Parse CSV Row

{
  "name": "parse_csv",
  "activity": "split_text",
  "config": {
    "text": "reader.outputs.text",
    "delimiter": ",",
    "strip_items": true
  }
}

Split into Items for Parallel Processing

{
  "steps": ["split_input", "process_items"],
  "step_configs": {
    "split_input": {
      "activity": "split_text",
      "text": "init_params.items_string",
      "delimiter": "|",
      "filter_empty": true
    },
    "process_items": {
      "activity": "list_emit_await",
      "items_path": "split_input.outputs.items",
      "task_reference": {"task_name": "process_single"}
    }
  }
}

download_image

Downloads images from URLs with automatic metadata extraction.

Configuration

{
  "activity": "download_image",
  "url": "https://example.com/image.jpg",
  "output_path": "images/downloaded.jpg"
}

Parameters

  • url (string, required) - Image URL to download
  • output_path (string, required) - Storage path for downloaded image
  • timeout (int, default: 30) - Download timeout in seconds
  • headers (object) - Custom HTTP headers
  • extract_metadata (boolean, default: true) - Extract EXIF data
  • validate_image (boolean, default: true) - Verify image format

Advanced Options

{
  "url": "https://api.example.com/protected-image.jpg",
  "headers": {
    "Authorization": "Bearer {{auth_token}}",
    "User-Agent": "Jetty/1.0"
  },
  "timeout": 60,
  "output_path": "assets/images/{{image_id}}.jpg"
}

Example

{
  "name": "collect_product_images",
  "activity": "download_image",
  "config": {
    "url": "{{product_api.outputs.image_url}}",
    "output_path": "products/{{product_api.outputs.sku}}/main.jpg",
    "extract_metadata": true,
    "headers": {
      "Referer": "https://example.com"
    }
  }
}

save_text_file

Saves text content to storage with flexible path configuration.

Configuration

{
  "activity": "save_text_file",
  "content": "File content here",
  "file_path": "outputs/result.txt"
}

Parameters

  • content (string, required) - Text content to save
  • content_path (string) - Alternative: Path to content from previous step
  • file_path (string, required) - Output file path
  • encoding (string, default: "utf-8") - Text encoding
  • create_directories (boolean, default: true) - Create parent directories
  • overwrite (boolean, default: true) - Overwrite existing files

Content Sources

{
  "content": "Static text content",
  "file_path": "output.txt"
}
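
Content produced by an earlier step can be referenced with content_path instead of an inline content value. A sketch, assuming a hypothetical upstream step named summarizer that exposes a text output:

```json
{
  "content_path": "summarizer.outputs.text",
  "file_path": "output.txt"
}
```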

Example

{
  "name": "save_analysis_results",
  "activity": "save_text_file",
  "config": {
    "content_path": "analyzer.outputs.report",
    "file_path": "analysis/{{workflow.run_id}}/final_report.md",
    "encoding": "utf-8",
    "create_directories": true
  }
}

add_image_metadata

Enriches images with custom metadata and EXIF information.

Configuration

{
  "activity": "add_image_metadata",
  "image_path": "images/photo.jpg",
  "metadata": {
    "description": "Product photo",
    "tags": ["product", "catalog"]
  }
}

Parameters

  • image_path (string, required) - Path to image file
  • metadata (object, required) - Metadata to add
  • output_path (string) - Output path (defaults to overwrite)
  • preserve_original (boolean, default: false) - Keep original file
  • format (string) - Output format (JPEG, PNG, etc.)

Metadata Fields

{
  "metadata": {
    "title": "Image Title",
    "description": "Detailed description",
    "author": "Photographer Name",
    "copyright": "© 2024 Company",
    "tags": ["tag1", "tag2"],
    "custom_fields": {
      "project_id": "12345",
      "version": "1.0"
    }
  }
}

Example
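
A minimal sketch assembled from the parameters listed above. The upstream reference download_images.outputs.file_path follows the Image Collection Workflow later on this page. The step name, output path, and metadata values are illustrative assumptions.

```json
{
  "name": "tag_product_photo",
  "activity": "add_image_metadata",
  "config": {
    "image_path": "download_images.outputs.file_path",
    "metadata": {
      "title": "Product photo",
      "author": "Photographer Name",
      "tags": ["product", "catalog"]
    },
    "output_path": "catalog/tagged_photo.jpg",
    "preserve_original": true
  }
}
```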

webhook_notify

Sends HTTP notifications with workflow data to external services.

Configuration

{
  "activity": "webhook_notify",
  "webhook_url": "https://api.example.com/webhooks/workflow",
  "payload": {
    "status": "completed",
    "workflow_id": "{{workflow.run_id}}"
  }
}

Parameters

  • webhook_url (string, required) - Webhook endpoint URL
  • payload (object, required) - JSON payload to send
  • method (string, default: "POST") - HTTP method
  • headers (object) - Custom HTTP headers
  • auth_header_secret (string) - Secret for Authorization header
  • timeout (int, default: 30) - Request timeout
  • retry_count (int, default: 3) - Number of retries

Authentication Patterns

{
  "webhook_url": "https://api.example.com/notify",
  "auth_header_secret": "WEBHOOK_AUTH_TOKEN",
  "payload": {"message": "Workflow completed"}
}
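
Credentials can presumably also be passed through the headers parameter. A sketch, assuming the receiving service expects a custom X-API-Key header and that a template variable named api_key (hypothetical) is available to the workflow:

```json
{
  "webhook_url": "https://api.example.com/notify",
  "headers": {
    "X-API-Key": "{{api_key}}"
  },
  "payload": {"message": "Workflow completed"}
}
```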

Example

{
  "name": "notify_completion",
  "activity": "webhook_notify",
  "config": {
    "webhook_url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
    "method": "POST",
    "payload": {
      "text": "Workflow {{workflow.run_id}} completed successfully",
      "blocks": [
        {
          "type": "section",
          "text": {
            "type": "mrkdwn",
            "text": "*Workflow Complete* :white_check_mark:\n*Run ID:* {{workflow.run_id}}\n*Duration:* {{workflow.duration_seconds}}s"
          }
        }
      ]
    },
    "retry_count": 3,
    "timeout": 10
  }
}

Advanced Patterns

File Processing Pipeline

{
  "steps": [
    {
      "name": "read_input",
      "activity": "read_text_file",
      "config": {
        "text_path": "init_params.file_path"
      }
    },
    {
      "name": "process_data",
      "activity": "text_concatenate",
      "config": {
        "input_files_path": "read_input.outputs.related_files",
        "separator": "\n---PROCESSED---\n"
      }
    },
    {
      "name": "save_output",
      "activity": "save_text_file",
      "config": {
        "content_path": "process_data.outputs.text",
        "file_path": "outputs/processed_{{timestamp}}.txt"
      }
    }
  ]
}

Image Collection Workflow

{
  "steps": [
    {
      "name": "download_images",
      "activity": "download_image",
      "config": {
        "url": "{{image_urls[0]}}",
        "output_path": "collection/image_1.jpg"
      }
    },
    {
      "name": "add_metadata",
      "activity": "add_image_metadata",
      "config": {
        "image_path": "download_images.outputs.file_path",
        "metadata": {
          "collection": "product_catalog",
          "timestamp": "{{workflow.start_time}}"
        }
      }
    },
    {
      "name": "notify_complete",
      "activity": "webhook_notify",
      "config": {
        "webhook_url": "{{notification_endpoint}}",
        "payload": {
          "images_processed": 1,
          "collection_id": "{{collection_id}}"
        }
      }
    }
  ]
}

Error Handling

Common Issues

  • File Not Found: Check trajectory paths and storage configuration
  • Download Timeout: Increase timeout or implement retry logic
  • Invalid Encoding: Specify correct encoding for text files
  • Webhook Failure: Verify endpoint URL and authentication
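
For the download timeout case, the timeout parameter of download_image can be raised above its 30-second default. A sketch with an illustrative URL and value:

```json
{
  "activity": "download_image",
  "url": "https://example.com/large-image.jpg",
  "output_path": "images/large-image.jpg",
  "timeout": 120
}
```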

Best Practices

  • Always validate file paths before operations
  • Use appropriate timeouts for network operations
  • Implement proper error handling for external services
  • Monitor storage usage and implement cleanup strategies

Performance Tips

File Operations

  • Stream large files when possible
  • Use batch operations for multiple files
  • Implement caching for frequently accessed content
  • Consider compression for large text files

Network Operations

  • Set appropriate timeouts based on expected latency
  • Implement exponential backoff for retries
  • Use connection pooling for multiple requests
  • Monitor bandwidth usage for large downloads
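
For webhook calls, timeout and retry_count are the documented parameters for tuning this behavior. A sketch with illustrative values; how the step spaces its retries (for example, whether it backs off exponentially) is not stated on this page:

```json
{
  "activity": "webhook_notify",
  "webhook_url": "https://api.example.com/webhooks/workflow",
  "payload": {"status": "completed"},
  "timeout": 15,
  "retry_count": 5
}
```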

Storage Integration

All tools steps read from and write to Jetty's storage layer:

Storage Backends

  • S3: s3://bucket-name/path/to/file
  • GCS: gs://bucket-name/path/to/file
  • Local: file:///absolute/path/to/file

Path Resolution

  • Relative paths resolve within trajectory context
  • Absolute paths access shared storage areas
  • Dynamic paths support template variables
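
As a sketch of an absolute, templated path, the step below writes to a backend URI in the S3 form shown above. That file_path accepts such a URI directly is an assumption, and the bucket name and step names are placeholders:

```json
{
  "name": "publish_report",
  "activity": "save_text_file",
  "config": {
    "content_path": "analyzer.outputs.report",
    "file_path": "s3://bucket-name/shared/reports/{{workflow.run_id}}.md"
  }
}
```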

Integration Examples

View complete workflow examples in the Flow Library:

  • File processing pipelines
  • Image collection workflows
  • Multi-step data transformations
  • External service integrations