Control Flow Steps

Control flow steps provide orchestration for Jetty workflows: they let you fan out over a list of items by launching child workflows in parallel, wait for them to complete, and extract results from the finished trajectories.

Available Steps

| Step | Activity Name | Description |
| --- | --- | --- |
| List Emit Await | list_emit_await | Process items in parallel by launching child workflows |
| Extract From Trajectories | extract_from_trajectories | Extract data from completed child trajectories |

When to Use Control Flow Steps

Control flow steps are essential for:

  • Fan-out patterns: Process many items by launching parallel workflows
  • Batch processing: Run the same workflow on multiple inputs
  • Data aggregation: Collect and combine results from child workflows
  • Pipeline orchestration: Build complex multi-stage processing pipelines

List Emit Await

Emits child workflows for each item in a list and waits for all to complete.

Activity Name

list_emit_await

Overview

This step takes a list of items, launches a child workflow for each item, waits for all workflows to complete, and returns references to the completed trajectories. It's the foundation for parallel processing patterns.

Key features:

  • Parallel execution: Controlled concurrency with configurable limits
  • Template-based data mapping: Map item data to child workflow parameters
  • Timeout handling: Graceful timeout with partial result collection
  • Parent-child tracking: Automatic tracking of workflow relationships
  • Inherited context: Child workflows inherit collection, storage, and auth from parent

Configuration Parameters

Required Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| items_path | string | Path expression to the list of items to process |
| task_reference.task_name | string | Name of the workflow to launch for each item |

Optional Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| data_mapping | object | {} | Template mapping from item to child init_params |
| execution_config.max_parallel | int | 5 | Maximum concurrent child workflows |
| execution_config.timeout_seconds | int | 2700 | Global timeout for all workflows |
| execution_config.fail_fast | bool | false | Stop on first failure |
| execution_config.collect_partial_results | bool | true | Collect completed results on timeout |

Template Syntax

Data mapping uses template expressions with magic variables:

| Variable | Description |
| --- | --- |
| {{ $item }} | The current item from the list |
| {{ $index }} | Zero-based index of the current item |
| {{ $parent_id }} | Trajectory ID of the parent workflow |
| {{ $item_str }} | String representation of the item |
| {{ $item_len }} | Length of the item (if applicable) |

Path expressions are also supported:

  • {{ init_params.some_value }} - Access parent's init_params
  • {{ step_name.outputs.field }} - Access outputs from previous steps
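
The two mechanisms above (magic variables and path expressions) can be sketched in a few lines of Python. This is an illustration of how such templates might resolve, not Jetty's actual implementation; `render_template` and its exact semantics are assumptions.

```python
import re

def render_template(template, item, index, context):
    """Resolve magic variables and path expressions in a template string.

    Hypothetical sketch: the real engine may differ (e.g. in type
    coercion or error handling).
    """
    # Magic variables derived from the current item
    magic = {
        "$item": item,
        "$index": index,
        "$item_str": str(item),
        "$item_len": len(item) if hasattr(item, "__len__") else None,
    }

    def resolve(expr):
        if expr in magic:
            return magic[expr]
        # Otherwise treat it as a dotted path into the parent context
        value = context
        for part in expr.split("."):
            value = value[part]
        return value

    # A template that is a single expression returns the raw value...
    whole = re.fullmatch(r"\{\{\s*(.+?)\s*\}\}", template)
    if whole:
        return resolve(whole.group(1))
    # ...while mixed text interpolates each expression as a string
    return re.sub(r"\{\{\s*(.+?)\s*\}\}",
                  lambda m: str(resolve(m.group(1))), template)

# Rendering a data_mapping for one item of the list
mapping = {"document": "{{ $item }}", "index": "{{ $index }}",
           "model": "{{ init_params.model }}"}
context = {"init_params": {"model": "gpt-4o"}}
params = {key: render_template(tpl, item="doc-001", index=0, context=context)
          for key, tpl in mapping.items()}
```

Note the distinction this sketch makes: a bare {{ $item }} passes the value through with its type preserved, while an expression embedded in surrounding text is stringified.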

Examples

Basic List Processing

{
  "name": "process_items",
  "activity": "list_emit_await",
  "config": {
    "items_path": "init_params.documents",
    "task_reference": {
      "task_name": "analyze_document"
    },
    "data_mapping": {
      "document": "{{ $item }}",
      "index": "{{ $index }}"
    },
    "execution_config": {
      "max_parallel": 10,
      "timeout_seconds": 3600
    }
  }
}

Processing with Inherited Parameters

{
  "name": "batch_evaluate",
  "activity": "list_emit_await",
  "config": {
    "items_path": "data_loader.outputs.test_cases",
    "task_reference": {
      "task_name": "run_evaluation"
    },
    "data_mapping": {
      "test_case": "{{ $item }}",
      "model": "{{ init_params.model }}",
      "temperature": "{{ init_params.temperature }}"
    },
    "execution_config": {
      "max_parallel": 5,
      "fail_fast": false,
      "collect_partial_results": true
    }
  }
}

Output Structure

{
  "outputs": {
    "trajectory_references": [
      {
        "index": 0,
        "item": "original item data",
        "trajectory_id": "abc123",
        "storage_path": "collection/workflow/0001",
        "name": "workflow_name"
      }
    ],
    "failed_workflows": [],
    "statistics": {
      "total_items": 10,
      "emitted_workflows": 10,
      "completed_workflows": 10,
      "failed_workflows": 0,
      "success_rate": 1.0,
      "execution_time_seconds": 45.2,
      "timeout_occurred": false
    },
    "task_reference": {
      "collection_name": "my_collection",
      "task_name": "analyze_document"
    }
  }
}
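
A downstream consumer can sanity-check this structure before extracting from it. The helper below is hypothetical (not a built-in step); it only reads the fields shown above:

```python
def check_fanout(outputs):
    """Return the trajectory IDs of completed children, raising if the
    fan-out timed out with nothing to show. Hypothetical helper, not
    part of Jetty."""
    stats = outputs["statistics"]
    if stats["timeout_occurred"] and not outputs["trajectory_references"]:
        raise RuntimeError("fan-out timed out with no completed work")
    if outputs["failed_workflows"]:
        print(f"{stats['failed_workflows']} of "
              f"{stats['emitted_workflows']} children failed")
    return [ref["trajectory_id"] for ref in outputs["trajectory_references"]]

# Abbreviated version of the sample output above
sample = {
    "trajectory_references": [{"trajectory_id": "abc123", "index": 0}],
    "failed_workflows": [],
    "statistics": {"timeout_occurred": False,
                   "failed_workflows": 0, "emitted_workflows": 10},
}
ids = check_fanout(sample)
```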

Extract From Trajectories

Extracts specified values from a list of trajectory references.

Activity Name

extract_from_trajectories

Overview

After using list_emit_await to run parallel workflows, use this step to extract specific data from each completed trajectory. It supports flexible path expressions for accessing any data within child trajectories.

Configuration Parameters

Required Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| trajectory_list_path | string | Path to trajectory references (from list_emit_await) |
| extract_keys | object | Map of output key names to path expressions |

Optional Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_parallel | int | 10 | Maximum parallel trajectory loads |
| include_metadata | bool | true | Include trajectory metadata in output |
| continue_on_error | bool | true | Continue if some extractions fail |

Path Expressions

Extract values using dot notation:

  • status - Trajectory status
  • init_params.field - Value from init_params
  • steps.step_name.outputs.field - Step output value
  • labels[0].value - Label value
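
Dot-notation resolution of this kind can be sketched as follows; `resolve_path`, including its `[n]` index handling, is an illustration, not the extractor's actual code:

```python
import re

def resolve_path(data, path):
    """Walk a dot-separated path, with optional [n] list indexing,
    through nested dicts and lists. Sketch only; the real extractor's
    error handling may differ."""
    value = data
    for part in path.split("."):
        indexed = re.fullmatch(r"(\w+)\[(\d+)\]", part)
        if indexed:
            value = value[indexed.group(1)][int(indexed.group(2))]
        else:
            value = value[part]
    return value

# A trajectory shaped like the examples on this page
trajectory = {
    "status": "completed",
    "init_params": {"text": "hello"},
    "steps": {"evaluate": {"outputs": {"score": 0.95}}},
    "labels": [{"value": "pass"}],
}
```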

Examples

Extract Scores from Evaluations

{
  "name": "collect_results",
  "activity": "extract_from_trajectories",
  "config": {
    "trajectory_list_path": "batch_evaluate.outputs.trajectory_references",
    "extract_keys": {
      "score": "steps.evaluate.outputs.score",
      "explanation": "steps.evaluate.outputs.explanation",
      "status": "status"
    },
    "include_metadata": true
  }
}

Extract Multiple Values

{
  "name": "aggregate_data",
  "activity": "extract_from_trajectories",
  "config": {
    "trajectory_list_path": "process_items.outputs.trajectory_references",
    "extract_keys": {
      "result": "steps.processor.outputs.result",
      "error": "steps.processor.outputs.error",
      "duration": "steps.processor.outputs.duration_seconds",
      "input_text": "init_params.text"
    },
    "max_parallel": 20,
    "continue_on_error": true
  }
}

Output Structure

{
  "outputs": {
    "extracted_data": [
      {
        "trajectory_id": "abc123",
        "score": 0.95,
        "explanation": "High quality output...",
        "status": "completed",
        "storage_path": "collection/workflow/0001",
        "name": "workflow_name",
        "item": "original item",
        "index": 0
      }
    ],
    "total_processed": 10,
    "total_found": 10,
    "total_errors": 0,
    "errors": [],
    "extraction_config": {
      "extract_keys": {...},
      "include_metadata": true,
      "max_parallel": 10
    }
  }
}

Common Patterns

Fan-Out / Fan-In Pattern

Process items in parallel and aggregate results:

{
  "steps": ["fan_out", "fan_in", "summarize"],
  "step_configs": {
    "fan_out": {
      "activity": "list_emit_await",
      "items_path": "init_params.items",
      "task_reference": {
        "task_name": "process_single_item"
      },
      "data_mapping": {
        "item": "{{ $item }}"
      }
    },
    "fan_in": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "fan_out.outputs.trajectory_references",
      "extract_keys": {
        "result": "steps.process.outputs.result",
        "score": "steps.process.outputs.score"
      }
    },
    "summarize": {
      "activity": "litellm_chat",
      "prompt_path": "fan_in.outputs.extracted_data",
      "system_prompt": "Summarize these results..."
    }
  }
}
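
Once the fan-in step has produced extracted_data, downstream aggregation is ordinary list processing. A minimal sketch (the averaging helper is hypothetical, and assumes failed extractions surface as null scores):

```python
def average_score(extracted_data):
    """Mean of the 'score' field across extracted records, skipping rows
    where extraction failed (score is None). Hypothetical post-processing,
    not a built-in Jetty step."""
    scores = [row["score"] for row in extracted_data
              if row.get("score") is not None]
    return sum(scores) / len(scores) if scores else None

rows = [
    {"trajectory_id": "a", "score": 0.8},
    {"trajectory_id": "b", "score": 0.9},
    {"trajectory_id": "c", "score": None},  # extraction failed for this child
]
mean = average_score(rows)  # ~0.85
```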

Batch Evaluation Pipeline

Run evaluations on multiple test cases:

{
  "steps": ["load_data", "evaluate_all", "collect_scores", "analyze"],
  "step_configs": {
    "load_data": {
      "activity": "read_text_file",
      "text_path": "init_params.test_file"
    },
    "evaluate_all": {
      "activity": "list_emit_await",
      "items_path": "load_data.outputs.json.test_cases",
      "task_reference": {
        "task_name": "single_evaluation"
      },
      "data_mapping": {
        "input": "{{ $item.input }}",
        "expected": "{{ $item.expected }}"
      },
      "execution_config": {
        "max_parallel": 10
      }
    },
    "collect_scores": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "evaluate_all.outputs.trajectory_references",
      "extract_keys": {
        "score": "steps.judge.outputs.score",
        "passed": "steps.verify.outputs.passed"
      }
    },
    "analyze": {
      "activity": "simple_judge",
      "instruction": "Analyze these evaluation results and provide a summary.",
      "item_path": "collect_scores.outputs.extracted_data"
    }
  }
}

Multi-Model Comparison

Compare multiple models on the same inputs:

{
  "steps": ["run_models", "collect_results"],
  "init_params": {
    "models": ["gpt-4o", "claude-3-5-sonnet-20241022", "gemini-pro"],
    "test_prompt": "Write a poem about coding"
  },
  "step_configs": {
    "run_models": {
      "activity": "list_emit_await",
      "items_path": "init_params.models",
      "task_reference": {
        "task_name": "generate_with_model"
      },
      "data_mapping": {
        "model": "{{ $item }}",
        "prompt": "{{ init_params.test_prompt }}"
      }
    },
    "collect_results": {
      "activity": "extract_from_trajectories",
      "trajectory_list_path": "run_models.outputs.trajectory_references",
      "extract_keys": {
        "model": "init_params.model",
        "output": "steps.generate.outputs.text",
        "tokens": "steps.generate.outputs.token_count"
      }
    }
  }
}

Best Practices

Concurrency Management

  • Start with max_parallel: 5 and adjust based on downstream capacity
  • Consider API rate limits when processing many items
  • Use fail_fast: true for critical pipelines where any failure should stop processing
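
The effect of a max_parallel cap can be illustrated with a semaphore-bounded gather. This is a conceptual sketch of the semantics, not the engine's implementation; `launch` stands in for emitting one child workflow:

```python
import asyncio

async def emit_all(items, launch, max_parallel=5):
    """Launch one child per item, but never more than max_parallel at
    once; results come back in item order. Conceptual sketch only."""
    sem = asyncio.Semaphore(max_parallel)

    async def bounded(i, item):
        async with sem:  # waits while max_parallel children are in flight
            return await launch(i, item)

    return await asyncio.gather(
        *(bounded(i, item) for i, item in enumerate(items)))

# Stand-in for emitting a child workflow
async def launch(i, item):
    return (i, item.upper())

results = asyncio.run(emit_all(["alpha", "beta"], launch, max_parallel=2))
```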

Timeout Handling

  • Set realistic timeouts based on child workflow complexity
  • Enable collect_partial_results to recover completed work on timeout
  • Monitor statistics.timeout_occurred to detect timing issues

Error Handling

  • Use continue_on_error: true for batch jobs that can tolerate partial failures
  • Check failed_workflows array for error details
  • Implement retry logic for transient failures

Data Mapping

  • Keep templates simple and readable
  • Use path expressions for accessing parent context
  • Validate data mapping with small test batches first