Skip to main content

Workflows & Executions

Build workflow pipelines, execute them, track progress, inspect node-level traces, and manage versions.

Overview

Three resources work together:

  • Workflows (client.workflows) -- Create, configure, deploy, version, and share workflow pipelines.
  • Executions (client.executions) -- Run workflows and inspect execution history, node spans, logs, and progress.
  • Workflow Nodes (client.workflow_nodes) -- Browse the node catalog and discover available services (models, addons, data sources).

Workflows

Basic Usage

from strongly import Strongly

client = Strongly()

# List active workflows
for wf in client.workflows.list(status="active"):
    print(f"{wf.name} (ID: {wf.id})")

# Get a specific workflow
wf = client.workflows.retrieve("wf-abc123")
print(f"{wf.name}: {len(wf.nodes)} nodes, {len(wf.connections)} connections")

Creating a Workflow

from strongly import Strongly

client = Strongly()

# Define a three-node ETL pipeline: extract -> transform -> load.
result = client.workflows.create({
    "name": "daily-etl",
    "description": "Extracts data, transforms, and loads into the warehouse",
    "tags": ["etl", "production"],
    "settings": {
        "timeout": 3600,
        "retry_count": 2,
    },
    "nodes": [
        {
            "id": "node-1",
            "type": "postgresql-source",
            "label": "Extract Orders",
            "config": {
                "query": "SELECT * FROM orders WHERE date = CURRENT_DATE",
            },
        },
        {
            "id": "node-2",
            "type": "python-transform",
            "label": "Clean Data",
            "config": {
                "script": "output = [r for r in input_data if r['amount'] > 0]",
            },
        },
        {
            "id": "node-3",
            "type": "s3-dest",
            "label": "Upload to S3",
            "config": {
                "bucket": "analytics-output",
                "key_prefix": "daily/orders/",
            },
        },
    ],
    # Edges wiring the nodes into a linear pipeline.
    "connections": [
        {"source": "node-1", "target": "node-2"},
        {"source": "node-2", "target": "node-3"},
    ],
})

print(f"Created workflow: {result.id}")

Updating a Workflow

from strongly import Strongly

client = Strongly()

# Patch only the fields that should change; other fields are left as-is.
changes = {
    "description": "Updated ETL pipeline with error handling",
    "tags": ["etl", "production", "v2"],
}
updated = client.workflows.update("wf-abc123", changes)

print(f"Updated: {updated.name}")

Deploying and Undeploying

Deploy a workflow to make it available for scheduled or triggered execution:

from strongly import Strongly

client = Strongly()

workflow_id = "wf-abc123"

# Deploying makes the workflow eligible for scheduled/triggered runs.
client.workflows.deploy(workflow_id)

# Undeploy to take it back out of rotation.
client.workflows.undeploy(workflow_id)

Changing Status

from strongly import Strongly

client = Strongly()

workflow_id = "wf-abc123"

# Temporarily suspend the workflow.
client.workflows.update_status(workflow_id, status="paused")

# Bring it back online.
client.workflows.update_status(workflow_id, status="active")

Duplicating a Workflow

from strongly import Strongly

client = Strongly()

# Create an independent copy; the server returns a dict describing it.
clone = client.workflows.duplicate("wf-abc123")
print(f"Duplicate ID: {clone['id']}")

Deleting a Workflow

from strongly import Strongly

client = Strongly()

# Permanently remove the workflow by ID.
workflow_id = "wf-abc123"
client.workflows.delete(workflow_id)

Version Management

Tag workflow snapshots and track change history:

from strongly import Strongly

client = Strongly()

# Create a version tag snapshotting the current workflow definition
client.workflows.create_version(
    "wf-abc123",
    version_tag="v1.0",
    description="Initial production release",
)

# List all versions
version_info = client.workflows.versions("wf-abc123")
print(f"Current version: {version_info.current_version}")

for v in version_info.versions:
    print(f" {v.version_tag} — {v.description}")

WorkflowVersionInfo Fields

