Steps

Steps are the atomic operations of Jetty workflows: the smallest units of work. They are implemented as reusable functions that can be combined into complex multi-step processes, from AI model interactions to data transformations and evaluations.

What are Steps?

Steps are the building blocks of Jetty workflows that provide:

  • Atomic Operations: Single-purpose functions that perform specific tasks
  • Reusable Components: Modular functions that can be used across multiple workflows
  • Standardized Interface: Consistent input/output patterns for seamless composition
  • Rich Functionality: Extensive library covering AI models, data processing, and utility functions

Each step is executed by an Activity within a workflow and contributes to the overall Trajectory state.
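As a rough illustration of that relationship, here is a minimal sketch using simplified stand-ins for the Step and Trajectory types (these dataclasses are assumptions for illustration, not the actual mise classes):

```python
from dataclasses import dataclass, field
from typing import Any

# Simplified stand-ins for mise's Step and Trajectory types; illustrative only.
@dataclass
class Step:
    inputs: dict[str, Any] = field(default_factory=dict)
    outputs: dict[str, Any] = field(default_factory=dict)

@dataclass
class Trajectory:
    name: str
    init_params: dict[str, Any] = field(default_factory=dict)
    steps: dict[str, Step] = field(default_factory=dict)

# As the workflow runs, each executed step's result is recorded under its
# step name, so later steps can read its outputs from the trajectory.
trajectory = Trajectory(name="demo", init_params={"text": "hello"})
trajectory.steps["uppercase"] = Step(
    inputs={"text": "hello"},
    outputs={"processed_text": "HELLO"},
)
print(trajectory.steps["uppercase"].outputs["processed_text"])  # HELLO
```

The key idea is that the trajectory is an append-only record: every completed step leaves its inputs and outputs behind for subsequent steps to consume.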

Step Architecture

Step Function Structure

All steps in Jetty follow a standardized implementation pattern as StepFunction classes:

class MyStep(StepFunction):
    """Description of what this step does."""

    activity_name = "my_step_activity"

    async def __call__(
        self,
        trajectory: Trajectory,
        config: dict[str, Any],
        init_params: dict
    ) -> Step:
        # Step implementation logic
        # Process inputs, execute operations, return results
        return Step(
            inputs={"input_param": input_value},
            outputs={"output_param": output_value}
        )

Core Components

activity_name (required)

Unique identifier for the step that maps to flow configurations:

class TextAnalysis(StepFunction):
    activity_name = "text_analysis"  # Referenced in flow step_configs

__call__ method (required)

The main execution function that implements the step logic:

async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
    # Extract inputs from configuration or trajectory
    text = config.get("text", init_params.get("default_text"))

    # Perform step operations
    result = await process_text(text)

    # Return structured step result
    return Step(
        inputs={"text": text},
        outputs={"processed_text": result, "word_count": len(text.split())}
    )

Input Parameters

  • trajectory: Current workflow state containing previous step results and configuration
  • config: Step-specific configuration from the flow's step_configs section
  • init_params: Workflow initialization parameters from the flow's init_params section

Output Structure

Steps return a Step object containing:

Step(
    inputs={"param1": value1, "param2": value2},      # Input values used by the step
    outputs={"result1": output1, "result2": output2}  # Results produced by the step
)

Step Library Organization

The step library is organized into thematic categories, each providing specialized functionality:

AI Model Integration Steps

Gemini Steps (gemini_steps)

Google's Gemini AI model integration for text generation and analysis:

# Example: Text generation with Gemini
{
    "step_configs": {
        "generate_content": {
            "activity": "gemini_text_generation",
            "model": "gemini-pro",
            "prompt": "Write a summary of: {{init_params.text}}",
            "temperature": 0.7,
            "max_tokens": 500
        }
    }
}

LiteLLM Steps (litellm_steps)

Multi-provider LLM integration supporting OpenAI, Anthropic, and other providers:

# Example: Cross-provider chat completion
{
    "step_configs": {
        "chat_analysis": {
            "activity": "litellm_chat",
            "model": "gpt-4",
            "messages": [
                {"role": "user", "content": "Analyze this text: {{init_params.text}}"}
            ],
            "temperature": 0.3
        }
    }
}

Replicate Steps (replicate_steps)

Integration with Replicate's model hosting platform:

# Example: Image generation via Replicate
{
    "step_configs": {
        "create_image": {
            "activity": "replicate_prediction",
            "model": "stability-ai/stable-diffusion",
            "input": {
                "prompt": "{{init_params.image_prompt}}",
                "num_outputs": 1
            }
        }
    }
}

Data Processing Steps

Pandas Steps (pandas_steps)

Data manipulation and analysis using pandas DataFrames:

# Example: CSV processing and analysis
{
    "step_configs": {
        "data_analysis": {
            "activity": "pandas_processor",
            "operations": ["load_csv", "clean_data", "calculate_stats"],
            "input_file": "{{init_params.data_file}}"
        }
    }
}

Tools Steps (tools_steps)

General-purpose utility functions for common operations:

# Example: File processing utilities
{
    "step_configs": {
        "process_files": {
            "activity": "file_processor",
            "operation": "merge_documents",
            "input_format": "pdf",
            "output_format": "text"
        }
    }
}

Evaluation and Testing Steps

Evaluation Steps (evals_steps)

Framework for evaluating AI model outputs and workflow results:

{
    "step_configs": {
        "evaluate_quality": {
            "activity": "quality_evaluator",
            "metrics": ["accuracy", "relevance", "coherence"],
            "reference_data": "{{init_params.ground_truth}}"
        }
    }
}

AgentBench Steps (agentbench_steps)

Specialized evaluation framework for AI agent performance:

{
    "step_configs": {
        "benchmark_agent": {
            "activity": "agentbench_evaluator",
            "test_suite": "coding_tasks",
            "difficulty_level": "intermediate"
        }
    }
}

Verdict Steps (verdict_steps)

Advanced evaluation system for complex assessment workflows:

{
    "step_configs": {
        "comprehensive_verdict": {
            "activity": "verdict_evaluator",
            "criteria": ["technical_accuracy", "clarity", "completeness"],
            "scoring_method": "weighted_average"
        }
    }
}

Harbor Terminal Bench Steps (harbor_terminal_bench_steps)

Container-based agent evaluation framework:

{
    "step_configs": {
        "evaluate_agent": {
            "activity": "harbor_terminal_bench",
            "agent_path": "init_params.agent",
            "model_path": "init_params.model",
            "dataset_path": "init_params.dataset"
        }
    }
}

Utility and Development Steps

Toy Steps (toy_steps)

Simple demonstration and testing steps for development:

{
    "step_configs": {
        "double_text": {
            "activity": "text_doubler",
            "text_path": "init_params.text",
            "sleep_time": "1.0"
        },
        "compliance_check": {
            "activity": "random_compliance_check"
        }
    }
}

Step Data Flow

Input Resolution

Steps can access data from multiple sources:

# From workflow initialization parameters
text = init_params.get("input_text")

# From step configuration
model = config.get("model", "default-model")

# From previous step outputs using step utilities
previous_result = await step_utils.extract_step_value_async(
    trajectory,
    "previous_step.outputs.result"
)

Parameter Extraction

Use the step_utils module for consistent data extraction:

from mise.flows import step_utils

# Extract value from trajectory using path notation
value = await step_utils.extract_step_value_async(
    trajectory,
    "init_params.user_input"  # or "step_name.outputs.result"
)
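
Conceptually, this path notation amounts to walking nested keys one dotted segment at a time. A minimal synchronous sketch of the idea (the real extract_step_value_async is asynchronous and handles more cases; extract_value here is a hypothetical helper, not part of mise):

```python
from typing import Any

def extract_value(state: dict[str, Any], path: str) -> Any:
    """Walk a dotted path such as 'previous_step.outputs.result' through nested dicts."""
    value: Any = state
    for part in path.split("."):
        value = value[part]
    return value

# A toy state shaped like a trajectory: init params plus a prior step's results.
state = {
    "init_params": {"user_input": "hello"},
    "previous_step": {"outputs": {"result": 42}},
}
print(extract_value(state, "init_params.user_input"))        # hello
print(extract_value(state, "previous_step.outputs.result"))  # 42
```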

Output Generation

Steps produce structured outputs that become available to subsequent steps:

return Step(
    inputs={
        "source_text": input_text,
        "processing_mode": mode
    },
    outputs={
        "processed_text": result_text,
        "word_count": len(result_text.split()),
        "processing_time": execution_time,
        "metadata": {
            "model_version": "v1.2.3",
            "confidence_score": 0.95
        }
    }
)

Step Implementation Examples

Simple Text Processing Step

class TextProcessor(StepFunction):
    """Process text input with various transformations."""

    activity_name = "text_processor"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Extract input text
        text_path = config.get("text_path", "init_params.text")
        text = await step_utils.extract_step_value_async(trajectory, text_path)

        # Get processing options
        operation = config.get("operation", "uppercase")

        # Process text based on operation
        if operation == "uppercase":
            result = text.upper()
        elif operation == "lowercase":
            result = text.lower()
        elif operation == "reverse":
            result = text[::-1]
        else:
            result = text

        return Step(
            inputs={"text": text, "operation": operation},
            outputs={"processed_text": result, "original_length": len(text)}
        )

AI Model Integration Step

class AIAnalysis(StepFunction):
    """Analyze text using an AI model."""

    activity_name = "ai_analysis"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Extract configuration
        model_name = config.get("model", "gemini-pro")
        prompt = config.get("prompt", "Analyze this text")
        temperature = config.get("temperature", 0.7)

        # Get input text
        text = await step_utils.extract_step_value_async(
            trajectory,
            config.get("text_path", "init_params.text")
        )

        # Call AI model (implementation depends on model provider)
        analysis_result = await call_ai_model(
            model=model_name,
            prompt=f"{prompt}: {text}",
            temperature=temperature
        )

        return Step(
            inputs={
                "text": text,
                "model": model_name,
                "prompt": prompt,
                "temperature": temperature
            },
            outputs={
                "analysis": analysis_result,
                "model_response": analysis_result.get("content"),
                "confidence": analysis_result.get("confidence", 0.0),
                "token_usage": analysis_result.get("usage", {})
            }
        )

Data Processing Step with File Handling

class DataProcessor(StepFunction):
    """Process data files with pandas operations."""

    activity_name = "data_processor"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        import pandas as pd

        # Get file path from configuration or previous step
        file_path = config.get("file_path") or await step_utils.extract_step_value_async(
            trajectory, "init_params.data_file"
        )

        # Load data
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif file_path.endswith('.json'):
            df = pd.read_json(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")

        # Perform operations
        operations = config.get("operations", ["describe"])
        results = {}

        if "describe" in operations:
            results["description"] = df.describe().to_dict()
        if "shape" in operations:
            results["shape"] = {"rows": df.shape[0], "columns": df.shape[1]}
        if "columns" in operations:
            results["columns"] = df.columns.tolist()

        return Step(
            inputs={
                "file_path": file_path,
                "operations": operations
            },
            outputs={
                "results": results,
                "row_count": len(df),
                "column_count": len(df.columns)
            }
        )

Step Registration

Steps are registered in the step library for use in workflows:

# In step module (e.g., mise/flows/steps/my_steps.py)
MY_STEPS = {
    TextProcessor.activity_name: TextProcessor(),
    AIAnalysis.activity_name: AIAnalysis(),
    DataProcessor.activity_name: DataProcessor()
}

# In step_library.py
from mise.flows.steps import my_steps

STEP_LIBRARY = {
    **existing_steps,
    **my_steps.MY_STEPS,
    # ... other step collections
}
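
At execution time, the engine resolves a flow's activity name to the registered instance and invokes it. A toy dispatch sketch with a hypothetical stand-in step (EchoStep and run_activity are illustrations, not the actual engine code):

```python
import asyncio

# Hypothetical stand-in step: a registry keyed by activity_name,
# resolved by the engine when the flow runs.
class EchoStep:
    activity_name = "echo"

    async def __call__(self, text: str) -> str:
        return text

STEP_LIBRARY = {EchoStep.activity_name: EchoStep()}

async def run_activity(activity_name: str, text: str) -> str:
    # Look up the registered instance by its activity name and invoke it.
    step = STEP_LIBRARY[activity_name]
    return await step(text)

print(asyncio.run(run_activity("echo", "hi")))  # hi
```

This is why activity_name must be unique: it is the sole key linking a flow's step_configs entry to the step implementation.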

Error Handling in Steps

Graceful Error Handling

class RobustStep(StepFunction):
    activity_name = "robust_step"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        try:
            # Primary operation
            result = await perform_operation(config)

            return Step(
                inputs={"config": config},
                outputs={"result": result, "success": True}
            )

        except SpecificError as e:
            # Handle known error types with fallback
            logger.warning(f"Primary operation failed: {e}, using fallback")
            fallback_result = await fallback_operation(config)

            return Step(
                inputs={"config": config},
                outputs={
                    "result": fallback_result,
                    "success": False,
                    "error": str(e),
                    "fallback_used": True
                }
            )

        except Exception as e:
            # Log error and re-raise for activity retry handling
            logger.error(f"Step execution failed: {e}")
            raise  # Let the workflow engine handle retries

Validation and Input Checking

class ValidatedStep(StepFunction):
    activity_name = "validated_step"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Validate required configuration
        required_params = ["input_text", "operation_type"]
        for param in required_params:
            if param not in config:
                raise ValueError(f"Missing required parameter: {param}")

        # Validate input values
        operation = config["operation_type"]
        if operation not in ["analyze", "transform", "evaluate"]:
            raise ValueError(f"Invalid operation type: {operation}")

        # Proceed with validated inputs
        result = await execute_operation(config)

        return Step(
            inputs=config,
            outputs={"result": result, "validation_passed": True}
        )

Step Testing and Development

Unit Testing Steps

import pytest
from mise.flows.steps.my_steps import TextProcessor
from mise import types as mise_types

@pytest.mark.asyncio
async def test_text_processor():
    # Create test trajectory
    trajectory = mise_types.Trajectory(
        name="test_trajectory",
        init_params={"text": "hello world"}
    )

    # Create step configuration
    config = {"operation": "uppercase"}

    # Execute step
    step_instance = TextProcessor()
    result = await step_instance(trajectory, config, trajectory.init_params)

    # Verify results
    assert result.outputs["processed_text"] == "HELLO WORLD"
    assert result.outputs["original_length"] == 11
    assert result.inputs["operation"] == "uppercase"

Integration Testing

@pytest.mark.asyncio
async def test_step_integration():
    """Test step integration within workflow context."""
    # Create trajectory with previous step results
    trajectory = mise_types.Trajectory(
        name="integration_test",
        init_params={"text": "test input"},
        steps={
            "previous_step": mise_types.Step(
                inputs={"data": "input"},
                outputs={"processed_data": "output"}
            )
        }
    )

    # Test step that depends on previous results
    config = {"text_path": "previous_step.outputs.processed_data"}
    step_instance = TextProcessor()
    result = await step_instance(trajectory, config, trajectory.init_params)

    # Verify step accessed previous results correctly
    assert result.inputs["text"] == "output"

Best Practices

Step Design Principles

  1. Single Responsibility: Each step should have one clear purpose
  2. Idempotent Operations: Steps should produce consistent results when run multiple times
  3. Clear Input/Output: Use descriptive parameter names and document expected formats
  4. Error Handling: Implement appropriate error handling and recovery strategies
  5. Resource Management: Clean up resources and handle timeouts appropriately
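
Idempotency (principle 2) is straightforward to verify directly: applying a deterministic transformation twice should give the same result as applying it once. A hypothetical sketch (normalize_whitespace is an illustration, not a mise function):

```python
def normalize_whitespace(text: str) -> str:
    # A deterministic, idempotent transformation: the same input always
    # yields the same output, and re-applying it changes nothing.
    return " ".join(text.lower().split())

once = normalize_whitespace("  Hello   World ")
twice = normalize_whitespace(normalize_whitespace("  Hello   World "))
assert once == twice == "hello world"
```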

Configuration Management

# Use descriptive configuration with sensible defaults
config = {
    "model": config.get("model", "gemini-pro"),     # Clear default
    "temperature": config.get("temperature", 0.7),  # Reasonable default
    "max_tokens": config.get("max_tokens", 1000),   # Safe limit
    "timeout": config.get("timeout", 30)            # Prevent hanging
}

Output Structure

# Provide rich, structured outputs
return Step(
    inputs={
        "user_query": query,
        "model_config": model_config
    },
    outputs={
        # Primary results
        "response": model_response,
        "confidence": confidence_score,

        # Metadata for debugging and monitoring
        "model_version": model_info.version,
        "processing_time": end_time - start_time,
        "token_usage": {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens
        },

        # Status information
        "success": True,
        "warnings": warnings_list if warnings_list else None
    }
)

Next Steps

  • Learn about Architecture Overview for deeper technical details about step execution
  • Explore the Step Library to see available workflow operations
  • Check out Guides to see steps in action across different use cases

Ready to build workflows? Continue to the Step Library to discover available operations.