Python API
Ralphify can be used as a Python library, not just a CLI. This is useful when you want to:
- Embed the loop in a larger automation pipeline
- Build custom orchestration on top of ralphify
- Listen to events and react programmatically (e.g. send Slack alerts on failures)
- Script runs with more control than the CLI provides
Quick start
from ralphify import run_loop, RunConfig, RunState

config = RunConfig(
    command="claude",
    args=["-p", "--dangerously-skip-permissions"],
    prompt_file="RALPH.md",
    max_iterations=3,
)
state = RunState(run_id="my-run")
run_loop(config, state)
This runs the same loop as ralph run -n 3, using RALPH.md as the prompt. When the loop finishes, state contains the results.
Core function
run_loop(config, state, emitter=None)
The main loop. Discovers primitives, assembles prompts, pipes them to the agent, runs checks, and repeats.
from ralphify import run_loop, RunConfig, RunState, NullEmitter

config = RunConfig(
    command="claude",
    args=["-p", "--dangerously-skip-permissions"],
    prompt_file="RALPH.md",
    max_iterations=5,
    stop_on_error=True,
    timeout=300,
    log_dir="ralph_logs",
)
state = RunState(run_id="build-features")
run_loop(config, state)
print(f"Completed: {state.completed}")
print(f"Failed: {state.failed}")
print(f"Total: {state.total}")
| Parameter | Type | Description |
|---|---|---|
| config | RunConfig | All settings for the run |
| state | RunState | Observable state: counters, status, control methods |
| emitter | EventEmitter \| None | Event listener. None uses NullEmitter (silent) |
The function blocks until the loop finishes (iteration limit reached, stop requested, error, or KeyboardInterrupt).
Configuration
RunConfig
A dataclass with all run settings. Fields match the CLI options.
from pathlib import Path
from ralphify import RunConfig

config = RunConfig(
    command="claude",
    args=["-p", "--dangerously-skip-permissions"],
    prompt_file="RALPH.md",
    prompt_text=None,  # Ad-hoc prompt text (overrides prompt_file)
    prompt_name=None,  # Named ralph from .ralphify/ralphs/
    max_iterations=10,
    delay=2.0,
    timeout=300,
    stop_on_error=True,
    log_dir="ralph_logs",
    project_root=Path("."),
)
| Field | Type | Default | CLI equivalent |
|---|---|---|---|
| command | str | (no default) | ralph.toml [agent] command |
| args | list[str] | (no default) | ralph.toml [agent] args |
| prompt_file | str | (no default) | ralph.toml [agent] ralph or -f |
| prompt_text | str \| None | None | -p |
| prompt_name | str \| None | None | ralph run <name> |
| max_iterations | int \| None | None | -n |
| delay | float | 0 | -d / --delay |
| timeout | float \| None | None | -t / --timeout |
| stop_on_error | bool | False | -s / --stop-on-error |
| log_dir | str \| None | None | -l / --log-dir |
| project_root | Path | Path(".") | Working directory |
RunConfig is mutable — you can change fields mid-run (e.g. increase max_iterations), and the loop picks up changes at the next iteration boundary.
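As a rough illustration of adjusting the config mid-run, the sketch below shows a hypothetical listener that grants one extra iteration for each failed iteration, up to a cap. ExtendOnFailure and its cap parameter are not part of ralphify, and the stand-in objects exist only so the sketch runs on its own. It also assumes the listener is invoked while the loop is still running, so the change is seen at the next iteration boundary.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class ExtendOnFailure:
    """Hypothetical listener: one extra iteration per failure, up to a cap."""

    config: object  # any object with a mutable max_iterations attribute
    cap: int = 20

    def emit(self, event) -> None:
        # Compare by enum member name so the sketch does not import ralphify.
        if event.type.name != "ITERATION_FAILED":
            return
        current = self.config.max_iterations
        if current is not None and current < self.cap:
            self.config.max_iterations = current + 1


# Stand-ins for RunConfig and Event, used only to exercise the listener.
demo_config = SimpleNamespace(max_iterations=3)
listener = ExtendOnFailure(demo_config, cap=4)
failure = SimpleNamespace(type=SimpleNamespace(name="ITERATION_FAILED"), data={})
listener.emit(failure)  # 3 -> 4
listener.emit(failure)  # already at the cap, so nothing changes
```

With a real run, the same object would be passed as the emitter argument of run_loop alongside the RunConfig it mutates.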
RunState
Observable state for a running loop. Created with a run_id and updated by the engine as iterations execute.
from ralphify import RunState
state = RunState(run_id="my-run")
# After run_loop() finishes:
print(state.status) # RunStatus.COMPLETED
print(state.iteration) # 5 (last iteration number)
print(state.completed) # 4
print(state.failed) # 1
print(state.timed_out) # 0 (subset of failed)
print(state.total) # 5 (completed + failed)
print(state.started_at) # datetime (UTC)
| Property / Field | Type | Description |
|---|---|---|
| run_id | str | Unique identifier for this run |
| status | RunStatus | Current lifecycle status |
| iteration | int | Current iteration number (1-indexed) |
| completed | int | Iterations that succeeded |
| failed | int | Iterations that failed (includes timed out) |
| timed_out | int | Iterations that timed out (subset of failed) |
| total | int | completed + failed |
| started_at | datetime \| None | UTC timestamp when the run started |
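The counters can be combined into a one-line report once a run has finished. The following is a minimal sketch that relies only on the attribute names listed for RunState; summarize and success_rate are illustrative helpers, not ralphify functions, and the demo object is a stand-in.

```python
from types import SimpleNamespace


def success_rate(state) -> float:
    """Fraction of finished iterations that succeeded (0.0 if none finished)."""
    return state.completed / state.total if state.total else 0.0


def summarize(state) -> str:
    """Build a one-line summary from the RunState counters."""
    # failed already includes timed-out iterations, so subtract them out.
    other_failures = state.failed - state.timed_out
    return (
        f"{state.run_id}: {state.completed}/{state.total} succeeded "
        f"({success_rate(state):.0%}), {other_failures} other failures, "
        f"{state.timed_out} timed out"
    )


# Stand-in with the same attribute names as RunState, for demonstration only.
demo = SimpleNamespace(run_id="my-run", completed=4, failed=1, timed_out=0, total=5)
print(summarize(demo))
```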
Control methods
RunState provides thread-safe methods to control the loop from another thread:
state.request_stop() # Stop after current iteration
state.request_pause() # Pause between iterations
state.request_resume() # Resume a paused loop
state.request_reload() # Re-discover primitives before next iteration
state.stop_requested # bool — whether stop was requested
state.paused # bool — whether currently paused
These are useful when running the loop in a background thread (see Multi-run management below).
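One way to use request_stop() from another thread is a wall-clock budget. The sketch below uses threading.Timer from the standard library; stop_after is a hypothetical helper, and FakeState is a stand-in that mimics only the two RunState members the sketch touches.

```python
import threading


def stop_after(state, seconds: float) -> threading.Timer:
    """Call state.request_stop() once `seconds` have elapsed.

    Returns the timer so the caller can cancel() it if the run ends early.
    """
    timer = threading.Timer(seconds, state.request_stop)
    timer.daemon = True  # never keep the interpreter alive just for the timer
    timer.start()
    return timer


class FakeState:
    """Stand-in for RunState with only the pieces this sketch touches."""

    def __init__(self) -> None:
        self._stop = threading.Event()

    def request_stop(self) -> None:
        self._stop.set()

    @property
    def stop_requested(self) -> bool:
        return self._stop.is_set()


demo_state = FakeState()
timer = stop_after(demo_state, 0.05)
timer.join(timeout=2)  # wait for the timer thread to fire
```

Because request_stop() takes effect after the current iteration, the budget is a soft limit: an iteration that is already running still finishes (or hits its own timeout) first.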
RunStatus
Enum representing the lifecycle of a run.
| Value | Description |
|---|---|
| PENDING | Created but not started |
| RUNNING | Loop is executing iterations |
| PAUSED | Paused between iterations |
| STOPPED | Stopped by user request |
| COMPLETED | Reached iteration limit or finished naturally |
| FAILED | Crashed with an exception |
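When a run executes on another thread, the status can be polled to wait for it to end. This is a minimal sketch, assuming STOPPED, COMPLETED, and FAILED are final states (an inference from the descriptions above, not a documented guarantee). is_finished and wait_until_finished are hypothetical helpers, and the demo objects are stand-ins for RunState.

```python
import time
from types import SimpleNamespace

# Assumption: these statuses are final; PENDING, RUNNING, and PAUSED are not.
TERMINAL = frozenset({"STOPPED", "COMPLETED", "FAILED"})


def is_finished(state) -> bool:
    """True once the run's status is one of the assumed final states."""
    return state.status.name in TERMINAL


def wait_until_finished(state, timeout: float, poll: float = 0.1) -> bool:
    """Poll until the run reaches a final status; return False on timeout."""
    deadline = time.monotonic() + timeout
    while not is_finished(state):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
    return True


# Stand-ins exposing state.status.name the way an enum-valued field would.
done = SimpleNamespace(status=SimpleNamespace(name="COMPLETED"))
busy = SimpleNamespace(status=SimpleNamespace(name="RUNNING"))
finished_in_time = wait_until_finished(done, timeout=1.0)
gave_up = not wait_until_finished(busy, timeout=0.05, poll=0.01)
```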
Event system
The loop emits structured events so you can observe progress without coupling to the engine internals.
Listening to events
Implement the EventEmitter protocol — a single emit(event) method:
from ralphify import Event, EventEmitter, EventType, RunConfig, RunState, run_loop

class MyEmitter:
    """Custom event listener that prints iteration results."""

    def emit(self, event: Event) -> None:
        if event.type == EventType.ITERATION_COMPLETED:
            duration = event.data["duration_formatted"]
            print(f"Iteration {event.data['iteration']} completed ({duration})")
        elif event.type == EventType.ITERATION_FAILED:
            print(f"Iteration {event.data['iteration']} failed (exit {event.data['returncode']})")
        elif event.type == EventType.CHECK_FAILED:
            print(f" Check '{event.data['name']}' failed")

config = RunConfig(command="claude", args=["-p"], prompt_file="RALPH.md", max_iterations=3)
state = RunState(run_id="observed-run")
run_loop(config, state, emitter=MyEmitter())
Event
Every event has these fields:
| Field | Type | Description |
|---|---|---|
| type | EventType | What happened |
| run_id | str | Which run produced this event |
| data | dict | Event-specific data |
| timestamp | datetime | UTC timestamp |
Use event.to_dict() to serialize for JSON transport.
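For example, to_dict() makes it straightforward to keep a JSON Lines log of a run. The sketch below is a hypothetical listener, not something ralphify ships. FakeEvent is a stand-in, and the dictionary it returns is made up for the demo; the real output of to_dict() may be shaped differently.

```python
import io
import json
from typing import TextIO


class JsonLinesEmitter:
    """Write each event as one JSON object per line to a text stream."""

    def __init__(self, stream: TextIO) -> None:
        self.stream = stream

    def emit(self, event) -> None:
        # default=str is a safety net in case to_dict() leaves a value
        # (such as a datetime) that json cannot encode directly.
        self.stream.write(json.dumps(event.to_dict(), default=str) + "\n")
        self.stream.flush()


class FakeEvent:
    """Stand-in event; the keys and values here are illustrative only."""

    def to_dict(self) -> dict:
        return {"type": "run_started", "run_id": "my-run", "data": {}}


buffer = io.StringIO()
JsonLinesEmitter(buffer).emit(FakeEvent())
```

Passing an open file instead of the StringIO buffer would produce a log that tools reading one JSON object per line can consume.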
EventType
All event types the loop can emit. Every event has type, run_id, timestamp, and data fields.
Run lifecycle
| Event type | When | Data fields |
|---|---|---|
| RUN_STARTED | Run begins | checks, contexts, instructions (int counts), max_iterations, timeout, delay, prompt_name |
| RUN_STOPPED | Run ends for any reason | reason ("completed", "user_requested", "error"), total, completed, failed, timed_out |
| RUN_PAUSED | Run is paused | (none) |
| RUN_RESUMED | Run is resumed | (none) |
Iteration lifecycle
| Event type | When | Data fields |
|---|---|---|
| ITERATION_STARTED | Iteration begins | iteration |
| ITERATION_COMPLETED | Agent exits with code 0 | iteration, returncode, duration (seconds), duration_formatted, detail, log_file, result_text |
| ITERATION_FAILED | Agent exits non-zero | iteration, returncode, duration, duration_formatted, detail, log_file, result_text |
| ITERATION_TIMED_OUT | Agent exceeds timeout | iteration, returncode (null), duration, duration_formatted, detail, log_file, result_text |
Checks
| Event type | When | Data fields |
|---|---|---|
| CHECKS_STARTED | Check phase begins | iteration, count |
| CHECK_PASSED | A single check passes | iteration, name, passed, exit_code, timed_out, output |
| CHECK_FAILED | A single check fails | iteration, name, passed, exit_code, timed_out, output |
| CHECKS_COMPLETED | All checks finish | iteration, passed, failed, results (array of {name, passed, exit_code, timed_out, output}) |
Prompt assembly
| Event type | When | Data fields |
|---|---|---|
| CONTEXTS_RESOLVED | Contexts injected into prompt | iteration, count |
| PROMPT_ASSEMBLED | Full prompt built | iteration, prompt_length |
Other
| Event type | When | Data fields |
|---|---|---|
| AGENT_ACTIVITY | Each line of streamed agent output (Claude Code only) | raw (dict: one parsed JSON line from the agent's stream-json output), iteration |
| PRIMITIVES_RELOADED | Primitives re-discovered mid-run | checks, contexts, instructions (int counts) |
| LOG_MESSAGE | General log from the engine | message, level ("info", "error"), traceback (optional) |
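To get a feel for which events a run produces, a listener can simply tally them. The following sketch is a hypothetical TallyEmitter that counts events by type name and records the name field of each CHECK_FAILED event. It matches on the enum member name rather than importing EventType, and the demo events are stand-ins.

```python
from collections import Counter
from types import SimpleNamespace


class TallyEmitter:
    """Count events by type name and remember the names of failed checks."""

    def __init__(self) -> None:
        self.counts = Counter()
        self.failed_checks = []

    def emit(self, event) -> None:
        name = event.type.name
        self.counts[name] += 1
        if name == "CHECK_FAILED":
            # "name" is one of the data fields listed for CHECK_FAILED.
            self.failed_checks.append(event.data["name"])


def fake_event(type_name: str, **data):
    """Build a stand-in event with just the type and data attributes."""
    return SimpleNamespace(type=SimpleNamespace(name=type_name), data=data)


tally = TallyEmitter()
for ev in (
    fake_event("ITERATION_STARTED", iteration=1),
    fake_event("CHECK_PASSED", iteration=1, name="tests"),
    fake_event("CHECK_FAILED", iteration=1, name="lint"),
    fake_event("ITERATION_COMPLETED", iteration=1),
):
    tally.emit(ev)
```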
Built-in emitters
| Emitter | Description | Use case |
|---|---|---|
| NullEmitter | Discards all events | Tests, silent runs |
| QueueEmitter | Pushes events into a queue.Queue | Async consumers, UI layers |
| FanoutEmitter | Broadcasts to multiple emitters | Combining logging + monitoring |
from ralphify import QueueEmitter, FanoutEmitter, run_loop

# Reuses config, state, and MyEmitter from the "Listening to events" example.
# Queue for async consumption
q_emitter = QueueEmitter()
# Combine multiple listeners
fanout = FanoutEmitter([q_emitter, MyEmitter()])
run_loop(config, state, emitter=fanout)
# Drain events from the queue once the run has finished
while not q_emitter.queue.empty():
    event = q_emitter.queue.get()
    print(event.to_dict())
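Draining with empty() works after the loop has returned. To handle events while a run is still in progress, a consumer thread can block on the queue instead. This is a sketch under two assumptions: the queue is a standard queue.Queue (as the table above states), and RUN_STOPPED is the last event a run emits. consume is a hypothetical helper, and the demo feeds it stand-in events.

```python
import queue
import threading
from types import SimpleNamespace


def consume(events: queue.Queue, handle, poll: float = 0.2) -> None:
    """Pass events to `handle` as they arrive; return after RUN_STOPPED."""
    while True:
        try:
            event = events.get(timeout=poll)
        except queue.Empty:
            continue  # nothing yet; keep waiting
        handle(event)
        if event.type.name == "RUN_STOPPED":
            return


seen = []
q = queue.Queue()
consumer = threading.Thread(target=consume, args=(q, seen.append), daemon=True)
consumer.start()
# Stand-in events; a real run would push Event objects onto the queue.
for name in ("RUN_STARTED", "ITERATION_STARTED", "RUN_STOPPED"):
    q.put(SimpleNamespace(type=SimpleNamespace(name=name)))
consumer.join(timeout=5)
```

The consumer runs as a daemon thread so that a run which never emits RUN_STOPPED cannot keep the process alive.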
Multi-run management
RunManager orchestrates concurrent runs in background threads.
from ralphify import RunManager, RunConfig

manager = RunManager()
# Create and start a run
config = RunConfig(
    command="claude",
    args=["-p", "--dangerously-skip-permissions"],
    prompt_file="RALPH.md",
    max_iterations=5,
)
managed = manager.create_run(config)
manager.start_run(managed.state.run_id)
# Check status
print(managed.state.status)  # RunStatus.RUNNING
print(managed.state.completed)  # 2
# Control the run
manager.pause_run(managed.state.run_id)
manager.resume_run(managed.state.run_id)
manager.stop_run(managed.state.run_id)
# List all runs
for run in manager.list_runs():
    print(f"{run.state.run_id}: {run.state.status.value}")
RunManager methods
| Method | Description |
|---|---|
| create_run(config) | Register a new run (assigns a unique ID). Does not start it |
| start_run(run_id) | Start the run in a background daemon thread |
| stop_run(run_id) | Signal the run to stop after the current iteration |
| pause_run(run_id) | Pause the run between iterations |
| resume_run(run_id) | Resume a paused run |
| list_runs() | Return all registered runs |
| get_run(run_id) | Look up a run by ID (returns None if not found) |
ManagedRun
A run wrapped with its thread and event queue.
| Field | Type | Description |
|---|---|---|
| config | RunConfig | The run's configuration |
| state | RunState | Observable state and control methods |
| emitter | QueueEmitter | Event queue for this run |
| thread | Thread \| None | The background thread (set after start_run) |
Use managed.add_listener(emitter) to register additional event listeners before starting the run.
Primitive discovery
Discover checks, contexts, instructions, and ralphs without running the loop.
from pathlib import Path
from ralphify import discover_checks, discover_contexts, discover_instructions, discover_ralphs

root = Path(".")
checks = discover_checks(root)
for check in checks:
    print(f"Check: {check.name}, command: {check.command}, enabled: {check.enabled}")
contexts = discover_contexts(root)
for ctx in contexts:
    print(f"Context: {ctx.name}, command: {ctx.command}")
instructions = discover_instructions(root)
for inst in instructions:
    print(f"Instruction: {inst.name}, enabled: {inst.enabled}")
ralphs = discover_ralphs(root)
for ralph in ralphs:
    print(f"Ralph: {ralph.name}, description: {ralph.description}")
Running primitives
from pathlib import Path
from ralphify import discover_checks, discover_contexts, run_all_checks, run_all_contexts

root = Path(".")
# Run all checks and get results
enabled_checks = [c for c in discover_checks(root) if c.enabled]
results = run_all_checks(enabled_checks, root)
for r in results:
    print(f" {r.check.name}: {'PASS' if r.passed else 'FAIL'} (exit {r.exit_code})")
# Run all contexts and get output
enabled_contexts = [c for c in discover_contexts(root) if c.enabled]
context_results = run_all_contexts(enabled_contexts, root)
for cr in context_results:
    print(f" {cr.context.name}: {len(cr.output)} chars of output")
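The check results can also drive a simple pass/fail gate, for instance in a pre-merge script. The sketch below uses only the result attributes shown in the example above (check.name and passed). failing_checks and gate are hypothetical helpers, and demo_results is a stand-in for the list returned by run_all_checks.

```python
from types import SimpleNamespace


def failing_checks(results) -> list[str]:
    """Names of checks whose result did not pass."""
    return [r.check.name for r in results if not r.passed]


def gate(results) -> int:
    """Return a process exit code: 0 if every check passed, otherwise 1."""
    failed = failing_checks(results)
    for name in failed:
        print(f"check failed: {name}")
    return 1 if failed else 0


# Stand-ins shaped like check results, for demonstration only.
demo_results = [
    SimpleNamespace(check=SimpleNamespace(name="tests"), passed=True, exit_code=0),
    SimpleNamespace(check=SimpleNamespace(name="lint"), passed=False, exit_code=1),
]
exit_code = gate(demo_results)
```

A script could finish with raise SystemExit(gate(results)) so that a failing check fails the surrounding pipeline step.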
Resolving ralphs
from pathlib import Path
from ralphify import resolve_ralph_name

# Look up a named ralph and get the path to its RALPH.md
ralph_path = resolve_ralph_name("docs", Path("."))
if ralph_path:
    print(f"Found: {ralph_path}")
else:
    print("Named ralph 'docs' not found")
Example: Slack notification on failure
A practical example combining the API with external integrations:
import requests
from ralphify import Event, EventType, RunConfig, RunState, run_loop

class SlackNotifier:
    """Send a Slack message when a run finishes with failures."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def emit(self, event: Event) -> None:
        # Only the end-of-run event carries the final counters.
        if event.type != EventType.RUN_STOPPED:
            return
        failed = event.data.get("failed", 0)
        if failed == 0:
            return
        total = event.data.get("total", 0)
        completed = event.data.get("completed", 0)
        # A timeout keeps a slow webhook from blocking the caller indefinitely.
        requests.post(
            self.webhook_url,
            json={"text": f"Ralph loop finished: {completed}/{total} passed, {failed} failed."},
            timeout=10,
        )

config = RunConfig(
    command="claude",
    args=["-p", "--dangerously-skip-permissions"],
    prompt_file="RALPH.md",
    max_iterations=10,
)
state = RunState(run_id="monitored-run")
notifier = SlackNotifier("https://hooks.slack.com/services/YOUR/WEBHOOK/URL")
run_loop(config, state, emitter=notifier)
Imports
Everything is available from the top-level ralphify package:
from ralphify import (
    # Core
    run_loop,
    # Configuration
    RunConfig,
    RunState,
    RunStatus,
    # Events
    Event,
    EventEmitter,
    EventType,
    FanoutEmitter,
    NullEmitter,
    QueueEmitter,
    # Multi-run management
    ManagedRun,
    RunManager,
    # Primitive discovery
    discover_checks,
    run_all_checks,
    discover_contexts,
    run_all_contexts,
    discover_instructions,
    discover_ralphs,
    resolve_ralph_name,
)