# Control Flow Steps

Control flow steps provide orchestration capabilities for Jetty workflows. They enable you to process lists of items in parallel, launch child workflows, and extract results from multiple trajectories.
## Available Steps

| Step | Activity Name | Description |
|---|---|---|
| List Emit Await | `list_emit_await` | Process items in parallel by launching child workflows |
| Extract From Trajectories | `extract_from_trajectories` | Extract data from completed child trajectories |
## When to Use Control Flow Steps

Control flow steps are essential for:
- Fan-out patterns: Process many items by launching parallel workflows
- Batch processing: Run the same workflow on multiple inputs
- Data aggregation: Collect and combine results from child workflows
- Pipeline orchestration: Build complex multi-stage processing pipelines
## List Emit Await

Launches a child workflow for each item in a list and waits for all of them to complete.

### Activity Name

`list_emit_await`

### Overview

This step takes a list of items, launches a child workflow for each item, waits for all of the workflows to complete, and returns references to the completed trajectories. It is the foundation for parallel processing patterns.
Key features:
- Parallel execution: Controlled concurrency with configurable limits
- Template-based data mapping: Map item data to child workflow parameters
- Timeout handling: Graceful timeout with partial result collection
- Parent-child tracking: Automatic tracking of workflow relationships
- Inherited context: Child workflows inherit collection, storage, and auth from parent
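The execution model can be sketched as a bounded-concurrency fan-out. The snippet below is an illustrative Python model, not Jetty's implementation; `run_child` stands in for launching a child workflow:

```python
import asyncio

async def list_emit_await(items, run_child, max_parallel=5):
    """Sketch of the fan-out/await semantics: launch one child per
    item, never more than max_parallel at once, then gather results."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def run_one(index, item):
        async with semaphore:  # blocks while max_parallel children are running
            result = await run_child(item)
            return {"index": index, "item": item, "result": result}

    tasks = [run_one(i, item) for i, item in enumerate(items)]
    # gather preserves input order, regardless of completion order
    return await asyncio.gather(*tasks)

async def demo_child(item):
    await asyncio.sleep(0.01)  # stand-in for real child workflow work
    return item * 2

refs = asyncio.run(list_emit_await([1, 2, 3], demo_child, max_parallel=2))
```

The real step additionally records parent-child relationships and per-child storage paths, which the sketch omits.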
### Configuration Parameters

#### Required Parameters

| Parameter | Type | Description |
|---|---|---|
| `items_path` | string | Path expression to the list of items to process |
| `task_reference.task_name` | string | Name of the workflow to launch for each item |
#### Optional Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `data_mapping` | object | `{}` | Template mapping from item to child init_params |
| `execution_config.max_parallel` | int | 5 | Maximum concurrent child workflows |
| `execution_config.timeout_seconds` | int | 2700 | Global timeout for all child workflows |
| `execution_config.fail_fast` | bool | false | Stop on first failure |
| `execution_config.collect_partial_results` | bool | true | Collect completed results on timeout |
### Template Syntax

Data mapping uses template expressions with magic variables:

| Variable | Description |
|---|---|
| `{{ $item }}` | The current item from the list |
| `{{ $index }}` | Zero-based index of the current item |
| `{{ $parent_id }}` | Trajectory ID of the parent workflow |
| `{{ $item_str }}` | String representation of the item |
| `{{ $item_len }}` | Length of the item (if applicable) |
Path expressions are also supported:

- `{{ init_params.some_value }}` - Access the parent's init_params
- `{{ step_name.outputs.field }}` - Access outputs from previous steps
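To illustrate how a data mapping might be resolved per item, here is a toy Python resolver (a hypothetical helper; the engine's actual substitution rules, e.g. for nested values and path expressions, may differ):

```python
def render_mapping(mapping, item, index, parent_id):
    """Toy resolver for the magic variables above. A template that is
    exactly one magic variable keeps the value's original type;
    otherwise variables are substituted as strings."""
    values = {
        "{{ $item }}": item,
        "{{ $index }}": index,
        "{{ $parent_id }}": parent_id,
        "{{ $item_str }}": str(item),
        "{{ $item_len }}": len(item) if hasattr(item, "__len__") else None,
    }
    rendered = {}
    for key, template in mapping.items():
        if template in values:
            rendered[key] = values[template]  # type-preserving case
        else:
            out = template
            for var, val in values.items():
                out = out.replace(var, str(val))
            rendered[key] = out
    return rendered

params = render_mapping(
    {"document": "{{ $item }}", "label": "doc-{{ $index }}"},
    item={"text": "hello"}, index=3, parent_id="abc123",
)
```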
### Examples

#### Basic List Processing

```json
{
  "name": "process_items",
  "activity": "list_emit_await",
  "config": {
    "items_path": "init_params.documents",
    "task_reference": {
      "task_name": "analyze_document"
    },
    "data_mapping": {
      "document": "{{ $item }}",
      "index": "{{ $index }}"
    },
    "execution_config": {
      "max_parallel": 10,
      "timeout_seconds": 3600
    }
  }
}
```
#### Processing with Inherited Parameters

```json
{
  "name": "batch_evaluate",
  "activity": "list_emit_await",
  "config": {
    "items_path": "data_loader.outputs.test_cases",
    "task_reference": {
      "task_name": "run_evaluation"
    },
    "data_mapping": {
      "test_case": "{{ $item }}",
      "model": "{{ init_params.model }}",
      "temperature": "{{ init_params.temperature }}"
    },
    "execution_config": {
      "max_parallel": 5,
      "fail_fast": false,
      "collect_partial_results": true
    }
  }
}
```
### Output Structure

```json
{
  "outputs": {
    "trajectory_references": [
      {
        "index": 0,
        "item": "original item data",
        "trajectory_id": "abc123",
        "storage_path": "collection/workflow/0001",
        "name": "workflow_name"
      }
    ],
    "failed_workflows": [],
    "statistics": {
      "total_items": 10,
      "emitted_workflows": 10,
      "completed_workflows": 10,
      "failed_workflows": 0,
      "success_rate": 1.0,
      "execution_time_seconds": 45.2,
      "timeout_occurred": false
    },
    "task_reference": {
      "collection_name": "my_collection",
      "task_name": "analyze_document"
    }
  }
}
```
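A downstream consumer of this payload typically checks the statistics and walks the trajectory references. A minimal Python sketch, assuming the output shape shown above:

```python
def summarize_fanout(outputs):
    """Illustrative consumer of a list_emit_await output payload:
    flag incomplete runs and collect child storage paths."""
    stats = outputs["statistics"]
    ok = stats["failed_workflows"] == 0 and not stats["timeout_occurred"]
    paths = [ref["storage_path"] for ref in outputs["trajectory_references"]]
    return ok, stats["success_rate"], paths

outputs = {
    "trajectory_references": [
        {"index": 0, "trajectory_id": "abc123",
         "storage_path": "collection/workflow/0001"},
    ],
    "failed_workflows": [],
    "statistics": {"total_items": 1, "failed_workflows": 0,
                   "success_rate": 1.0, "timeout_occurred": False},
}
ok, rate, paths = summarize_fanout(outputs)
```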
## Extract From Trajectories

Extracts specified values from a list of trajectory references.

### Activity Name

`extract_from_trajectories`

### Overview

After using `list_emit_await` to run parallel workflows, use this step to extract specific data from each completed trajectory. It supports flexible path expressions for accessing any data within child trajectories.
### Configuration Parameters

#### Required Parameters

| Parameter | Type | Description |
|---|---|---|
| `trajectory_list_path` | string | Path to trajectory references (from `list_emit_await`) |
| `extract_keys` | object | Map of output key names to path expressions |
#### Optional Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_parallel` | int | 10 | Maximum parallel trajectory loads |
| `include_metadata` | bool | true | Include trajectory metadata in output |
| `continue_on_error` | bool | true | Continue if some extractions fail |
### Path Expressions

Extract values using dot notation:

- `status` - Trajectory status
- `init_params.field` - Value from init_params
- `steps.step_name.outputs.field` - Step output value
- `labels[0].value` - Label value
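A toy resolver shows how such dot-notation paths (including `[N]` list indexing) can be evaluated against a trajectory dictionary. This is an illustrative sketch, not the engine's implementation:

```python
import re

def resolve_path(data, path):
    """Toy dot-notation resolver for expressions like
    'steps.judge.outputs.score' or 'labels[0].value'.
    Returns None when any segment is missing."""
    for part in path.split("."):
        # split an optional [N] index suffix, e.g. 'labels[0]'
        match = re.fullmatch(r"(\w+)((?:\[\d+\])*)", part)
        if match is None:
            return None
        key, indices = match.group(1), match.group(2)
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
        for idx in re.findall(r"\[(\d+)\]", indices):
            try:
                data = data[int(idx)]
            except (IndexError, TypeError):
                return None
    return data

trajectory = {
    "status": "completed",
    "steps": {"judge": {"outputs": {"score": 0.95}}},
    "labels": [{"value": "pass"}],
}
```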
### Examples

#### Extract Scores from Evaluations

```json
{
  "name": "collect_results",
  "activity": "extract_from_trajectories",
  "config": {
    "trajectory_list_path": "batch_evaluate.outputs.trajectory_references",
    "extract_keys": {
      "score": "steps.evaluate.outputs.score",
      "explanation": "steps.evaluate.outputs.explanation",
      "status": "status"
    },
    "include_metadata": true
  }
}
```
#### Extract Multiple Values

```json
{
  "name": "aggregate_data",
  "activity": "extract_from_trajectories",
  "config": {
    "trajectory_list_path": "process_items.outputs.trajectory_references",
    "extract_keys": {
      "result": "steps.processor.outputs.result",
      "error": "steps.processor.outputs.error",
      "duration": "steps.processor.outputs.duration_seconds",
      "input_text": "init_params.text"
    },
    "max_parallel": 20,
    "continue_on_error": true
  }
}
```
### Output Structure

```json
{
  "outputs": {
    "extracted_data": [
      {
        "trajectory_id": "abc123",
        "score": 0.95,
        "explanation": "High quality output...",
        "status": "completed",
        "storage_path": "collection/workflow/0001",
        "name": "workflow_name",
        "item": "original item",
        "index": 0
      }
    ],
    "total_processed": 10,
    "total_found": 10,
    "total_errors": 0,
    "errors": [],
    "extraction_config": {
      "extract_keys": {...},
      "include_metadata": true,
      "max_parallel": 10
    }
  }
}
```
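Fan-in code usually reduces `extracted_data` to a summary. A small sketch, assuming rows shaped like the output above, where failed extractions carry a `None` score:

```python
def aggregate_scores(extracted_data):
    """Illustrative fan-in aggregation over extract_from_trajectories
    output: mean score over rows that produced a numeric score."""
    scores = [row["score"] for row in extracted_data
              if isinstance(row.get("score"), (int, float))]
    if not scores:
        return {"count": 0, "mean_score": None}
    return {"count": len(scores),
            "mean_score": sum(scores) / len(scores)}

rows = [
    {"trajectory_id": "abc123", "score": 0.95, "status": "completed"},
    {"trajectory_id": "def456", "score": 0.85, "status": "completed"},
    {"trajectory_id": "ghi789", "score": None, "status": "failed"},
]
summary = aggregate_scores(rows)
```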
## Common Patterns

### Fan-Out / Fan-In Pattern

Process items in parallel and aggregate results:

```json
{
  "steps": ["fan_out", "fan_in", "summarize"],
  "step_configs": {
    "fan_out": {
      "activity": "list_emit_await",
      "items_path": "init_params.items",
      "task_reference": {
        "task_name": "process_single_item"
      },
      "data_mapping": {
        "item": "{{ $item }}"
      }
    },
    "fan_in": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "fan_out.outputs.trajectory_references",
      "extract_keys": {
        "result": "steps.process.outputs.result",
        "score": "steps.process.outputs.score"
      }
    },
    "summarize": {
      "activity": "litellm_chat",
      "prompt_path": "fan_in.outputs.extracted_data",
      "system_prompt": "Summarize these results..."
    }
  }
}
```
### Batch Evaluation Pipeline

Run evaluations on multiple test cases:

```json
{
  "steps": ["load_data", "evaluate_all", "collect_scores", "analyze"],
  "step_configs": {
    "load_data": {
      "activity": "read_text_file",
      "text_path": "init_params.test_file"
    },
    "evaluate_all": {
      "activity": "list_emit_await",
      "items_path": "load_data.outputs.json.test_cases",
      "task_reference": {
        "task_name": "single_evaluation"
      },
      "data_mapping": {
        "input": "{{ $item.input }}",
        "expected": "{{ $item.expected }}"
      },
      "execution_config": {
        "max_parallel": 10
      }
    },
    "collect_scores": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "evaluate_all.outputs.trajectory_references",
      "extract_keys": {
        "score": "steps.judge.outputs.score",
        "passed": "steps.verify.outputs.passed"
      }
    },
    "analyze": {
      "activity": "simple_judge",
      "instruction": "Analyze these evaluation results and provide a summary.",
      "item_path": "collect_scores.outputs.extracted_data"
    }
  }
}
```
### Multi-Model Comparison

Compare multiple models on the same inputs:

```json
{
  "steps": ["run_models", "collect_results"],
  "init_params": {
    "models": ["gpt-4o", "claude-3-5-sonnet-20241022", "gemini-pro"],
    "test_prompt": "Write a poem about coding"
  },
  "step_configs": {
    "run_models": {
      "activity": "list_emit_await",
      "items_path": "init_params.models",
      "task_reference": {
        "task_name": "generate_with_model"
      },
      "data_mapping": {
        "model": "{{ $item }}",
        "prompt": "{{ init_params.test_prompt }}"
      }
    },
    "collect_results": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "run_models.outputs.trajectory_references",
      "extract_keys": {
        "model": "init_params.model",
        "output": "steps.generate.outputs.text",
        "tokens": "steps.generate.outputs.token_count"
      }
    }
  }
}
```
## Best Practices

### Concurrency Management

- Start with `max_parallel: 5` and adjust based on downstream capacity
- Consider API rate limits when processing many items
- Use `fail_fast: true` for critical pipelines where any failure should stop processing
### Timeout Handling

- Set realistic timeouts based on child workflow complexity
- Enable `collect_partial_results` to recover completed work on timeout
- Monitor `statistics.timeout_occurred` to detect timing issues
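The collect-partial-results behavior amounts to waiting with a deadline and keeping only the children that finished in time. An illustrative asyncio sketch, not Jetty's implementation:

```python
import asyncio

async def gather_with_timeout(coros, timeout_seconds):
    """Sketch of collect_partial_results semantics: on timeout, keep
    whatever children already finished and cancel the rest."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, timeout=timeout_seconds)
    for task in pending:  # abandon children that missed the deadline
        task.cancel()
    results = [t.result() for t in done if not t.exception()]
    return results, len(pending)

async def child(delay, value):
    await asyncio.sleep(delay)  # stand-in for child workflow work
    return value

async def main():
    fast = child(0.01, "fast")
    slow = child(5.0, "slow")
    return await gather_with_timeout([fast, slow], timeout_seconds=0.2)

results, timed_out = asyncio.run(main())
```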
### Error Handling

- Use `continue_on_error: true` for batch jobs that can tolerate partial failures
- Check the `failed_workflows` array for error details
- Implement retry logic for transient failures
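Since the steps themselves do not retry, retry logic lives on the caller's side. A generic sketch with exponential backoff (`with_retries` is a hypothetical helper, not part of Jetty):

```python
import time

def with_retries(fn, max_attempts=3, base_delay=0.1):
    """Illustrative retry wrapper for transient failures: exponential
    backoff between attempts, re-raise after the last one."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...

attempts = {"n": 0}

def flaky():
    """Fails twice, then succeeds, mimicking a transient error."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = with_retries(flaky, max_attempts=5, base_delay=0.001)
```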
### Data Mapping
- Keep templates simple and readable
- Use path expressions for accessing parent context
- Validate data mapping with small test batches first
## Related Steps
- Simple Judge - Evaluate collected results
- Number Sequence Generator - Generate item lists
- LiteLLM Chat - Process aggregated data