Workflows & Executions
Build workflow pipelines, execute them, track progress, inspect node-level traces, and manage versions.
Overview
Three resources work together:
- Workflows (client.workflows) -- Create, configure, deploy, version, and share workflow pipelines.
- Executions (client.executions) -- Run workflows and inspect execution history, node spans, logs, and progress.
- Workflow Nodes (client.workflow_nodes) -- Browse the node catalog and discover available services (models, addons, data sources).
Workflows
Basic Usage
from strongly import Strongly
client = Strongly()
# List active workflows
for wf in client.workflows.list(status="active"):
print(f"{wf.name} (ID: {wf.id})")
# Get a specific workflow
wf = client.workflows.retrieve("wf-abc123")
print(f"{wf.name} — {len(wf.nodes)} nodes, {len(wf.connections)} connections")
Creating a Workflow
from strongly import Strongly
client = Strongly()
result = client.workflows.create({
"name": "daily-etl",
"description": "Extracts data, transforms, and loads into the warehouse",
"tags": ["etl", "production"],
"settings": {
"timeout": 3600,
"retry_count": 2,
},
"nodes": [
{
"id": "node-1",
"type": "postgresql-source",
"label": "Extract Orders",
"config": {
"query": "SELECT * FROM orders WHERE date = CURRENT_DATE",
},
},
{
"id": "node-2",
"type": "python-transform",
"label": "Clean Data",
"config": {
"script": "output = [r for r in input_data if r['amount'] > 0]",
},
},
{
"id": "node-3",
"type": "s3-dest",
"label": "Upload to S3",
"config": {
"bucket": "analytics-output",
"key_prefix": "daily/orders/",
},
},
],
"connections": [
{"source": "node-1", "target": "node-2"},
{"source": "node-2", "target": "node-3"},
],
})
print(f"Created workflow: {result.id}")
Updating a Workflow
from strongly import Strongly
client = Strongly()
updated = client.workflows.update("wf-abc123", {
"description": "Updated ETL pipeline with error handling",
"tags": ["etl", "production", "v2"],
})
print(f"Updated: {updated.name}")
Deploying and Undeploying
Deploy a workflow to make it available for scheduled or triggered execution:
from strongly import Strongly
client = Strongly()
# Deploy the workflow
client.workflows.deploy("wf-abc123")
# Undeploy when no longer needed
client.workflows.undeploy("wf-abc123")
Changing Status
from strongly import Strongly
client = Strongly()
# Pause a workflow
client.workflows.update_status("wf-abc123", status="paused")
# Reactivate it
client.workflows.update_status("wf-abc123", status="active")
Duplicating a Workflow
from strongly import Strongly
client = Strongly()
copy = client.workflows.duplicate("wf-abc123")
print(f"Duplicate ID: {copy['id']}")
Deleting a Workflow
from strongly import Strongly
client = Strongly()
client.workflows.delete("wf-abc123")
Version Management
Tag workflow snapshots and track change history:
from strongly import Strongly
client = Strongly()
# Create a version tag
client.workflows.create_version(
"wf-abc123",
version_tag="v1.0",
description="Initial production release",
)
# List all versions
version_info = client.workflows.versions("wf-abc123")
print(f"Current version: {version_info.current_version}")
for v in version_info.versions:
print(f" {v.version_tag} — {v.description}")
WorkflowVersionInfo Fields
| Field | Type | Description |
|---|---|---|
| workflow_id | str | Workflow identifier |
| current_version | str | Currently active version tag |
| versions | list[WorkflowVersion] | List of all versions |
Sharing
Share workflows with other users in your organization:
from strongly import Strongly
client = Strongly()
# Share with a user
client.workflows.share("wf-abc123", user_id="user-xyz", permission="edit")
# List shared users
shared = client.workflows.shared_users("wf-abc123")
print(shared)
# Revoke access
client.workflows.unshare("wf-abc123", user_id="user-xyz")
Templates and Stats
from strongly import Strongly
client = Strongly()
# List available workflow templates
templates = client.workflows.templates()
for t in templates:
print(f"{t.name} — {t.description}")
# Get workflow stats for your organization
stats = client.workflows.stats()
print(f"Total: {stats.total}")
print(f"Active: {stats.active}")
print(f"Paused: {stats.paused}")
print(f"Draft: {stats.draft}")
print(f"Archived: {stats.archived}")
WorkflowStats Fields
| Field | Type | Description |
|---|---|---|
| total | int | Total workflows |
| active | int | Active workflows |
| paused | int | Paused workflows |
| draft | int | Draft workflows |
| archived | int | Archived workflows |
Filtering and Searching
from strongly import Strongly
client = Strongly()
# Filter by status
for wf in client.workflows.list(status="active"):
print(wf.name)
# Filter by tag
for wf in client.workflows.list(tag="production"):
print(wf.name)
# Search by name
for wf in client.workflows.list(search="etl"):
print(f"{wf.name} — {wf.status}")
# Get all as a list
all_workflows = client.workflows.list().to_list()
Workflow Model
| Field | Type | Description |
|---|---|---|
| id | str | Unique workflow identifier |
| name | str | Workflow name |
| description | str | Human-readable description |
| status | str | Current status (active, paused, draft, archived) |
| version | Any | Current version |
| nodes | list | List of node definitions |
| connections | list | List of node connections |
| config | dict | Workflow-level configuration |
| settings | dict | Workflow settings (timeout, retries, etc.) |
| tags | list | Tags for organization and filtering |
| organization_id | str | Owning organization |
| owner_id | str | Owner user ID |
| shared_with | list | User IDs the workflow is shared with |
| is_public | bool | Whether the workflow is publicly visible |
| is_template | bool | Whether the workflow is a reusable template |
| created_at | str | Creation timestamp |
| updated_at | str | Last update timestamp |
Workflows Method Reference
| Method | Description | Returns |
|---|---|---|
| list(*, status=None, search=None, tag=None, limit=50) | List workflows with optional filters | SyncPaginator[Workflow] |
| create(body) | Create a new workflow | Workflow |
| retrieve(workflow_id) | Get a workflow by ID | Workflow |
| update(workflow_id, body) | Update workflow fields | Workflow |
| delete(workflow_id) | Delete a workflow | dict |
| duplicate(workflow_id) | Duplicate a workflow | dict |
| execute(workflow_id, *, config=None) | Execute the workflow | dict |
| deploy(workflow_id, **kwargs) | Deploy the workflow | dict |
| undeploy(workflow_id) | Undeploy the workflow | dict |
| update_status(workflow_id, *, status) | Change workflow status | dict |
| versions(workflow_id) | List all versions | WorkflowVersionInfo |
| create_version(workflow_id, *, version_tag, description=None) | Create a version tag | dict |
| share(workflow_id, *, user_id, permission=None) | Share with a user | dict |
| unshare(workflow_id, *, user_id) | Revoke sharing | dict |
| shared_users(workflow_id) | List users with access | List[WorkflowSharedUser] |
| templates() | List workflow templates | List[Workflow] |
| stats() | Get workflow statistics | WorkflowStats |
Executions
Executing a Workflow
from strongly import Strongly
client = Strongly()
# Execute a workflow
result = client.workflows.execute("wf-abc123")
execution_id = result["executionId"]
print(f"Execution started: {execution_id}")
# Execute with runtime config overrides
result = client.workflows.execute("wf-abc123", config={
"batch_size": 500,
"dry_run": False,
})
Listing Executions
from strongly import Strongly
client = Strongly()
# List all executions for a workflow
for ex in client.executions.list(workflow_id="wf-abc123"):
print(f"{ex.id} — {ex.status} — {ex.duration_ms}ms")
# Filter by status
for ex in client.executions.list(status="completed"):
print(f"{ex.id}: {ex.started_at} to {ex.ended_at}")
# Filter by date range
for ex in client.executions.list(since="2025-01-01", until="2025-01-31"):
print(f"{ex.id} — {ex.status}")
# Filter by trigger type
for ex in client.executions.list(trigger_type="manual"):
print(f"{ex.id} — triggered by {ex.user_id}")
Tracking Progress
Monitor a running execution in real time:
from strongly import Strongly
import time
client = Strongly()
result = client.workflows.execute("wf-abc123")
execution_id = result["executionId"]
while True:
progress = client.executions.progress(execution_id)
pct = progress.progress
print(
f" [{pct}%] "
f"{progress.completed_nodes}/{progress.total_nodes} nodes done, "
f"{progress.running_nodes} running, "
f"{progress.failed_nodes} failed"
)
if progress.status in ("completed", "failed", "stopped"):
print(f"\nFinal status: {progress.status}")
if progress.duration_ms:
print(f"Duration: {progress.duration_ms / 1000:.1f}s")
break
time.sleep(2)
ExecutionProgress Fields
| Field | Type | Description |
|---|---|---|
| execution_id | str | Execution identifier |
| status | str | Current status |
| progress | int | Completion percentage (0-100) |
| total_nodes | int | Total nodes in the workflow |
| completed_nodes | int | Nodes that finished successfully |
| failed_nodes | int | Nodes that failed |
| running_nodes | int | Nodes currently running |
| started_at | str | Execution start timestamp |
| ended_at | str | Execution end timestamp |
| duration_ms | float | Total duration in milliseconds |
Inspecting Spans
Spans provide node-level execution traces -- input data, output data, timing, and errors:
from strongly import Strongly
client = Strongly()
spans = client.executions.spans("exec-abc123")
for span in spans:
status_icon = "OK" if span.status == "completed" else "FAIL"
print(f"[{status_icon}] {span.node_label} ({span.node_type}) — {span.duration_ms}ms")
if span.error:
print(f" Error: {span.error}")
Filter spans for a specific node:
spans = client.executions.spans("exec-abc123", node_id="node-2")
for span in spans:
print(f"Input: {span.input_data}")
print(f"Output: {span.output_data}")
ExecutionSpan Fields
| Field | Type | Description |
|---|---|---|
| id | str | Span identifier |
| execution_id | str | Parent execution ID |
| node_id | str | Node identifier |
| node_label | str | Node display label |
| node_type | str | Node type (e.g., python-transform, postgresql-source) |
| status | str | Span status (completed, failed, running) |
| started_at | str | Span start timestamp |
| ended_at | str | Span end timestamp |
| duration_ms | float | Duration in milliseconds |
| input_data | dict | Data received by the node |
| output_data | dict | Data produced by the node |
| error | str | Error message if the span failed |
Viewing Logs
from strongly import Strongly
client = Strongly()
# Get all logs
logs = client.executions.logs("exec-abc123")
for log in logs:
print(f"[{log.level}] {log.created_at} — {log.message}")
# Filter by level
errors = client.executions.logs("exec-abc123", level="error")
for log in errors:
print(f"Node {log.node_id}: {log.message}")
# Limit the number of log entries
recent = client.executions.logs("exec-abc123", limit=50)
ExecutionLog Fields
| Field | Type | Description |
|---|---|---|
| id | str | Log entry identifier |
| execution_id | str | Parent execution ID |
| level | str | Log level (info, warn, error, debug) |
| message | str | Log message text |
| node_id | str | Originating node ID |
| created_at | str | Timestamp |
Stopping and Resuming
from strongly import Strongly
client = Strongly()
# Stop a running execution
client.executions.stop("exec-abc123")
# Resume a paused or waiting execution
client.executions.resume("exec-abc123", trigger_data={"approved": True})
Execution Model
| Field | Type | Description |
|---|---|---|
| id | str | Unique execution identifier |
| workflow_id | str | Source workflow ID |
| status | str | Current status (running, completed, failed, stopped) |
| started_at | str | Start timestamp |
| ended_at | str | End timestamp |
| duration_ms | float | Total duration in milliseconds |
| trigger | str | Trigger description |
| trigger_type | str | Trigger type (manual, schedule, webhook, api) |
| user_id | str | User who triggered the execution |
| organization_id | str | Organization ID |
| config | dict | Runtime configuration overrides |
| error | str | Error message if execution failed |
| progress | dict | Progress summary |
| created_at | str | Creation timestamp |
| updated_at | str | Last update timestamp |
Executions Method Reference
| Method | Description | Returns |
|---|---|---|
| list(*, workflow_id=None, status=None, since=None, until=None, trigger_type=None, limit=50) | List executions with optional filters | SyncPaginator[Execution] |
| retrieve(execution_id) | Get an execution by ID | Execution |
| stop(execution_id) | Stop a running execution | dict |
| resume(execution_id, *, trigger_data=None) | Resume a paused execution | dict |
| spans(execution_id, *, node_id=None) | Get node-level execution traces | List[ExecutionSpan] |
| logs(execution_id, *, level=None, limit=None) | Get execution logs | List[ExecutionLog] |
| progress(execution_id) | Get real-time progress | ExecutionProgress |
Workflow Nodes
The node catalog lists every node type available in the workflow builder, along with service discovery helpers for models, addons, and data sources.
Browsing the Catalog
from strongly import Strongly
client = Strongly()
# List all available node types
for node in client.workflow_nodes.list():
print(f"{node.id} — {node.category}")
# Filter by category
for node in client.workflow_nodes.list(category="transform"):
print(f"{node.id}: {node.description}")
# Search by name
for node in client.workflow_nodes.list(search="python"):
print(node.id)
# Filter system vs. custom nodes
for node in client.workflow_nodes.list(is_system=True):
print(node.id)
Service Discovery
Discover which models, addons, and data sources are available for use in workflow node configurations:
from strongly import Strongly
client = Strongly()
# List available AI models
models = client.workflow_nodes.services_models()
print(models)
# Filter models by provider
openai_models = client.workflow_nodes.services_models(provider="openai")
print(openai_models)
# List available addons (managed databases, etc.)
addons = client.workflow_nodes.services_addons()
print(addons)
# Filter addons by type
pg_addons = client.workflow_nodes.services_addons(type="postgresql")
print(pg_addons)
# List available data sources
datasources = client.workflow_nodes.services_datasources()
print(datasources)
# Filter data sources by category
db_sources = client.workflow_nodes.services_datasources(category="database")
print(db_sources)
Workflow Nodes Method Reference
| Method | Description | Returns |
|---|---|---|
| list(*, search=None, category=None, type=None, is_system=None, limit=50) | List node types | SyncPaginator[WorkflowNode] |
| create(body) | Register a custom node type | WorkflowNode |
| retrieve(node_id) | Get a node type by ID | WorkflowNode |
| update(node_id, body) | Update a node type | WorkflowNode |
| delete(node_id) | Delete a custom node type | dict |
| services_datasources(*, type=None, category=None) | Discover available data sources | ServiceDataSourcesResponse |
| services_addons(*, type=None) | Discover available addons | ServiceAddonsResponse |
| services_models(*, provider=None, type=None) | Discover available AI models | ServiceModelsResponse |
Complete Example
from strongly import Strongly
import time
def main():
    """End-to-end demo of the workflow lifecycle.

    Checks org stats, creates a three-node ETL pipeline, tags a version,
    executes it, polls progress to completion, inspects spans and logs,
    shares the workflow, then browses the node catalog and model services.
    """
    client = Strongly()

    # --- Check workflow stats ---
    stats = client.workflows.stats()
    print(f"Workflows — Total: {stats.total}, Active: {stats.active}, Draft: {stats.draft}")

    # --- Create a workflow ---
    # Pipeline shape: extract -> transform -> load.
    print("\nCreating workflow...")
    extract_node = {
        "id": "extract",
        "type": "postgresql-source",
        "label": "Extract Orders",
        "config": {"query": "SELECT * FROM orders WHERE date = CURRENT_DATE"},
    }
    transform_node = {
        "id": "transform",
        "type": "python-transform",
        "label": "Clean Data",
        "config": {"script": "output = [r for r in input_data if r['status'] == 'complete']"},
    }
    load_node = {
        "id": "load",
        "type": "s3-dest",
        "label": "Upload Results",
        "config": {"bucket": "analytics", "key_prefix": "daily/"},
    }
    created = client.workflows.create({
        "name": "data-pipeline",
        "description": "Extract, transform, and load daily orders",
        "tags": ["etl", "daily"],
        "nodes": [extract_node, transform_node, load_node],
        "connections": [
            {"source": "extract", "target": "transform"},
            {"source": "transform", "target": "load"},
        ],
    })
    workflow_id = created.id
    print(f"Workflow ID: {workflow_id}")

    # --- Tag a version ---
    print("\nCreating version tag...")
    client.workflows.create_version(workflow_id, version_tag="v1.0", description="Initial release")
    version_info = client.workflows.versions(workflow_id)
    print(f"Current version: {version_info.current_version}")

    # --- Execute ---
    print("\nExecuting workflow...")
    run = client.workflows.execute(workflow_id)
    execution_id = run["executionId"]
    print(f"Execution ID: {execution_id}")

    # --- Track progress ---
    # Poll every 2 seconds until the execution reaches a terminal state.
    while True:
        progress = client.executions.progress(execution_id)
        print(
            f" [{progress.progress}%] "
            f"{progress.completed_nodes}/{progress.total_nodes} complete"
        )
        if progress.status in ("completed", "failed", "stopped"):
            break
        time.sleep(2)
    print(f"\nFinal status: {progress.status}")
    if progress.duration_ms:
        print(f"Duration: {progress.duration_ms / 1000:.1f}s")

    # --- Inspect spans ---
    print("\nNode spans:")
    for span in client.executions.spans(execution_id):
        flag = "FAIL" if span.status != "completed" else "OK"
        print(f" [{flag}] {span.node_label} ({span.node_type}) — {span.duration_ms}ms")
        if span.error:
            print(f" Error: {span.error}")

    # --- View logs ---
    print("\nExecution logs:")
    for entry in client.executions.logs(execution_id, limit=20):
        print(f" [{entry.level}] {entry.message}")

    # --- Share the workflow ---
    print("\nSharing with teammate...")
    client.workflows.share(workflow_id, user_id="user-teammate-1", permission="view")
    shared = client.workflows.shared_users(workflow_id)
    print(f"Shared with: {shared}")

    # --- Browse the node catalog ---
    print("\nAvailable transform nodes:")
    for node in client.workflow_nodes.list(category="transform"):
        print(f" {node.id}")

    # --- Service discovery ---
    print("\nAvailable AI models for workflows:")
    print(client.workflow_nodes.services_models())


if __name__ == "__main__":
    main()