Data Processing Tools
Tools steps provide file operations, utilities, and external service integrations for Jetty workflows. They cover text manipulation, file reads and writes, image downloads and metadata, and webhook notifications.
Available Steps (7)
text_concatenate
Combines multiple text values from trajectory paths into a single concatenated output.
Activity Name: text_concatenate
Use Cases: Document merging, report aggregation, log file consolidation, multi-file text processing
read_text_file
Reads text content from the storage backend with trajectory storage context integration.
Activity Name: read_text_file
Use Cases: Loading configuration files, reading data inputs, text file processing, content extraction
split_text
Splits text into a list based on a delimiter.
Activity Name: split_text
Use Cases: Parsing CSV data, breaking text into lines, tokenizing input, list generation from text
download_image
Downloads images from URLs with metadata extraction and storage integration.
Activity Name: download_image
Use Cases: Asset collection, image pipeline inputs, web scraping, media processing workflows
save_text_file
Saves text content to the storage backend with configurable paths and formats.
Activity Name: save_text_file
Use Cases: Result persistence, report generation, data export, workflow output storage
add_image_metadata
Adds EXIF metadata to image files from previous workflow steps using PIL.
Activity Name: add_image_metadata
Use Cases: Image cataloging, metadata enrichment, asset management, photography workflows
webhook_notify
Sends HTTP notifications to external services with trajectory data.
Activity Name: webhook_notify
Use Cases: Workflow notifications, system integration, event triggering, status updates
Step Documentation
text_concatenate
Combines multiple text files into a single output with configurable separators.
Configuration
{
  "activity": "text_concatenate",
  "input_files": ["file1.txt", "file2.txt", "file3.txt"],
  "separator": "\n---\n"
}
Parameters
- input_files (array, required) - List of file paths to concatenate
- input_files_path (string) - Alternative: path to an array from a previous step
- separator (string, default: "\n") - Text to insert between files
- output_path (string, default: "concatenated.txt") - Output file path
- include_filenames (boolean, default: false) - Add filename headers
Input Patterns
{
  "input_files": ["report1.txt", "report2.txt", "report3.txt"]
}
Example
{
  "name": "merge_reports",
  "activity": "text_concatenate",
  "config": {
    "input_files": [
      "daily_summary.txt",
      "metrics_report.txt",
      "error_log.txt"
    ],
    "separator": "\n\n========== NEXT REPORT ==========\n\n",
    "output_path": "combined_report_{{date}}.txt",
    "include_filenames": true
  }
}
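To make the effect of separator and include_filenames concrete, here is a minimal Python sketch of the concatenation logic. It is an illustration, not Jetty's implementation: the function name concatenate_texts and the "=== name ===" header format are assumptions, since the documentation does not say how filename headers are rendered.

```python
def concatenate_texts(files, separator="\n", include_filenames=False):
    """Join (filename, text) pairs, loosely mirroring text_concatenate.

    The "=== name ===" header format is an assumption made for this sketch,
    not Jetty's documented output format.
    """
    parts = []
    for name, text in files:
        if include_filenames:
            parts.append(f"=== {name} ===\n{text}")
        else:
            parts.append(text)
    # The separator is placed between entries only, never at the start or end.
    return separator.join(parts)


merged = concatenate_texts(
    [("file1.txt", "alpha"), ("file2.txt", "beta")],
    separator="\n---\n",
)
print(merged)
```

Because the separator is only inserted between entries, a single input file produces output with no separator at all.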
read_text_file
Reads text content from storage with full trajectory integration.
Configuration
{
  "activity": "read_text_file",
  "text_path": "init_params.file_path"
}
Parameters
text_path(string, required) - Path expression to the file path to read
Output
- text (string) - The decoded file content
- text_content_path (string) - The resolved storage path
Example
{
  "name": "load_configuration",
  "activity": "read_text_file",
  "config": {
    "text_path": "init_params.config_file"
  }
}
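Note that text_path holds a path expression such as init_params.config_file rather than a literal file name. The Python sketch below shows one plausible way a dotted expression could be resolved against workflow state. The resolve_path helper and the shape of the trajectory dictionary are hypothetical; the real structure is defined by Jetty.

```python
def resolve_path(trajectory, expression):
    """Walk a dotted path expression through nested dictionaries."""
    value = trajectory
    for key in expression.split("."):
        if not isinstance(value, dict) or key not in value:
            raise KeyError(f"cannot resolve {expression!r}: missing {key!r}")
        value = value[key]
    return value


# Hypothetical trajectory state, invented for this example.
trajectory = {
    "init_params": {"config_file": "configs/app.yaml"},
    "reader": {"outputs": {"text": "a,b,c"}},
}
print(resolve_path(trajectory, "init_params.config_file"))  # configs/app.yaml
```

Raising an error that names the missing key makes "File Not Found" problems easier to trace back to a mistyped expression.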
split_text
Splits text into a list based on a delimiter.
Configuration
{
  "activity": "split_text",
  "text": "init_params.input_text",
  "delimiter": "\n"
}
Parameters
- text (string, required) - Text to split (or path expression)
- delimiter (string, default: "\n") - Delimiter to split on
- strip_items (bool, default: true) - Strip whitespace from each item
- filter_empty (bool, default: false) - Remove empty strings from result
Output
- items (array) - List of split strings
- count (int) - Number of items in the list
- json (array) - Same as items (for compatibility)
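The parameters and outputs above can be sketched in a few lines of Python. This is an approximation of the documented behavior, not the step's source code. In particular, it assumes that stripping happens before empty items are filtered, so whitespace-only items are dropped when both options are enabled.

```python
def split_text(text, delimiter="\n", strip_items=True, filter_empty=False):
    """Approximate the documented split_text parameters and outputs."""
    items = text.split(delimiter)
    if strip_items:
        items = [item.strip() for item in items]
    if filter_empty:
        # Assumption: filtering runs after stripping.
        items = [item for item in items if item]
    return {"items": items, "count": len(items), "json": items}


result = split_text("red | green || blue", "|", filter_empty=True)
print(result["items"])  # ['red', 'green', 'blue']
print(result["count"])  # 3
```

With the default filter_empty of false, consecutive delimiters and trailing newlines produce empty strings in items, which is why the line-splitting example enables it.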
Examples
Split Lines
{
  "name": "parse_lines",
  "activity": "split_text",
  "config": {
    "text": "init_params.multi_line_text",
    "delimiter": "\n",
    "filter_empty": true
  }
}
Parse CSV Row
{
  "name": "parse_csv",
  "activity": "split_text",
  "config": {
    "text": "reader.outputs.text",
    "delimiter": ",",
    "strip_items": true
  }
}
Split into Items for Parallel Processing
{
  "steps": ["split_input", "process_items"],
  "step_configs": {
    "split_input": {
      "activity": "split_text",
      "text": "init_params.items_string",
      "delimiter": "|",
      "filter_empty": true
    },
    "process_items": {
      "activity": "list_emit_await",
      "items_path": "split_input.outputs.items",
      "task_reference": {"task_name": "process_single"}
    }
  }
}
download_image
Downloads images from URLs with automatic metadata extraction.
Configuration
{
  "activity": "download_image",
  "url": "https://example.com/image.jpg",
  "output_path": "images/downloaded.jpg"
}
Parameters
- url (string, required) - Image URL to download
- output_path (string, required) - Storage path for downloaded image
- timeout (int, default: 30) - Download timeout in seconds
- headers (object) - Custom HTTP headers
- extract_metadata (boolean, default: true) - Extract EXIF data
- validate_image (boolean, default: true) - Verify image format
Advanced Options
{
  "url": "https://api.example.com/protected-image.jpg",
  "headers": {
    "Authorization": "Bearer {{auth_token}}",
    "User-Agent": "Jetty/1.0"
  },
  "timeout": 60,
  "output_path": "assets/images/{{image_id}}.jpg"
}
Example
{
  "name": "collect_product_images",
  "activity": "download_image",
  "config": {
    "url": "{{product_api.outputs.image_url}}",
    "output_path": "products/{{product_api.outputs.sku}}/main.jpg",
    "extract_metadata": true,
    "headers": {
      "Referer": "https://example.com"
    }
  }
}
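For readers who want a feel for what url, headers, timeout, and validate_image control, the following Python sketch performs a comparable download using only the standard library. All function names are hypothetical, and the format check is an assumption: it only recognizes JPEG, PNG, and GIF signatures, whereas the step's actual validation is not described here.

```python
import urllib.request


def build_image_request(config):
    """Build a urllib Request from a download_image-style config dict."""
    headers = dict(config.get("headers", {}))
    return urllib.request.Request(config["url"], headers=headers, method="GET")


def looks_like_image(data):
    """Cheap check on leading magic bytes (JPEG, PNG, and GIF only)."""
    signatures = (b"\xff\xd8\xff", b"\x89PNG\r\n\x1a\n", b"GIF87a", b"GIF89a")
    return data.startswith(signatures)


def download_image(config):
    """Fetch the image bytes, honoring timeout and validate_image."""
    request = build_image_request(config)
    timeout = config.get("timeout", 30)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        data = response.read()
    if config.get("validate_image", True) and not looks_like_image(data):
        raise ValueError("response body is not a recognized image format")
    return data
```

Validating the bytes before storing them catches the common case where a protected URL returns an HTML login or error page instead of an image.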
save_text_file
Saves text content to storage with flexible path configuration.
Configuration
{
  "activity": "save_text_file",
  "content": "File content here",
  "file_path": "outputs/result.txt"
}
Parameters
- content (string, required) - Text content to save
- content_path (string) - Alternative: path to content from a previous step
- file_path (string, required) - Output file path
- encoding (string, default: "utf-8") - Text encoding
- create_directories (boolean, default: true) - Create parent directories
- overwrite (boolean, default: true) - Overwrite existing files
Content Sources
{
  "content": "Static text content",
  "file_path": "output.txt"
}
Example
{
  "name": "save_analysis_results",
  "activity": "save_text_file",
  "config": {
    "content_path": "analyzer.outputs.report",
    "file_path": "analysis/{{workflow.run_id}}/final_report.md",
    "encoding": "utf-8",
    "create_directories": true
  }
}
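The interaction between create_directories, overwrite, and encoding can be illustrated with a local-filesystem sketch in Python. The save_text function is hypothetical and writes to local disk only; how the actual step behaves against S3 or GCS, and which error it raises when overwrite is false, are not specified in this document.

```python
import tempfile
from pathlib import Path


def save_text(content, file_path, encoding="utf-8",
              create_directories=True, overwrite=True):
    """Write text to a local path, mirroring the documented options."""
    target = Path(file_path)
    if create_directories:
        target.parent.mkdir(parents=True, exist_ok=True)
    # Mode "x" raises FileExistsError when the file is already present.
    mode = "w" if overwrite else "x"
    with target.open(mode, encoding=encoding) as handle:
        handle.write(content)
    return str(target)


with tempfile.TemporaryDirectory() as tmp:
    saved = save_text("# Report\n", Path(tmp) / "analysis" / "run-1" / "final_report.md")
    print(Path(saved).read_text(encoding="utf-8"))
```

Setting overwrite to false is a simple guard for workflows where two runs might resolve to the same output path.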
add_image_metadata
Enriches images with custom metadata and EXIF information.
Configuration
{
  "activity": "add_image_metadata",
  "image_path": "images/photo.jpg",
  "metadata": {
    "description": "Product photo",
    "tags": ["product", "catalog"]
  }
}
Parameters
- image_path (string, required) - Path to image file
- metadata (object, required) - Metadata to add
- output_path (string) - Output path (defaults to overwrite)
- preserve_original (boolean, default: false) - Keep original file
- format (string) - Output format (JPEG, PNG, etc.)
Metadata Fields
{
  "metadata": {
    "title": "Image Title",
    "description": "Detailed description",
    "author": "Photographer Name",
    "copyright": "© 2024 Company",
    "tags": ["tag1", "tag2"],
    "custom_fields": {
      "project_id": "12345",
      "version": "1.0"
    }
  }
}
webhook_notify
Sends HTTP notifications with workflow data to external services.
Configuration
{
  "activity": "webhook_notify",
  "webhook_url": "https://api.example.com/webhooks/workflow",
  "payload": {
    "status": "completed",
    "workflow_id": "{{workflow.run_id}}"
  }
}
Parameters
- webhook_url (string, required) - Webhook endpoint URL
- payload (object, required) - JSON payload to send
- method (string, default: "POST") - HTTP method
- headers (object) - Custom HTTP headers
- auth_header_secret (string) - Secret for Authorization header
- timeout (int, default: 30) - Request timeout
- retry_count (int, default: 3) - Number of retries
Authentication Patterns
{
  "webhook_url": "https://api.example.com/notify",
  "auth_header_secret": "WEBHOOK_AUTH_TOKEN",
  "payload": {"message": "Workflow completed"}
}
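The configuration above references the secret by name, so the token itself never appears in the workflow definition. The Python sketch below shows how such a request might be assembled. It rests on two assumptions that this document does not confirm: that secrets are available as a name-to-value mapping, and that the secret value is sent verbatim as the Authorization header. The build_webhook_request helper is hypothetical.

```python
import json
import urllib.request


def build_webhook_request(config, secrets):
    """Assemble an HTTP request from a webhook_notify-style config dict."""
    headers = {"Content-Type": "application/json"}
    headers.update(config.get("headers", {}))
    secret_name = config.get("auth_header_secret")
    if secret_name:
        # Assumption: the stored secret is the complete header value,
        # for example "Bearer <token>".
        headers["Authorization"] = secrets[secret_name]
    body = json.dumps(config["payload"]).encode("utf-8")
    return urllib.request.Request(
        config["webhook_url"],
        data=body,
        headers=headers,
        method=config.get("method", "POST"),
    )


request = build_webhook_request(
    {
        "webhook_url": "https://api.example.com/notify",
        "auth_header_secret": "WEBHOOK_AUTH_TOKEN",
        "payload": {"message": "Workflow completed"},
    },
    secrets={"WEBHOOK_AUTH_TOKEN": "Bearer example-token"},
)
print(request.get_method(), request.full_url)
```

The sketch only builds the request; sending it would use urllib.request.urlopen with the configured timeout.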
Example
{
  "name": "notify_completion",
  "activity": "webhook_notify",
  "config": {
    "webhook_url": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
    "method": "POST",
    "payload": {
      "text": "Workflow {{workflow.run_id}} completed successfully",
      "blocks": [
        {
          "type": "section",
          "text": {
            "type": "mrkdwn",
            "text": "*Workflow Complete* :white_check_mark:\n*Run ID:* {{workflow.run_id}}\n*Duration:* {{workflow.duration_seconds}}s"
          }
        }
      ]
    },
    "retry_count": 3,
    "timeout": 10
  }
}
Advanced Patterns
File Processing Pipeline
{
  "steps": [
    {
      "name": "read_input",
      "activity": "read_text_file",
      "config": {
        "text_path": "init_params.file_path"
      }
    },
    {
      "name": "process_data",
      "activity": "text_concatenate",
      "config": {
        "input_files_path": "read_input.outputs.related_files",
        "separator": "\n---PROCESSED---\n"
      }
    },
    {
      "name": "save_output",
      "activity": "save_text_file",
      "config": {
        "content_path": "process_data.outputs.text",
        "file_path": "outputs/processed_{{timestamp}}.txt"
      }
    }
  ]
}
Image Collection Workflow
{
  "steps": [
    {
      "name": "download_images",
      "activity": "download_image",
      "config": {
        "url": "{{image_urls[0]}}",
        "output_path": "collection/image_1.jpg"
      }
    },
    {
      "name": "add_metadata",
      "activity": "add_image_metadata",
      "config": {
        "image_path": "download_images.outputs.file_path",
        "metadata": {
          "collection": "product_catalog",
          "timestamp": "{{workflow.start_time}}"
        }
      }
    },
    {
      "name": "notify_complete",
      "activity": "webhook_notify",
      "config": {
        "webhook_url": "{{notification_endpoint}}",
        "payload": {
          "images_processed": 1,
          "collection_id": "{{collection_id}}"
        }
      }
    }
  ]
}
Error Handling
Common Issues
- File Not Found: Check trajectory paths and storage configuration
- Download Timeout: Increase timeout or implement retry logic
- Invalid Encoding: Specify correct encoding for text files
- Webhook Failure: Verify endpoint URL and authentication
Best Practices
- Always validate file paths before operations
- Use appropriate timeouts for network operations
- Implement proper error handling for external services
- Monitor storage usage and implement cleanup strategies
Performance Tips
File Operations
- Stream large files when possible
- Use batch operations for multiple files
- Implement caching for frequently accessed content
- Consider compression for large text files
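As an illustration of the streaming advice in the list above, the Python sketch below concatenates text files chunk by chunk, so memory use stays bounded by the chunk size rather than the file size. It works on local files only; stream_concatenate is a hypothetical helper, not a Jetty API.

```python
import shutil
import tempfile
from pathlib import Path


def stream_concatenate(input_paths, output_path, separator="\n", chunk_size=64 * 1024):
    """Concatenate text files chunk by chunk instead of loading them whole."""
    with open(output_path, "w", encoding="utf-8", newline="") as out:
        for index, path in enumerate(input_paths):
            if index > 0:
                out.write(separator)
            with open(path, "r", encoding="utf-8", newline="") as src:
                # copyfileobj reads and writes at most chunk_size characters at a time.
                shutil.copyfileobj(src, out, chunk_size)


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp, "first.txt")
    second = Path(tmp, "second.txt")
    first.write_text("alpha", encoding="utf-8")
    second.write_text("beta", encoding="utf-8")
    combined = Path(tmp, "combined.txt")
    stream_concatenate([first, second], combined, separator="\n---\n")
    print(combined.read_text(encoding="utf-8"))
```

Opening both files with newline="" passes line endings through unchanged, which matters when the inputs mix Unix and Windows conventions.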
Network Operations
- Set appropriate timeouts based on expected latency
- Implement exponential backoff for retries
- Use connection pooling for multiple requests
- Monitor bandwidth usage for large downloads
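Exponential backoff, mentioned in the list above, doubles the wait after each failed attempt. The Python sketch below shows the idea in isolation. The retry_with_backoff helper, its base_delay parameter, and the reading of retry_count as "retries after the first attempt" are assumptions for illustration; the document does not describe how the built-in retry_count option spaces its retries.

```python
import time


def retry_with_backoff(operation, retry_count=3, base_delay=1.0, sleep=time.sleep):
    """Call operation(); after a failure wait base_delay * 2**attempt, then retry."""
    for attempt in range(retry_count + 1):
        try:
            return operation()
        except Exception:
            if attempt == retry_count:
                raise  # Retries exhausted: surface the last error.
            sleep(base_delay * 2 ** attempt)


# Demo with a fake sleep so the example finishes instantly.
recorded_delays = []
attempts = {"count": 0}


def flaky_call():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "delivered"


print(retry_with_backoff(flaky_call, sleep=recorded_delays.append))  # delivered
print(recorded_delays)  # [1.0, 2.0]
```

Passing the sleep function as a parameter keeps the helper testable without real delays. A production version would usually also add random jitter and retry only on errors known to be transient.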
Storage Integration
All tools steps read from and write to Jetty's storage layer:
Storage Backends
- S3: s3://bucket-name/path/to/file
- GCS: gs://bucket-name/path/to/file
- Local: file:///absolute/path/to/file
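The three URI forms differ only in scheme and in whether a bucket name is present. A short Python sketch using the standard library's urlsplit shows how they decompose. The parse_storage_uri helper and the shape of its return value are hypothetical and are not part of Jetty.

```python
from urllib.parse import urlsplit

BACKENDS = {"s3": "S3", "gs": "GCS", "file": "Local"}


def parse_storage_uri(uri):
    """Split a storage URI into backend, bucket, and path components."""
    parts = urlsplit(uri)
    backend = BACKENDS.get(parts.scheme)
    if backend is None:
        raise ValueError(f"unsupported storage scheme: {parts.scheme!r}")
    if parts.scheme == "file":
        # Local URIs have no bucket; the path stays absolute.
        return {"backend": backend, "bucket": None, "path": parts.path}
    return {"backend": backend, "bucket": parts.netloc, "path": parts.path.lstrip("/")}


print(parse_storage_uri("s3://bucket-name/path/to/file"))
print(parse_storage_uri("file:///absolute/path/to/file"))
```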
Path Resolution
- Relative paths resolve within trajectory context
- Absolute paths access shared storage areas
- Dynamic paths support template variables
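Dynamic paths use double-brace placeholders such as {{workflow.run_id}}. The Python sketch below shows one simple way such placeholders could be expanded. It is a simplified stand-in, not Jetty's template engine: render_path is a hypothetical helper, it handles dotted names only, and it does not support index syntax like {{image_urls[0]}}.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render_path(template, variables):
    """Replace {{dotted.name}} placeholders with values from a nested dict."""
    def lookup(match):
        value = variables
        for key in match.group(1).split("."):
            value = value[key]  # Raises KeyError for unknown variables.
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


rendered = render_path(
    "analysis/{{workflow.run_id}}/final_report.md",
    {"workflow": {"run_id": "run-42"}},
)
print(rendered)  # analysis/run-42/final_report.md
```

Failing loudly on an unknown variable is usually preferable to writing a file whose name still contains literal braces.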
Related Steps
- Step Library Overview - Complete step catalog
Integration Examples
View complete workflow examples in the Flow Library:
- File processing pipelines
- Image collection workflows
- Multi-step data transformations
- External service integrations