| Field | Type | Description |
| --- | --- | --- |
| workflow_id | str | Workflow identifier |
| current_version | str | Currently active version tag |
| versions | list[WorkflowVersion] | List of all versions |

Sharing

Share workflows with other users in your organization:

from strongly import Strongly

client = Strongly()

workflow_id = "wf-abc123"
teammate = "user-xyz"

# Grant edit access to a user.
client.workflows.share(workflow_id, user_id=teammate, permission="edit")

# See who currently has access.
shared = client.workflows.shared_users(workflow_id)
print(shared)

# Revoke that user's access.
client.workflows.unshare(workflow_id, user_id=teammate)

Templates and Stats

from strongly import Strongly

client = Strongly()

# List available workflow templates
templates = client.workflows.templates()
for t in templates:
    print(f"{t.name} — {t.description}")

# Get workflow stats for your organization
stats = client.workflows.stats()
print(f"Total: {stats.total}")
print(f"Active: {stats.active}")
print(f"Paused: {stats.paused}")
print(f"Draft: {stats.draft}")
print(f"Archived: {stats.archived}")

WorkflowStats Fields

| Field | Type | Description |
| --- | --- | --- |
| total | int | Total workflows |
| active | int | Active workflows |
| paused | int | Paused workflows |
| draft | int | Draft workflows |
| archived | int | Archived workflows |

Filtering and Searching

from strongly import Strongly

client = Strongly()

# Filter by status
for wf in client.workflows.list(status="active"):
    print(wf.name)

# Filter by tag
for wf in client.workflows.list(tag="production"):
    print(wf.name)

# Search by name
for wf in client.workflows.list(search="etl"):
    print(f"{wf.name} — {wf.status}")

# Get all as a list (materializes the paginator)
all_workflows = client.workflows.list().to_list()

Workflow Model

| Field | Type | Description |
| --- | --- | --- |
| id | str | Unique workflow identifier |
| name | str | Workflow name |
| description | str | Human-readable description |
| status | str | Current status (active, paused, draft, archived) |
| version | Any | Current version |
| nodes | list | List of node definitions |
| connections | list | List of node connections |
| config | dict | Workflow-level configuration |
| settings | dict | Workflow settings (timeout, retries, etc.) |
| tags | list | Tags for organization and filtering |
| organization_id | str | Owning organization |
| owner_id | str | Owner user ID |
| shared_with | list | User IDs the workflow is shared with |
| is_public | bool | Whether the workflow is publicly visible |
| is_template | bool | Whether the workflow is a reusable template |
| created_at | str | Creation timestamp |
| updated_at | str | Last update timestamp |

Workflows Method Reference

| Method | Description | Returns |
| --- | --- | --- |
| `list(*, status=None, search=None, tag=None, limit=50)` | List workflows with optional filters | `SyncPaginator[Workflow]` |
| `create(body)` | Create a new workflow | `Workflow` |
| `retrieve(workflow_id)` | Get a workflow by ID | `Workflow` |
| `update(workflow_id, body)` | Update workflow fields | `Workflow` |
| `delete(workflow_id)` | Delete a workflow | `dict` |
| `duplicate(workflow_id)` | Duplicate a workflow | `dict` |
| `execute(workflow_id, *, config=None)` | Execute the workflow | `dict` |
| `deploy(workflow_id, **kwargs)` | Deploy the workflow | `dict` |
| `undeploy(workflow_id)` | Undeploy the workflow | `dict` |
| `update_status(workflow_id, *, status)` | Change workflow status | `dict` |
| `versions(workflow_id)` | List all versions | `WorkflowVersionInfo` |
| `create_version(workflow_id, *, version_tag, description=None)` | Create a version tag | `dict` |
| `share(workflow_id, *, user_id, permission=None)` | Share with a user | `dict` |
| `unshare(workflow_id, *, user_id)` | Revoke sharing | `dict` |
| `shared_users(workflow_id)` | List users with access | `List[WorkflowSharedUser]` |
| `templates()` | List workflow templates | `List[Workflow]` |
| `stats()` | Get workflow statistics | `WorkflowStats` |

