Steps
Steps are atomic workflow operations that represent the smallest unit of work in Jetty workflows. They are implemented as reusable functions that can be combined to create complex multi-step processes, from AI model interactions to data transformations and evaluations.
What are Steps?
Steps are the building blocks of Jetty workflows that provide:
- Atomic Operations: Single-purpose functions that perform specific tasks
- Reusable Components: Modular functions that can be used across multiple workflows
- Standardized Interface: Consistent input/output patterns for seamless composition
- Rich Functionality: Extensive library covering AI models, data processing, and utility functions
Each step is executed by an Activity within a workflow and contributes to the overall Trajectory state.
Step Architecture
Step Function Structure
All steps in Jetty follow a standardized implementation pattern as StepFunction classes:
class MyStep(StepFunction):
    """Description of what this step does."""

    activity_name = "my_step_activity"

    async def __call__(
        self,
        trajectory: Trajectory,
        config: dict[str, Any],
        init_params: dict,
    ) -> Step:
        # Step implementation logic:
        # process inputs, execute operations, return results
        return Step(
            inputs={"input_param": input_value},
            outputs={"output_param": output_value},
        )
Core Components
activity_name (required)
Unique identifier for the step that maps to flow configurations:
class TextAnalysis(StepFunction):
    activity_name = "text_analysis"  # Referenced in flow step_configs
**__call__ method** (required)
The main execution function that implements the step logic:
async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
    # Extract inputs from configuration or trajectory
    text = config.get("text", init_params.get("default_text"))

    # Perform step operations
    result = await process_text(text)

    # Return structured step result
    return Step(
        inputs={"text": text},
        outputs={"processed_text": result, "word_count": len(text.split())},
    )
Input Parameters
- trajectory: Current workflow state containing previous step results and configuration
- config: Step-specific configuration from the flow's step_configs section
- init_params: Workflow initialization parameters from the flow's init_params section
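As an illustrative sketch of how these three sources are typically combined (plain dicts stand in here for the real Trajectory and Step types, and the greeting step itself is hypothetical), step-specific config usually takes precedence over workflow-wide init_params:

```python
from typing import Any


def greeting_step(config: dict[str, Any], init_params: dict) -> dict:
    """Hypothetical step body: resolve an input with config-over-init_params precedence."""
    # The step-specific config wins; init_params supplies the workflow-wide
    # default; a literal fallback covers the case where neither is set.
    name = config.get("name", init_params.get("name", "world"))
    greeting = f"Hello, {name}!"
    # Mirror the Step(inputs=..., outputs=...) shape with a plain dict.
    return {"inputs": {"name": name}, "outputs": {"greeting": greeting}}


result = greeting_step(config={}, init_params={"name": "Jetty"})
# result["outputs"]["greeting"] == "Hello, Jetty!"
```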
Output Structure
Steps return a Step object containing:
Step(
    inputs={"param1": value1, "param2": value2},      # Input values used by the step
    outputs={"result1": output1, "result2": output2}  # Results produced by the step
)
Step Library Organization
The step library is organized into thematic categories, each providing specialized functionality:
AI Model Integration Steps
Gemini Steps (gemini_steps)
Google's Gemini AI model integration for text generation and analysis:
# Example: Text generation with Gemini
{
    "step_configs": {
        "generate_content": {
            "activity": "gemini_text_generation",
            "model": "gemini-pro",
            "prompt": "Write a summary of: {{init_params.text}}",
            "temperature": 0.7,
            "max_tokens": 500
        }
    }
}
LiteLLM Steps (litellm_steps)
Multi-provider LLM integration supporting OpenAI, Anthropic, and other providers:
# Example: Cross-provider chat completion
{
    "step_configs": {
        "chat_analysis": {
            "activity": "litellm_chat",
            "model": "gpt-4",
            "messages": [
                {"role": "user", "content": "Analyze this text: {{init_params.text}}"}
            ],
            "temperature": 0.3
        }
    }
}
Replicate Steps (replicate_steps)
Integration with Replicate's model hosting platform:
# Example: Image generation via Replicate
{
    "step_configs": {
        "create_image": {
            "activity": "replicate_prediction",
            "model": "stability-ai/stable-diffusion",
            "input": {
                "prompt": "{{init_params.image_prompt}}",
                "num_outputs": 1
            }
        }
    }
}
Data Processing Steps
Pandas Steps (pandas_steps)
Data manipulation and analysis using pandas DataFrames:
# Example: CSV processing and analysis
{
    "step_configs": {
        "data_analysis": {
            "activity": "pandas_processor",
            "operations": ["load_csv", "clean_data", "calculate_stats"],
            "input_file": "{{init_params.data_file}}"
        }
    }
}
Tools Steps (tools_steps)
General-purpose utility functions for common operations:
# Example: File processing utilities
{
    "step_configs": {
        "process_files": {
            "activity": "file_processor",
            "operation": "merge_documents",
            "input_format": "pdf",
            "output_format": "text"
        }
    }
}
Evaluation and Testing Steps
Evaluation Steps (evals_steps)
Framework for evaluating AI model outputs and workflow results:
{
    "step_configs": {
        "evaluate_quality": {
            "activity": "quality_evaluator",
            "metrics": ["accuracy", "relevance", "coherence"],
            "reference_data": "{{init_params.ground_truth}}"
        }
    }
}
AgentBench Steps (agentbench_steps)
Specialized evaluation framework for AI agent performance:
{
    "step_configs": {
        "benchmark_agent": {
            "activity": "agentbench_evaluator",
            "test_suite": "coding_tasks",
            "difficulty_level": "intermediate"
        }
    }
}
Verdict Steps (verdict_steps)
Advanced evaluation system for complex assessment workflows:
{
    "step_configs": {
        "comprehensive_verdict": {
            "activity": "verdict_evaluator",
            "criteria": ["technical_accuracy", "clarity", "completeness"],
            "scoring_method": "weighted_average"
        }
    }
}
Harbor Terminal Bench Steps (harbor_terminal_bench_steps)
Container-based agent evaluation framework:
{
    "step_configs": {
        "evaluate_agent": {
            "activity": "harbor_terminal_bench",
            "agent_path": "init_params.agent",
            "model_path": "init_params.model",
            "dataset_path": "init_params.dataset"
        }
    }
}
Utility and Development Steps
Toy Steps (toy_steps)
Simple demonstration and testing steps for development:
{
    "step_configs": {
        "double_text": {
            "activity": "text_doubler",
            "text_path": "init_params.text",
            "sleep_time": "1.0"
        },
        "compliance_check": {
            "activity": "random_compliance_check"
        }
    }
}
Step Data Flow
Input Resolution
Steps can access data from multiple sources:
# From workflow initialization parameters
text = init_params.get("input_text")

# From step configuration
model = config.get("model", "default-model")

# From previous step outputs using step utilities
previous_result = await step_utils.extract_step_value_async(
    trajectory,
    "previous_step.outputs.result"
)
Parameter Extraction
Use the step_utils module for consistent data extraction:
from mise.flows import step_utils

# Extract value from trajectory using path notation
value = await step_utils.extract_step_value_async(
    trajectory,
    "init_params.user_input"  # or "step_name.outputs.result"
)
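The real extract_step_value_async is internal to mise and operates asynchronously on a Trajectory object, but the path notation itself can be pictured as a dotted traversal over nested state. A minimal synchronous sketch (dicts stand in for the real types):

```python
from typing import Any


def extract_value(state: dict[str, Any], path: str) -> Any:
    """Walk a dotted path like "step_name.outputs.result" through nested dicts.

    Simplified, synchronous stand-in for step_utils.extract_step_value_async.
    """
    value: Any = state
    for key in path.split("."):
        value = value[key]
    return value


trajectory_state = {
    "init_params": {"user_input": "hello"},
    "summarize": {"outputs": {"result": "a short summary"}},
}

extract_value(trajectory_state, "init_params.user_input")    # "hello"
extract_value(trajectory_state, "summarize.outputs.result")  # "a short summary"
```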
Output Generation
Steps produce structured outputs that become available to subsequent steps:
return Step(
    inputs={
        "source_text": input_text,
        "processing_mode": mode
    },
    outputs={
        "processed_text": result_text,
        "word_count": len(result_text.split()),
        "processing_time": execution_time,
        "metadata": {
            "model_version": "v1.2.3",
            "confidence_score": 0.95
        }
    }
)
Step Implementation Examples
Simple Text Processing Step
class TextProcessor(StepFunction):
    """Process text input with various transformations."""

    activity_name = "text_processor"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Extract input text
        text_path = config.get("text_path", "init_params.text")
        text = await step_utils.extract_step_value_async(trajectory, text_path)

        # Get processing options
        operation = config.get("operation", "uppercase")

        # Process text based on operation
        if operation == "uppercase":
            result = text.upper()
        elif operation == "lowercase":
            result = text.lower()
        elif operation == "reverse":
            result = text[::-1]
        else:
            result = text

        return Step(
            inputs={"text": text, "operation": operation},
            outputs={"processed_text": result, "original_length": len(text)}
        )
AI Model Integration Step
class AIAnalysis(StepFunction):
    """Analyze text using an AI model."""

    activity_name = "ai_analysis"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Extract configuration
        model_name = config.get("model", "gemini-pro")
        prompt = config.get("prompt", "Analyze this text")
        temperature = config.get("temperature", 0.7)

        # Get input text
        text = await step_utils.extract_step_value_async(
            trajectory,
            config.get("text_path", "init_params.text")
        )

        # Call the AI model (implementation depends on the model provider)
        analysis_result = await call_ai_model(
            model=model_name,
            prompt=f"{prompt}: {text}",
            temperature=temperature
        )

        return Step(
            inputs={
                "text": text,
                "model": model_name,
                "prompt": prompt,
                "temperature": temperature
            },
            outputs={
                "analysis": analysis_result,
                "model_response": analysis_result.get("content"),
                "confidence": analysis_result.get("confidence", 0.0),
                "token_usage": analysis_result.get("usage", {})
            }
        )
Data Processing Step with File Handling
class DataProcessor(StepFunction):
    """Process data files with pandas operations."""

    activity_name = "data_processor"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        import pandas as pd

        # Get file path from configuration or a previous step
        file_path = config.get("file_path") or await step_utils.extract_step_value_async(
            trajectory, "init_params.data_file"
        )

        # Load data
        if file_path.endswith(".csv"):
            df = pd.read_csv(file_path)
        elif file_path.endswith(".json"):
            df = pd.read_json(file_path)
        else:
            raise ValueError(f"Unsupported file format: {file_path}")

        # Perform operations
        operations = config.get("operations", ["describe"])
        results = {}
        if "describe" in operations:
            results["description"] = df.describe().to_dict()
        if "shape" in operations:
            results["shape"] = {"rows": df.shape[0], "columns": df.shape[1]}
        if "columns" in operations:
            results["columns"] = df.columns.tolist()

        return Step(
            inputs={
                "file_path": file_path,
                "operations": operations
            },
            outputs={
                "results": results,
                "row_count": len(df),
                "column_count": len(df.columns)
            }
        )
Step Registration
Steps are registered in the step library for use in workflows:
# In step module (e.g., mise/flows/steps/my_steps.py)
MY_STEPS = {
    TextProcessor.activity_name: TextProcessor(),
    AIAnalysis.activity_name: AIAnalysis(),
    DataProcessor.activity_name: DataProcessor()
}

# In step_library.py
from mise.flows.steps import my_steps

STEP_LIBRARY = {
    **existing_steps,
    **my_steps.MY_STEPS,
    # ... other step collections
}
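Once registered, the workflow engine resolves a flow's "activity" field to a step instance via this library. A simplified, synchronous sketch of that dispatch (the EchoStep class and local STEP_LIBRARY here are stand-ins, not the real mise registry):

```python
class EchoStep:
    """Stand-in step; the real StepFunction base class comes from mise."""

    activity_name = "echo"

    def __call__(self, config: dict) -> dict:
        return {"outputs": {"echoed": config.get("text", "")}}


# Registration keyed by activity_name, mirroring the pattern above.
STEP_LIBRARY = {EchoStep.activity_name: EchoStep()}


def run_activity(activity: str, config: dict) -> dict:
    # The lookup mirrors how a flow's step_configs "activity" field
    # selects which registered step function to execute.
    step = STEP_LIBRARY[activity]
    return step(config)


result = run_activity("echo", {"text": "hi"})
# result == {"outputs": {"echoed": "hi"}}
```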
Error Handling in Steps
Graceful Error Handling
import logging

logger = logging.getLogger(__name__)


class RobustStep(StepFunction):
    activity_name = "robust_step"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        try:
            # Primary operation
            result = await perform_operation(config)
            return Step(
                inputs={"config": config},
                outputs={"result": result, "success": True}
            )
        except SpecificError as e:
            # Handle known error types with a fallback
            logger.warning(f"Primary operation failed: {e}; using fallback")
            fallback_result = await fallback_operation(config)
            return Step(
                inputs={"config": config},
                outputs={
                    "result": fallback_result,
                    "success": False,
                    "error": str(e),
                    "fallback_used": True
                }
            )
        except Exception as e:
            # Log the error and re-raise so the workflow engine can handle retries
            logger.error(f"Step execution failed: {e}")
            raise
Validation and Input Checking
class ValidatedStep(StepFunction):
    activity_name = "validated_step"

    async def __call__(self, trajectory: Trajectory, config: dict[str, Any], init_params: dict) -> Step:
        # Validate required configuration
        required_params = ["input_text", "operation_type"]
        for param in required_params:
            if param not in config:
                raise ValueError(f"Missing required parameter: {param}")

        # Validate input values
        operation = config["operation_type"]
        if operation not in ["analyze", "transform", "evaluate"]:
            raise ValueError(f"Invalid operation type: {operation}")

        # Proceed with validated inputs
        result = await execute_operation(config)
        return Step(
            inputs=config,
            outputs={"result": result, "validation_passed": True}
        )
Step Testing and Development
Unit Testing Steps
import pytest

from mise import types as mise_types
from mise.flows.steps.my_steps import TextProcessor


@pytest.mark.asyncio
async def test_text_processor():
    # Create a test trajectory
    trajectory = mise_types.Trajectory(
        name="test_trajectory",
        init_params={"text": "hello world"}
    )

    # Create the step configuration
    config = {"operation": "uppercase"}

    # Execute the step
    step_instance = TextProcessor()
    result = await step_instance(trajectory, config, trajectory.init_params)

    # Verify results
    assert result.outputs["processed_text"] == "HELLO WORLD"
    assert result.outputs["original_length"] == 11
    assert result.inputs["operation"] == "uppercase"
Integration Testing
@pytest.mark.asyncio
async def test_step_integration():
    """Test step integration within a workflow context."""
    # Create a trajectory with previous step results
    trajectory = mise_types.Trajectory(
        name="integration_test",
        init_params={"text": "test input"},
        steps={
            "previous_step": mise_types.Step(
                inputs={"data": "input"},
                outputs={"processed_data": "output"}
            )
        }
    )

    # Test a step that depends on previous results
    config = {"text_path": "previous_step.outputs.processed_data"}
    step_instance = TextProcessor()
    result = await step_instance(trajectory, config, trajectory.init_params)

    # Verify the step accessed previous results correctly
    assert result.inputs["text"] == "output"
Best Practices
Step Design Principles
- Single Responsibility: Each step should have one clear purpose
- Idempotent Operations: Steps should produce consistent results when run multiple times
- Clear Input/Output: Use descriptive parameter names and document expected formats
- Error Handling: Implement appropriate error handling and recovery strategies
- Resource Management: Clean up resources and handle timeouts appropriately
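Idempotency in particular is easy to verify in a unit test: a step that is a pure function of its inputs returns identical results no matter how many times it runs, which makes activity retries safe. A minimal sketch (a hypothetical word-count step, with a dict standing in for the Step type):

```python
def word_count_step(config: dict) -> dict:
    """Hypothetical idempotent step: output depends only on the inputs."""
    # No clocks, randomness, or external state are consulted, so repeated
    # executions (e.g. on activity retry) always produce the same result.
    text = config["text"]
    return {
        "inputs": {"text": text},
        "outputs": {"word_count": len(text.split())},
    }


first = word_count_step({"text": "steps are atomic"})
second = word_count_step({"text": "steps are atomic"})
assert first == second  # identical results across runs
```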
Configuration Management
# Use descriptive configuration with sensible defaults
config = {
    "model": config.get("model", "gemini-pro"),     # Clear default
    "temperature": config.get("temperature", 0.7),  # Reasonable default
    "max_tokens": config.get("max_tokens", 1000),   # Safe limit
    "timeout": config.get("timeout", 30)            # Prevent hanging
}
Output Structure
# Provide rich, structured outputs
return Step(
inputs={
"user_query": query,
"model_config": model_config
},
outputs={
# Primary results
"response": model_response,
"confidence": confidence_score,
# Metadata for debugging and monitoring
"model_version": model_info.version,
"processing_time": end_time - start_time,
"token_usage": {
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens
},
# Status information
"success": True,
"warnings": warnings_list if warnings_list else None
}
)
Next Steps
- Learn about Architecture Overview for deeper technical details about step execution
- Explore the Step Library to see available workflow operations
- Check out Guides to see steps in action across different use cases
Ready to build workflows? Continue to the Step Library to discover available operations.