Executions

Executing a Workflow

from strongly import Strongly

client = Strongly()

# Kick off an execution with the workflow's stored configuration.
result = client.workflows.execute("wf-abc123")
execution_id = result["executionId"]
print(f"Execution started: {execution_id}")

# Override selected config values for this run only.
overrides = {
    "batch_size": 500,
    "dry_run": False,
}
result = client.workflows.execute("wf-abc123", config=overrides)

Listing Executions

from strongly import Strongly

client = Strongly()

# List all executions for a workflow
for ex in client.executions.list(workflow_id="wf-abc123"):
    print(f"{ex.id} — {ex.status} — {ex.duration_ms}ms")

# Filter by status
for ex in client.executions.list(status="completed"):
    print(f"{ex.id}: {ex.started_at} to {ex.ended_at}")

# Filter by date range
for ex in client.executions.list(since="2025-01-01", until="2025-01-31"):
    print(f"{ex.id} — {ex.status}")

# Filter by trigger type
for ex in client.executions.list(trigger_type="manual"):
    print(f"{ex.id} — triggered by {ex.user_id}")

Tracking Progress

Monitor a running execution in real time:

from strongly import Strongly
import time

client = Strongly()

result = client.workflows.execute("wf-abc123")
execution_id = result["executionId"]

# Poll until the execution reaches a terminal state.
while True:
    progress = client.executions.progress(execution_id)
    pct = progress.progress
    print(
        f" [{pct}%] "
        f"{progress.completed_nodes}/{progress.total_nodes} nodes done, "
        f"{progress.running_nodes} running, "
        f"{progress.failed_nodes} failed"
    )

    if progress.status in ("completed", "failed", "stopped"):
        print(f"\nFinal status: {progress.status}")
        if progress.duration_ms:
            print(f"Duration: {progress.duration_ms / 1000:.1f}s")
        break

    time.sleep(2)

ExecutionProgress Fields

| Field | Type | Description |
| --- | --- | --- |
| execution_id | str | Execution identifier |
| status | str | Current status |
| progress | int | Completion percentage (0-100) |
| total_nodes | int | Total nodes in the workflow |
| completed_nodes | int | Nodes that finished successfully |
| failed_nodes | int | Nodes that failed |
| running_nodes | int | Nodes currently running |
| started_at | str | Execution start timestamp |
| ended_at | str | Execution end timestamp |
| duration_ms | float | Total duration in milliseconds |

Inspecting Spans

Spans provide node-level execution traces -- input data, output data, timing, and errors:

from strongly import Strongly

client = Strongly()

spans = client.executions.spans("exec-abc123")

for span in spans:
    status_icon = "OK" if span.status == "completed" else "FAIL"
    print(f"[{status_icon}] {span.node_label} ({span.node_type}) — {span.duration_ms}ms")

    if span.error:
        print(f" Error: {span.error}")

Filter spans for a specific node:

spans = client.executions.spans("exec-abc123", node_id="node-2")
for span in spans:
    print(f"Input: {span.input_data}")
    print(f"Output: {span.output_data}")

ExecutionSpan Fields

| Field | Type | Description |
| --- | --- | --- |
| id | str | Span identifier |
| execution_id | str | Parent execution ID |
| node_id | str | Node identifier |
| node_label | str | Node display label |
| node_type | str | Node type (e.g., python-transform, postgresql-source) |
| status | str | Span status (completed, failed, running) |
| started_at | str | Span start timestamp |
| ended_at | str | Span end timestamp |
| duration_ms | float | Duration in milliseconds |
| input_data | dict | Data received by the node |
| output_data | dict | Data produced by the node |
| error | str | Error message if the span failed |

Viewing Logs

from strongly import Strongly

client = Strongly()

# Get all logs
logs = client.executions.logs("exec-abc123")
for log in logs:
    print(f"[{log.level}] {log.created_at} — {log.message}")

# Filter by level
errors = client.executions.logs("exec-abc123", level="error")
for log in errors:
    print(f"Node {log.node_id}: {log.message}")

# Limit the number of log entries
recent = client.executions.logs("exec-abc123", limit=50)

ExecutionLog Fields

| Field | Type | Description |
| --- | --- | --- |
| id | str | Log entry identifier |
| execution_id | str | Parent execution ID |
| level | str | Log level (info, warn, error, debug) |
| message | str | Log message text |
| node_id | str | Originating node ID |
| created_at | str | Timestamp |

Stopping and Resuming

from strongly import Strongly

client = Strongly()

execution_id = "exec-abc123"

# Halt a run that is still in progress.
client.executions.stop(execution_id)

# Resume a paused/waiting run, supplying data for the resume trigger.
client.executions.resume(execution_id, trigger_data={"approved": True})

Execution Model

| Field | Type | Description |
| --- | --- | --- |
| id | str | Unique execution identifier |
| workflow_id | str | Source workflow ID |
| status | str | Current status (running, completed, failed, stopped) |
| started_at | str | Start timestamp |
| ended_at | str | End timestamp |
| duration_ms | float | Total duration in milliseconds |
| trigger | str | Trigger description |
| trigger_type | str | Trigger type (manual, schedule, webhook, api) |
| user_id | str | User who triggered the execution |
| organization_id | str | Organization ID |
| config | dict | Runtime configuration overrides |
| error | str | Error message if execution failed |
| progress | dict | Progress summary |
| created_at | str | Creation timestamp |
| updated_at | str | Last update timestamp |

Executions Method Reference

| Method | Description | Returns |
| --- | --- | --- |
| `list(*, workflow_id=None, status=None, since=None, until=None, trigger_type=None, limit=50)` | List executions with optional filters | `SyncPaginator[Execution]` |
| `retrieve(execution_id)` | Get an execution by ID | `Execution` |
| `stop(execution_id)` | Stop a running execution | `dict` |
| `resume(execution_id, *, trigger_data=None)` | Resume a paused execution | `dict` |
| `spans(execution_id, *, node_id=None)` | Get node-level execution traces | `List[ExecutionSpan]` |
| `logs(execution_id, *, level=None, limit=None)` | Get execution logs | `List[ExecutionLog]` |
| `progress(execution_id)` | Get real-time progress | `ExecutionProgress` |

Workflow Nodes

The node catalog lists every node type available in the workflow builder, along with service discovery helpers for models, addons, and data sources.

Browsing the Catalog

from strongly import Strongly

client = Strongly()

# List all available node types
for node in client.workflow_nodes.list():
    print(f"{node.id} — {node.category}")

# Filter by category
for node in client.workflow_nodes.list(category="transform"):
    print(f"{node.id}: {node.description}")

# Search by name
for node in client.workflow_nodes.list(search="python"):
    print(node.id)

# Filter system vs. custom nodes
for node in client.workflow_nodes.list(is_system=True):
    print(node.id)

Service Discovery

Discover which models, addons, and data sources are available for use in workflow node configurations:

from strongly import Strongly

client = Strongly()

# AI models available to workflow nodes.
all_models = client.workflow_nodes.services_models()
print(all_models)

# Narrow the model list to a single provider.
openai_models = client.workflow_nodes.services_models(provider="openai")
print(openai_models)

# Managed addons (databases, etc.).
all_addons = client.workflow_nodes.services_addons()
print(all_addons)

# Narrow the addon list to one type.
pg_addons = client.workflow_nodes.services_addons(type="postgresql")
print(pg_addons)

# Connected data sources.
sources = client.workflow_nodes.services_datasources()
print(sources)

# Narrow data sources to one category.
db_sources = client.workflow_nodes.services_datasources(category="database")
print(db_sources)

Workflow Nodes Method Reference

| Method | Description | Returns |
| --- | --- | --- |
| `list(*, search=None, category=None, type=None, is_system=None, limit=50)` | List node types | `SyncPaginator[WorkflowNode]` |
| `create(body)` | Register a custom node type | `WorkflowNode` |
| `retrieve(node_id)` | Get a node type by ID | `WorkflowNode` |
| `update(node_id, body)` | Update a node type | `WorkflowNode` |
| `delete(node_id)` | Delete a custom node type | `dict` |
| `services_datasources(*, type=None, category=None)` | Discover available data sources | `ServiceDataSourcesResponse` |
| `services_addons(*, type=None)` | Discover available addons | `ServiceAddonsResponse` |
| `services_models(*, provider=None, type=None)` | Discover available AI models | `ServiceModelsResponse` |

Complete Example

from strongly import Strongly
import time


def main():
    client = Strongly()

    # --- Check workflow stats ---
    stats = client.workflows.stats()
    print(f"Workflows — Total: {stats.total}, Active: {stats.active}, Draft: {stats.draft}")

    # --- Create a workflow ---
    print("\nCreating workflow...")
    result = client.workflows.create({
        "name": "data-pipeline",
        "description": "Extract, transform, and load daily orders",
        "tags": ["etl", "daily"],
        "nodes": [
            {
                "id": "extract",
                "type": "postgresql-source",
                "label": "Extract Orders",
                "config": {"query": "SELECT * FROM orders WHERE date = CURRENT_DATE"},
            },
            {
                "id": "transform",
                "type": "python-transform",
                "label": "Clean Data",
                "config": {"script": "output = [r for r in input_data if r['status'] == 'complete']"},
            },
            {
                "id": "load",
                "type": "s3-dest",
                "label": "Upload Results",
                "config": {"bucket": "analytics", "key_prefix": "daily/"},
            },
        ],
        "connections": [
            {"source": "extract", "target": "transform"},
            {"source": "transform", "target": "load"},
        ],
    })
    wf_id = result.id
    print(f"Workflow ID: {wf_id}")

    # --- Tag a version ---
    print("\nCreating version tag...")
    client.workflows.create_version(wf_id, version_tag="v1.0", description="Initial release")

    versions = client.workflows.versions(wf_id)
    print(f"Current version: {versions.current_version}")

    # --- Execute ---
    print("\nExecuting workflow...")
    exec_result = client.workflows.execute(wf_id)
    execution_id = exec_result["executionId"]
    print(f"Execution ID: {execution_id}")

    # --- Track progress until the run reaches a terminal state ---
    while True:
        progress = client.executions.progress(execution_id)
        print(
            f" [{progress.progress}%] "
            f"{progress.completed_nodes}/{progress.total_nodes} complete"
        )
        if progress.status in ("completed", "failed", "stopped"):
            break
        time.sleep(2)

    print(f"\nFinal status: {progress.status}")
    if progress.duration_ms:
        print(f"Duration: {progress.duration_ms / 1000:.1f}s")

    # --- Inspect spans ---
    print("\nNode spans:")
    spans = client.executions.spans(execution_id)
    for span in spans:
        flag = "OK" if span.status == "completed" else "FAIL"
        print(f" [{flag}] {span.node_label} ({span.node_type}) — {span.duration_ms}ms")
        if span.error:
            print(f" Error: {span.error}")

    # --- View logs ---
    print("\nExecution logs:")
    logs = client.executions.logs(execution_id, limit=20)
    for log in logs:
        print(f" [{log.level}] {log.message}")

    # --- Share the workflow ---
    print("\nSharing with teammate...")
    client.workflows.share(wf_id, user_id="user-teammate-1", permission="view")
    shared = client.workflows.shared_users(wf_id)
    print(f"Shared with: {shared}")

    # --- Browse the node catalog ---
    print("\nAvailable transform nodes:")
    for node in client.workflow_nodes.list(category="transform"):
        print(f" {node.id}")

    # --- Service discovery ---
    print("\nAvailable AI models for workflows:")
    models = client.workflow_nodes.services_models()
    print(models)


if __name__ == "__main__":
    main()