simpleworkflow

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2026 License: MIT Imports: 18 Imported by: 0

README

simple-workflow

A durable system of record for intent — not a workflow engine.

Overview

simple-workflow records what should happen, decoupled from how or when it happens. Producers write intent to a workflow_run table. Workers poll for it. PostgreSQL (or SQLite) is the only coordination point — no scheduler, no broker, no central executor.

  • A durable inbox for asynchronous work
  • A contract between producers ("I need this done") and workers ("I can do this")
  • A system of record you can query, audit, and monitor with plain SQL

Features

  • Durable persistence — workflow runs survive crashes and restarts
  • Automatic retries — exponential backoff with jitter
  • Idempotency — duplicate requests don't create duplicate work
  • Observable — audit trail in PostgreSQL via workflow_event table
  • Type-prefix routing — route workflows by pattern (e.g., billing.%)
  • Heartbeat & cancellation — long-running jobs extend leases; cancel gracefully
  • Priority & scheduling — control execution order and timing

Installation

go get github.com/tendant/simple-workflow

Python: see python/README.md.

Quick Start

wf, err := simpleworkflow.New("sqlite:///tmp/workflow.db")
if err != nil { log.Fatal(err) }
defer wf.Close()

wf.HandleFunc("demo.hello.v1", func(ctx context.Context, run *simpleworkflow.WorkflowRun) (any, error) {
    log.Printf("Hello! payload=%s", run.Payload)
    return map[string]string{"status": "done"}, nil
})

runID, _ := wf.Submit("demo.hello.v1", map[string]string{"msg": "hi"}).Execute(ctx)

wf.WithAutoMigrate().Start(ctx) // creates tables automatically
Migrations

Embedded DDL (recommended for getting started):

wf.WithAutoMigrate().Start(ctx)   // auto-migrate on Start()
wf.AutoMigrate(ctx)               // or migrate explicitly
client.AutoMigrate(ctx)           // also works on Client and Poller

Works for both SQLite and PostgreSQL — no external tools needed.

Version-controlled migrations (recommended for production):

make migrate-up       # apply all pending migrations
make migrate-status   # check status
make migrate-down     # rollback last migration
Producer
client, _ := simpleworkflow.NewClient("postgres://user:pass@localhost/db?schema=workflow")
defer client.Close()

runID, err := client.Submit("content.thumbnail.v1", payload).
    WithIdempotency("thumbnail:c_123:300x300").
    WithPriority(10).
    Execute(ctx)

err = client.Cancel(ctx, runID)
Worker
poller, _ := simpleworkflow.NewPoller("postgres://user:pass@localhost/db?schema=workflow")
defer poller.Close()

poller.HandleFunc("content.thumbnail.v1", func(ctx context.Context, run *simpleworkflow.WorkflowRun) (any, error) {
    var params struct {
        ContentID string `json:"content_id"`
        Width     int    `json:"width"`
    }
    json.Unmarshal(run.Payload, &params)
    fmt.Printf("Generating thumbnail for %s\n", params.ContentID)
    return map[string]string{"status": "completed"}, nil
})

poller.Start(ctx) // type prefix "content.%" auto-detected from handlers

API Reference

Connection Strings
Database Format Example
SQLite sqlite://<path> sqlite:///tmp/workflow.db or sqlite://:memory:
PostgreSQL postgres://user:pass@host/db?schema=workflow postgres://user:pass@localhost/mydb?schema=workflow

The ?schema= parameter sets the PostgreSQL search_path (defaults to workflow).

Client (Producer)
client, err := simpleworkflow.NewClient("postgres://...")   // from connection string
client := simpleworkflow.NewClientWithDB(db)                // from existing *sql.DB
defer client.Close()

runID, err := client.Submit("workflow.type.v1", payload).
    WithIdempotency("unique-key").
    WithPriority(10).
    WithMaxAttempts(5).
    RunIn(5 * time.Minute).
    Execute(ctx)

err = client.Cancel(ctx, runID)
Poller (Worker)
poller, err := simpleworkflow.NewPoller("postgres://...")    // from connection string
poller := simpleworkflow.NewPollerWithDB(db)                 // from existing *sql.DB
defer poller.Close()

poller.WithWorkerID("custom-worker").
    WithLeaseDuration(60 * time.Second).
    WithPollInterval(5 * time.Second).
    WithTypePrefixes("billing.%", "media.%")                 // override auto-detection

poller.HandleFunc("billing.invoice.v1", handler)             // function handler
poller.Handle("billing.payment.v1", &PaymentExecutor{})      // struct handler

poller.Start(ctx)
Workflow (All-in-One)

Combines Client + Poller in a single object — ideal for small services or getting started.

wf, err := simpleworkflow.New("sqlite:///tmp/workflow.db")
wf := simpleworkflow.NewWithDB(db, dialect)
defer wf.Close()

wf.HandleFunc("demo.hello.v1", handler)
runID, err := wf.Submit("demo.hello.v1", payload).Execute(ctx)
wf.WithAutoMigrate().Start(ctx)
WithAutoMigrate
wf.WithAutoMigrate().Start(ctx)       // on Workflow
poller.WithAutoMigrate().Start(ctx)   // on Poller

Runs idempotent CREATE TABLE IF NOT EXISTS on Start(). Works for both SQLite and PostgreSQL.

Smart Defaults
  • Worker ID: hostname-pid (e.g., my-server-12345)
  • Lease duration: 30 seconds
  • Poll interval: 2 seconds
  • Type prefixes: auto-detected from registered handlers
  • Schema: workflow (or from ?schema= parameter)
Core Types
Intent
type Intent struct {
    Type           string      // Workflow type (e.g. "content.thumbnail.v1")
    Payload        interface{} // JSON-encodable data
    Priority       int         // Lower executes first (default: 100)
    RunAfter       time.Time   // Schedule for future (default: now)
    IdempotencyKey string      // Optional deduplication key
    MaxAttempts    int         // Retry limit (default: 3)
}
WorkflowRun
type WorkflowRun struct {
    ID          string
    Type        string
    Payload     []byte
    Attempt     int
    MaxAttempts int
    Heartbeat    HeartbeatFunc         // Extend the lease
    IsCancelled  CancellationCheckFunc // Check if cancelled
}
WorkflowExecutor
type WorkflowExecutor interface {
    Execute(ctx context.Context, run *WorkflowRun) (interface{}, error)
}

Type-Prefix Routing

Workers claim workflows by pattern using SQL LIKE:

poller.HandleFunc("billing.invoice.v1", invoiceHandler)
poller.HandleFunc("billing.payment.v1", paymentHandler)
// → auto-detects "billing.%" prefix

poller.WithTypePrefixes("billing.%") // or set explicitly

Failure & Retry

  • Failed runs automatically retry with exponential backoff and jitter
  • After max_attempts, runs move to failed status
  • If a worker crashes, the lease expires and another worker retries
Attempt 1: immediate
Attempt 2: +1 minute (±10% jitter)
Attempt 3: +4 minutes (±10% jitter)
Attempt 4: failed (permanent)

Heartbeat & Cancellation

For workflows that outlast the lease duration, extend the lease periodically. Check for cancellation to stop gracefully.

func (e *Executor) Execute(ctx context.Context, run *simpleworkflow.WorkflowRun) (any, error) {
    done := make(chan struct{})
    defer close(done)

    go func() {
        ticker := time.NewTicker(15 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                run.Heartbeat(ctx, 30*time.Second)
            case <-done:
                return
            }
        }
    }()

    // Check for cancellation periodically
    if cancelled, _ := run.IsCancelled(ctx); cancelled {
        return nil, fmt.Errorf("workflow cancelled")
    }

    // Do long-running work...
    return result, nil
}

Cancel from the producer side:

err := client.Cancel(ctx, runID)

Database Schema

workflow_run
Column Type Description
id UUID Primary key
type TEXT Workflow type (e.g. content.thumbnail.v1)
payload JSONB Workflow arguments
status TEXT pending, leased, succeeded, failed, cancelled
priority INT Lower executes first (default: 100)
run_at TIMESTAMPTZ Earliest execution time
idempotency_key TEXT Unique deduplication key
attempt INT Current attempt number
max_attempts INT Retry limit (default: 3)
leased_by TEXT Worker ID holding the lease
lease_until TIMESTAMPTZ Lease expiration
last_error TEXT Most recent error message
result JSONB Workflow result
created_at TIMESTAMPTZ Run creation time
updated_at TIMESTAMPTZ Last update time
deleted_at TIMESTAMPTZ Soft delete (NULL = active)

workflow_event stores an audit trail of all lifecycle events (created, leased, succeeded, failed, etc.). workflow_registry tracks registered workflow types and their runtime.

License

MIT

Documentation

Index

Constants

View Source
const DefaultSchema = "workflow"

DefaultSchema is the default schema name if not specified

Variables

View Source
var (
	// ErrNotFound is returned when a resource does not exist or is not in the expected state.
	ErrNotFound = errors.New("not found")
	// ErrAlreadyComplete is returned when a workflow run cannot be cancelled because it is already completed.
	ErrAlreadyComplete = errors.New("workflow run not found or already completed")
)

Functions

func ParseConnString added in v0.0.3

func ParseConnString(connString, defaultSchema string) (string, string, error)

ParseConnString extracts schema information from a connection string and returns a modified connection string with search_path parameter.

Supports multiple formats:

  • postgres://user:pass@host/db?schema=myschema
  • postgres://user:pass@host/db?search_path=myschema
  • postgres://user:pass@host/db (uses defaultSchema)

Returns: (modifiedConnString, schemaName, error)

func RewritePlaceholders added in v0.0.4

func RewritePlaceholders(query string) string

RewritePlaceholders converts PostgreSQL-style $N placeholders to ? placeholders. Example: "WHERE id = $1 AND status = $2" → "WHERE id = ? AND status = ?"

Types

type CancellationCheckFunc added in v0.0.3

type CancellationCheckFunc func(ctx context.Context) (bool, error)

CancellationCheckFunc checks if the workflow run has been cancelled Returns true if cancelled, false otherwise

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages workflow runs in the database (Producer API)

func NewClient

func NewClient(connString string) (*Client, error)

NewClient creates a new client from a connection string. The connection string determines the dialect (PostgreSQL or SQLite).

Examples:

  • NewClient("postgres://user:pass@localhost/db?schema=workflow")
  • NewClient("sqlite:///path/to/db.sqlite")
  • NewClient("sqlite://:memory:")

The client will manage the database connection and Close() must be called.

func NewClientWithDB added in v0.0.4

func NewClientWithDB(db *sql.DB, dialect Dialect) *Client

NewClientWithDB creates a new client from an existing *sql.DB and Dialect. Useful for testing or when you manage the connection yourself.

func (*Client) AutoMigrate added in v0.0.4

func (c *Client) AutoMigrate(ctx context.Context) error

AutoMigrate runs the embedded DDL for the current dialect. This creates all required tables if they don't exist. For PostgreSQL users who prefer goose, this is optional. For SQLite users, this is the recommended way to set up the schema.

func (*Client) Cancel added in v0.0.3

func (c *Client) Cancel(ctx context.Context, runID string) error

Cancel marks a workflow run as cancelled (cooperative cancellation)

func (*Client) Close added in v0.0.3

func (c *Client) Close() error

Close closes the database connection.

func (*Client) DB added in v0.0.4

func (c *Client) DB() *sql.DB

DB returns the underlying database connection. Useful for the REST API layer that needs to pass the db to handler functions.

func (*Client) DeleteSchedule added in v0.0.4

func (c *Client) DeleteSchedule(ctx context.Context, scheduleID string) error

DeleteSchedule soft-deletes a schedule.

func (*Client) Dialect added in v0.0.4

func (c *Client) Dialect() Dialect

Dialect returns the dialect used by this client.

func (*Client) GetWorkflowEvents added in v0.0.4

func (c *Client) GetWorkflowEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)

GetWorkflowEvents returns the audit event log for a workflow run.

func (*Client) GetWorkflowRun added in v0.0.4

func (c *Client) GetWorkflowRun(ctx context.Context, id string) (*WorkflowRunStatus, error)

GetWorkflowRun retrieves a single workflow run by ID.

func (*Client) ListSchedules added in v0.0.4

func (c *Client) ListSchedules(ctx context.Context) ([]Schedule, error)

ListSchedules returns all active (non-deleted) schedules.

func (*Client) ListWorkflowRuns added in v0.0.4

func (c *Client) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)

ListWorkflowRuns returns workflow runs matching the given options.

func (*Client) PauseSchedule added in v0.0.4

func (c *Client) PauseSchedule(ctx context.Context, scheduleID string) error

PauseSchedule disables a schedule so it won't fire.

func (*Client) ResumeSchedule added in v0.0.4

func (c *Client) ResumeSchedule(ctx context.Context, scheduleID string) error

ResumeSchedule re-enables a paused schedule.

func (*Client) Schedule added in v0.0.4

func (c *Client) Schedule(workflowType string, payload any) *ScheduleBuilder

Schedule starts building a new workflow schedule.

Example:

id, err := client.Schedule("report.daily.v1", payload).
    Cron("0 9 * * 1").
    InTimezone("America/New_York").
    Create(ctx)

func (*Client) Submit added in v0.0.3

func (c *Client) Submit(workflowType string, payload any) *SubmitBuilder

Submit creates a new workflow run with fluent configuration. Returns a SubmitBuilder for chaining configuration methods.

Example:

runID, err := client.Submit("billing.invoice.v1", payload).
    WithIdempotency("invoice:123").
    WithPriority(10).
    Execute(ctx)

type Dialect added in v0.0.4

type Dialect interface {
	// DriverName returns the database/sql driver name (e.g. "postgres", "sqlite").
	DriverName() string

	// Placeholder returns a positional parameter placeholder.
	// For PostgreSQL: Placeholder(1) → "$1"
	// For SQLite:     Placeholder(1) → "?"
	Placeholder(n int) string

	// Now returns the SQL expression for the current timestamp.
	// PostgreSQL: "NOW()"
	// SQLite:     "datetime('now')"
	Now() string

	// TimestampAfterNow returns a SQL expression for NOW() + seconds.
	// PostgreSQL: "NOW() + $1::interval" (with value "N seconds")
	// SQLite:     "datetime('now', '+N seconds')"
	// The literal parameter indicates whether to embed the value literally
	// or use a placeholder.
	TimestampAfterNow(seconds int) string

	// IntervalParam returns the SQL fragment and argument for adding a duration.
	// PostgreSQL: returns ("$N::interval", "30 seconds")
	// SQLite:     not used (TimestampAfterNow embeds the value)
	IntervalParam(n int, seconds int) (sqlFragment string, arg any)

	// ClaimRunQuery returns the SQL for atomically claiming a workflow run.
	// typeCondition is an already-formatted "AND (type LIKE ...)" fragment.
	// leaseSec is the lease duration in seconds.
	// For PostgreSQL, caller provides args: [workerID, intervalString, ...typePrefixes].
	// For SQLite, caller provides args: [workerID, ...typePrefixes] (lease embedded in SQL).
	ClaimRunQuery(typeCondition string, leaseSec int) string

	// ClaimSchedulesQuery returns the SQL for claiming due schedules.
	ClaimSchedulesQuery() string

	// MigrateSQL returns the DDL statements for creating all tables.
	MigrateSQL() string

	// OpenDB opens a database connection with dialect-specific settings.
	OpenDB(dsn string) (*sql.DB, error)
}

Dialect abstracts SQL differences between PostgreSQL and SQLite. All dialect-specific SQL generation flows through this interface.

func DetectDialect added in v0.0.4

func DetectDialect(connString string) (Dialect, string, error)

DetectDialect examines a connection string and returns the appropriate Dialect, the (possibly modified) DSN to pass to the driver, and any error.

Supported schemes:

  • postgres:// or postgresql:// → PostgresDialect (DSN includes search_path)
  • sqlite:// or sqlite3:// → SQLiteDialect (DSN is the file path with query params)
  • key=value format (no scheme) → PostgresDialect (legacy PostgreSQL format)

type DueSchedule added in v0.0.4

type DueSchedule struct {
	ID          string
	Type        string
	Payload     json.RawMessage
	CronExpr    string
	Timezone    string
	NextRunAt   time.Time
	Priority    int
	MaxAttempts int
}

DueSchedule holds the data for a schedule that is due to fire.

type HeartbeatFunc added in v0.0.3

type HeartbeatFunc func(ctx context.Context, duration time.Duration) error

HeartbeatFunc extends the lease on a workflow run duration: how long to extend the lease by

type Intent

type Intent struct {
	Type           string    // e.g. "content.thumbnail.v1" (renamed from Name for semantic clarity)
	Payload        any       // Will be JSON-encoded
	Priority       int       // Lower executes first (default: 100)
	RunAfter       time.Time // Schedule for future (default: now)
	IdempotencyKey string    // Optional: deduplication key
	MaxAttempts    int       // Default: 3
}

Intent represents a workflow intent to be executed

type ListOptions added in v0.0.4

type ListOptions struct {
	Type           string // Filter by workflow type (exact match)
	Status         string // Filter by status
	IdempotencyKey string // Filter by idempotency key (exact match)
	Limit          int    // Max results (default: 50)
	Offset         int    // Pagination offset
}

ListOptions configures filtering and pagination for listing workflow runs.

type MetricsCollector added in v0.0.2

type MetricsCollector interface {
	// RecordIntentClaimed tracks when a workflow run is claimed by a worker
	RecordIntentClaimed(workflowType, workerID string)

	// RecordIntentCompleted tracks workflow completion with status and duration
	// Status can be: "succeeded", "failed", "cancelled"
	RecordIntentCompleted(workflowType, workerID, status string, duration time.Duration)

	// RecordIntentDeadletter tracks workflows that permanently failed (PRIORITY METRIC)
	RecordIntentDeadletter(workflowType, workerID string)

	// RecordFailedAttempt tracks individual workflow execution failures before permanent failure
	RecordFailedAttempt(workflowType, workerID string, attemptNumber int)

	// RecordPollCycle tracks polling activity
	RecordPollCycle(workerID string)

	// RecordPollError tracks polling errors by type
	RecordPollError(workerID string, errorType string)

	// RecordQueueDepth updates the current queue depth gauge
	RecordQueueDepth(workflowType string, status string, depth int)
}

MetricsCollector interface allows optional metrics collection Implementations can track workflow execution metrics for observability

type Poller

type Poller struct {
	// contains filtered or unexported fields
}

Poller polls workflow_run table and executes workflows (Worker API)

func NewPoller

func NewPoller(connString string) (*Poller, error)

NewPoller creates a new workflow run poller from a PostgreSQL connection string. Type prefixes are auto-detected from registered handlers. Use fluent methods to configure: WithWorkerID(), WithLeaseDuration(), etc.

Example:

poller, err := NewPoller("postgres://user:pass@localhost/db?schema=workflow")
poller.HandleFunc("billing.invoice.v1", handler)
poller.Start(ctx)

func NewPollerWithDB added in v0.0.4

func NewPollerWithDB(db *sql.DB, dialect Dialect) *Poller

NewPollerWithDB creates a new workflow run poller from an existing *sql.DB and Dialect. Useful for testing or when you manage the connection yourself.

func (*Poller) AutoMigrate added in v0.0.4

func (p *Poller) AutoMigrate(ctx context.Context) error

AutoMigrate runs the embedded DDL for the current dialect. This creates all required tables if they don't exist.

func (*Poller) Close added in v0.0.3

func (p *Poller) Close() error

Close closes the database connection.

func (*Poller) Handle added in v0.0.3

func (p *Poller) Handle(workflowType string, executor WorkflowExecutor) *Poller

Handle registers a WorkflowExecutor for a workflow type. Use this for advanced cases where you need a stateful executor.

func (*Poller) HandleFunc added in v0.0.3

func (p *Poller) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Poller

HandleFunc registers a function handler for a workflow type. The function receives the WorkflowRun and returns a result or error.

Example:

poller.HandleFunc("billing.invoice.v1", func(ctx context.Context, run *WorkflowRun) (any, error) {
    // Process invoice
    return result, nil
})

func (*Poller) SetMetrics added in v0.0.2

func (p *Poller) SetMetrics(m MetricsCollector)

SetMetrics sets the metrics collector (optional, pass nil to disable metrics)

func (*Poller) Start

func (p *Poller) Start(ctx context.Context)

Start begins polling for workflow runs

func (*Poller) Stop

func (p *Poller) Stop()

Stop stops the poller. Safe to call multiple times.

func (*Poller) WithAutoMigrate added in v0.0.4

func (p *Poller) WithAutoMigrate() *Poller

WithAutoMigrate enables automatic schema migration when Start() is called.

func (*Poller) WithLeaseDuration added in v0.0.3

func (p *Poller) WithLeaseDuration(d time.Duration) *Poller

WithLeaseDuration sets the lease duration for claimed workflow runs. Default: 30 seconds

func (*Poller) WithPollInterval added in v0.0.3

func (p *Poller) WithPollInterval(d time.Duration) *Poller

WithPollInterval sets how often to poll for new workflow runs. Default: 2 seconds

func (*Poller) WithScheduleTickInterval added in v0.0.4

func (p *Poller) WithScheduleTickInterval(d time.Duration) *Poller

WithScheduleTickInterval sets the tick interval for the embedded schedule ticker. Default: 15 seconds. Only effective if WithScheduleTicker() is also called.

func (*Poller) WithScheduleTicker added in v0.0.4

func (p *Poller) WithScheduleTicker() *Poller

WithScheduleTicker enables the schedule ticker inside the poller. The ticker converts due workflow_schedule rows into workflow_run rows.

func (*Poller) WithTypePrefixes added in v0.0.3

func (p *Poller) WithTypePrefixes(prefixes ...string) *Poller

WithTypePrefixes explicitly sets type prefixes to watch. This overrides auto-detection from registered handlers. Example: WithTypePrefixes("billing.%", "notify.%")

func (*Poller) WithWorkerID added in v0.0.3

func (p *Poller) WithWorkerID(id string) *Poller

WithWorkerID sets a custom worker ID. Default: hostname-pid

type PollerConfig added in v0.0.3

type PollerConfig struct {
	TypePrefixes  []string      // e.g. ["billing.%", "media.%"] for type-prefix routing
	LeaseDuration time.Duration // Lease duration (default: 30s)
	PollInterval  time.Duration // Poll interval (default: 2s)
	WorkerID      string        // Worker identifier (default: "go-worker")
}

PollerConfig configures the workflow poller

type PostgresDialect added in v0.0.4

type PostgresDialect struct{}

PostgresDialect implements Dialect for PostgreSQL.

func (*PostgresDialect) ClaimRunQuery added in v0.0.4

func (d *PostgresDialect) ClaimRunQuery(typeCondition string, leaseSec int) string

func (*PostgresDialect) ClaimSchedulesQuery added in v0.0.4

func (d *PostgresDialect) ClaimSchedulesQuery() string

func (*PostgresDialect) DriverName added in v0.0.4

func (d *PostgresDialect) DriverName() string

func (*PostgresDialect) IntervalParam added in v0.0.4

func (d *PostgresDialect) IntervalParam(n int, seconds int) (string, any)

func (*PostgresDialect) MigrateSQL added in v0.0.4

func (d *PostgresDialect) MigrateSQL() string

func (*PostgresDialect) Now added in v0.0.4

func (d *PostgresDialect) Now() string

func (*PostgresDialect) OpenDB added in v0.0.4

func (d *PostgresDialect) OpenDB(dsn string) (*sql.DB, error)

func (*PostgresDialect) Placeholder added in v0.0.4

func (d *PostgresDialect) Placeholder(n int) string

func (*PostgresDialect) TimestampAfterNow added in v0.0.4

func (d *PostgresDialect) TimestampAfterNow(seconds int) string

type PrometheusMetrics added in v0.0.2

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements MetricsCollector using Prometheus

func NewPrometheusMetrics added in v0.0.2

func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics

NewPrometheusMetrics creates a new Prometheus metrics collector Pass nil for registry to use the default Prometheus registry

func (*PrometheusMetrics) RecordFailedAttempt added in v0.0.2

func (m *PrometheusMetrics) RecordFailedAttempt(workflowType, workerID string, attemptNumber int)

RecordFailedAttempt tracks individual failure attempts

func (*PrometheusMetrics) RecordIntentClaimed added in v0.0.2

func (m *PrometheusMetrics) RecordIntentClaimed(workflowType, workerID string)

RecordIntentClaimed increments the claimed counter

func (*PrometheusMetrics) RecordIntentCompleted added in v0.0.2

func (m *PrometheusMetrics) RecordIntentCompleted(workflowType, workerID, status string, duration time.Duration)

RecordIntentCompleted tracks workflow completion with status and duration

func (*PrometheusMetrics) RecordIntentDeadletter added in v0.0.2

func (m *PrometheusMetrics) RecordIntentDeadletter(workflowType, workerID string)

RecordIntentDeadletter increments the failed counter (CRITICAL for alerts)

func (*PrometheusMetrics) RecordPollCycle added in v0.0.2

func (m *PrometheusMetrics) RecordPollCycle(workerID string)

RecordPollCycle increments poll cycle counter and updates worker health gauges

func (*PrometheusMetrics) RecordPollError added in v0.0.2

func (m *PrometheusMetrics) RecordPollError(workerID string, errorType string)

RecordPollError increments poll error counter

func (*PrometheusMetrics) RecordQueueDepth added in v0.0.2

func (m *PrometheusMetrics) RecordQueueDepth(workflowType string, status string, depth int)

RecordQueueDepth sets the current queue depth gauge

type RunRepository added in v0.0.4

type RunRepository struct {
	// contains filtered or unexported fields
}

RunRepository encapsulates all data access for the workflow_run table.

func NewRunRepository added in v0.0.4

func NewRunRepository(db *sql.DB, dialect Dialect) *RunRepository

NewRunRepository creates a RunRepository from a db and dialect.

func (*RunRepository) Cancel added in v0.0.4

func (r *RunRepository) Cancel(ctx context.Context, runID string) error

Cancel marks a workflow run as cancelled.

func (*RunRepository) CheckCancelled added in v0.0.4

func (r *RunRepository) CheckCancelled(ctx context.Context, runID string) (bool, error)

CheckCancelled checks if a workflow run has been cancelled.

func (*RunRepository) Claim added in v0.0.4

func (r *RunRepository) Claim(ctx context.Context, workerID string, typePrefixes []string, leaseDuration time.Duration) (*WorkflowRun, error)

Claim atomically claims a pending workflow run. Returns nil if no work is available.

func (*RunRepository) CountPending added in v0.0.4

func (r *RunRepository) CountPending(ctx context.Context, typePrefix string) (int, error)

CountPending counts pending runs matching a type prefix.

func (*RunRepository) Create added in v0.0.4

func (r *RunRepository) Create(ctx context.Context, intent Intent) (string, error)

Create inserts a new workflow run and returns the ID. Returns "" if idempotency conflict.

func (*RunRepository) CreateFromSchedule added in v0.0.4

func (r *RunRepository) CreateFromSchedule(ctx context.Context, typ string, payload json.RawMessage, priority, maxAttempts int, idempotencyKey string) (string, error)

CreateFromSchedule inserts a workflow run from a schedule (within a transaction). Returns the created run ID, or "" if deduplicated by idempotency key.

func (*RunRepository) ExtendLease added in v0.0.4

func (r *RunRepository) ExtendLease(ctx context.Context, runID string, duration time.Duration) error

ExtendLease extends the lease on a workflow run.

func (*RunRepository) Get added in v0.0.4

Get retrieves a single workflow run by ID.

func (*RunRepository) GetEvents added in v0.0.4

func (r *RunRepository) GetEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)

GetEvents returns all audit events for a workflow run, ordered by creation time.

func (*RunRepository) List added in v0.0.4

List returns workflow runs matching the given options.

func (*RunRepository) MarkFailed added in v0.0.4

func (r *RunRepository) MarkFailed(ctx context.Context, run *WorkflowRun, execErr error) string

MarkFailed marks a workflow run as failed or retries it. Returns the new status.

func (*RunRepository) MarkSucceeded added in v0.0.4

func (r *RunRepository) MarkSucceeded(ctx context.Context, runID string, result any) error

MarkSucceeded marks a workflow run as succeeded with the given result.

func (*RunRepository) WithTx added in v0.0.4

func (r *RunRepository) WithTx(tx *sql.Tx) *RunRepository

WithTx returns a copy of the repository that executes queries within the given transaction.

type SQLiteDialect added in v0.0.4

type SQLiteDialect struct{}

SQLiteDialect implements Dialect for SQLite (via modernc.org/sqlite).

func (*SQLiteDialect) ClaimRunQuery added in v0.0.4

func (d *SQLiteDialect) ClaimRunQuery(typeCondition string, leaseSec int) string

func (*SQLiteDialect) ClaimSchedulesQuery added in v0.0.4

func (d *SQLiteDialect) ClaimSchedulesQuery() string

func (*SQLiteDialect) DriverName added in v0.0.4

func (d *SQLiteDialect) DriverName() string

func (*SQLiteDialect) IntervalParam added in v0.0.4

func (d *SQLiteDialect) IntervalParam(n int, seconds int) (string, any)

func (*SQLiteDialect) MigrateSQL added in v0.0.4

func (d *SQLiteDialect) MigrateSQL() string

func (*SQLiteDialect) Now added in v0.0.4

func (d *SQLiteDialect) Now() string

func (*SQLiteDialect) OpenDB added in v0.0.4

func (d *SQLiteDialect) OpenDB(dsn string) (*sql.DB, error)

func (*SQLiteDialect) Placeholder added in v0.0.4

func (d *SQLiteDialect) Placeholder(n int) string

func (*SQLiteDialect) TimestampAfterNow added in v0.0.4

func (d *SQLiteDialect) TimestampAfterNow(seconds int) string

type Schedule added in v0.0.4

type Schedule struct {
	ID          string
	Type        string
	Payload     any
	CronExpr    string
	Timezone    string
	NextRunAt   time.Time
	LastRunAt   *time.Time
	Enabled     bool
	Priority    int
	MaxAttempts int
}

Schedule represents a recurring workflow schedule.

type ScheduleBuilder added in v0.0.4

type ScheduleBuilder struct {
	// contains filtered or unexported fields
}

ScheduleBuilder provides a fluent API for creating workflow schedules.

func (*ScheduleBuilder) Create added in v0.0.4

func (s *ScheduleBuilder) Create(ctx context.Context) (string, error)

Create validates the cron expression, computes the next run time, and inserts the schedule. Returns the schedule ID.

func (*ScheduleBuilder) Cron added in v0.0.4

func (s *ScheduleBuilder) Cron(expr string) *ScheduleBuilder

Cron sets the cron expression (5-field standard format). Examples: "0 9 * * 1" (every Monday at 9am), "*/5 * * * *" (every 5 minutes)

func (*ScheduleBuilder) InTimezone added in v0.0.4

func (s *ScheduleBuilder) InTimezone(tz string) *ScheduleBuilder

InTimezone sets the IANA timezone for the schedule. Default: "UTC"

func (*ScheduleBuilder) WithMaxAttempts added in v0.0.4

func (s *ScheduleBuilder) WithMaxAttempts(n int) *ScheduleBuilder

WithMaxAttempts sets the max attempts inherited by created runs. Default: 3

func (*ScheduleBuilder) WithPriority added in v0.0.4

func (s *ScheduleBuilder) WithPriority(p int) *ScheduleBuilder

WithPriority sets the priority inherited by created runs. Default: 100

type ScheduleRepository added in v0.0.4

type ScheduleRepository struct {
	// contains filtered or unexported fields
}

ScheduleRepository encapsulates all data access for the workflow_schedule table.

func NewScheduleRepository added in v0.0.4

func NewScheduleRepository(db *sql.DB, dialect Dialect) *ScheduleRepository

NewScheduleRepository creates a ScheduleRepository from a db and dialect.

func (*ScheduleRepository) AdvanceNextRun added in v0.0.4

func (r *ScheduleRepository) AdvanceNextRun(ctx context.Context, scheduleID string, nextRun time.Time) error

AdvanceNextRun updates a schedule's next run time and last run time.

func (*ScheduleRepository) ClaimDue added in v0.0.4

func (r *ScheduleRepository) ClaimDue(ctx context.Context) ([]DueSchedule, error)

ClaimDue returns schedules that are due to fire (within a transaction).

func (*ScheduleRepository) Create added in v0.0.4

func (r *ScheduleRepository) Create(ctx context.Context, workflowType, cronExpr, timezone string, payload any, priority, maxAttempts int) (string, error)

Create validates and inserts a new schedule. Returns the schedule ID.

func (*ScheduleRepository) List added in v0.0.4

func (r *ScheduleRepository) List(ctx context.Context) ([]Schedule, error)

List returns all active (non-deleted) schedules.

func (*ScheduleRepository) SetEnabled added in v0.0.4

func (r *ScheduleRepository) SetEnabled(ctx context.Context, scheduleID string, enabled bool) error

SetEnabled enables or disables a schedule.

func (*ScheduleRepository) SoftDelete added in v0.0.4

func (r *ScheduleRepository) SoftDelete(ctx context.Context, scheduleID string) error

SoftDelete soft-deletes a schedule.

func (*ScheduleRepository) WithTx added in v0.0.4

func (r *ScheduleRepository) WithTx(tx *sql.Tx) *ScheduleRepository

WithTx returns a copy of the repository that executes queries within the given transaction.

type ScheduleTicker added in v0.0.4

type ScheduleTicker struct {
	// contains filtered or unexported fields
}

ScheduleTicker converts due schedules into workflow_run rows. It can run standalone or be embedded in a Poller via WithScheduleTicker().

func NewScheduleTicker added in v0.0.4

func NewScheduleTicker(connString string) (*ScheduleTicker, error)

NewScheduleTicker creates a new ScheduleTicker from a connection string.

func (*ScheduleTicker) Close added in v0.0.4

func (t *ScheduleTicker) Close() error

Close closes the database connection (only for standalone tickers).

func (*ScheduleTicker) SetMetrics added in v0.0.4

func (t *ScheduleTicker) SetMetrics(m MetricsCollector)

SetMetrics sets the metrics collector for the schedule ticker (optional).

func (*ScheduleTicker) Start added in v0.0.4

func (t *ScheduleTicker) Start(ctx context.Context)

Start begins the schedule tick loop. It blocks until Stop() is called or ctx is cancelled.

func (*ScheduleTicker) Stop added in v0.0.4

func (t *ScheduleTicker) Stop()

Stop stops the ticker loop. Safe to call multiple times.

func (*ScheduleTicker) Tick added in v0.0.4

func (t *ScheduleTicker) Tick(ctx context.Context) error

Tick performs a single tick: finds due schedules and creates workflow_run rows. Exported for testing.

func (*ScheduleTicker) WithTickInterval added in v0.0.4

func (t *ScheduleTicker) WithTickInterval(d time.Duration) *ScheduleTicker

WithTickInterval sets how often the ticker checks for due schedules. Default: 15 seconds

type SubmitBuilder added in v0.0.3

type SubmitBuilder struct {
	// contains filtered or unexported fields
}

SubmitBuilder provides a fluent API for configuring and submitting workflow runs.

func (*SubmitBuilder) Execute added in v0.0.3

func (s *SubmitBuilder) Execute(ctx context.Context) (string, error)

Execute submits the workflow run and returns its ID. Returns empty string if idempotency key conflict (run already exists).

func (*SubmitBuilder) RunAfter added in v0.0.3

func (s *SubmitBuilder) RunAfter(t time.Time) *SubmitBuilder

RunAfter schedules the workflow to run after a specific time.

func (*SubmitBuilder) RunIn added in v0.0.3

RunIn schedules the workflow to run after a duration from now.

func (*SubmitBuilder) WithIdempotency added in v0.0.3

func (s *SubmitBuilder) WithIdempotency(key string) *SubmitBuilder

WithIdempotency sets an idempotency key for deduplication. If a workflow run with this key already exists, submission will be skipped.

func (*SubmitBuilder) WithMaxAttempts added in v0.0.3

func (s *SubmitBuilder) WithMaxAttempts(attempts int) *SubmitBuilder

WithMaxAttempts sets the maximum number of retry attempts. Default: 3

func (*SubmitBuilder) WithPriority added in v0.0.3

func (s *SubmitBuilder) WithPriority(priority int) *SubmitBuilder

WithPriority sets the priority (lower number = higher priority). Default: 100

type Workflow added in v0.0.4

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow combines Client (producer) and Poller (consumer) into a single entry point for applications that both submit and process workflows.

func New added in v0.0.4

func New(connString string) (*Workflow, error)

New creates a Workflow from a connection string, sharing one DB connection between the client and poller.

func NewWithDB added in v0.0.4

func NewWithDB(db *sql.DB, dialect Dialect) *Workflow

NewWithDB creates a Workflow from an existing *sql.DB and Dialect.

func (*Workflow) AutoMigrate added in v0.0.4

func (w *Workflow) AutoMigrate(ctx context.Context) error

AutoMigrate runs the embedded DDL for the current dialect.

func (*Workflow) Cancel added in v0.0.4

func (w *Workflow) Cancel(ctx context.Context, runID string) error

Cancel marks a workflow run as cancelled.

func (*Workflow) Close added in v0.0.4

func (w *Workflow) Close() error

Close closes the database connection.

func (*Workflow) DB added in v0.0.4

func (w *Workflow) DB() *sql.DB

DB returns the underlying database connection.

func (*Workflow) DeleteSchedule added in v0.0.4

func (w *Workflow) DeleteSchedule(ctx context.Context, scheduleID string) error

DeleteSchedule soft-deletes a schedule.

func (*Workflow) Dialect added in v0.0.4

func (w *Workflow) Dialect() Dialect

Dialect returns the dialect used by this workflow.

func (*Workflow) GetWorkflowEvents added in v0.0.4

func (w *Workflow) GetWorkflowEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)

GetWorkflowEvents returns the audit event log for a workflow run.

func (*Workflow) GetWorkflowRun added in v0.0.4

func (w *Workflow) GetWorkflowRun(ctx context.Context, id string) (*WorkflowRunStatus, error)

GetWorkflowRun retrieves a single workflow run by ID.

func (*Workflow) Handle added in v0.0.4

func (w *Workflow) Handle(workflowType string, executor WorkflowExecutor) *Workflow

Handle registers a WorkflowExecutor for a workflow type.

func (*Workflow) HandleFunc added in v0.0.4

func (w *Workflow) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Workflow

HandleFunc registers a function handler for a workflow type.

func (*Workflow) ListSchedules added in v0.0.4

func (w *Workflow) ListSchedules(ctx context.Context) ([]Schedule, error)

ListSchedules returns all active (non-deleted) schedules.

func (*Workflow) ListWorkflowRuns added in v0.0.4

func (w *Workflow) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)

ListWorkflowRuns returns workflow runs matching the given options.

func (*Workflow) PauseSchedule added in v0.0.4

func (w *Workflow) PauseSchedule(ctx context.Context, scheduleID string) error

PauseSchedule disables a schedule so it won't fire.

func (*Workflow) ResumeSchedule added in v0.0.4

func (w *Workflow) ResumeSchedule(ctx context.Context, scheduleID string) error

ResumeSchedule re-enables a paused schedule.

func (*Workflow) Schedule added in v0.0.4

func (w *Workflow) Schedule(workflowType string, payload any) *ScheduleBuilder

Schedule starts building a new workflow schedule.

func (*Workflow) Start added in v0.0.4

func (w *Workflow) Start(ctx context.Context)

Start begins polling for workflow runs. Blocks until stopped or context is cancelled.

func (*Workflow) Stop added in v0.0.4

func (w *Workflow) Stop()

Stop stops the poller. Safe to call multiple times.

func (*Workflow) Submit added in v0.0.4

func (w *Workflow) Submit(workflowType string, payload any) *SubmitBuilder

Submit creates a new workflow run with fluent configuration.

func (*Workflow) WithAutoMigrate added in v0.0.4

func (w *Workflow) WithAutoMigrate() *Workflow

WithAutoMigrate enables automatic schema migration when Start() is called.

func (*Workflow) WithLeaseDuration added in v0.0.4

func (w *Workflow) WithLeaseDuration(d time.Duration) *Workflow

WithLeaseDuration sets the lease duration for claimed workflow runs.

func (*Workflow) WithPollInterval added in v0.0.4

func (w *Workflow) WithPollInterval(d time.Duration) *Workflow

WithPollInterval sets how often to poll for new workflow runs.

func (*Workflow) WithScheduleTicker added in v0.0.4

func (w *Workflow) WithScheduleTicker() *Workflow

WithScheduleTicker enables the schedule ticker inside the poller.

func (*Workflow) WithTypePrefixes added in v0.0.4

func (w *Workflow) WithTypePrefixes(prefixes ...string) *Workflow

WithTypePrefixes explicitly sets type prefixes to watch.

func (*Workflow) WithWorkerID added in v0.0.4

func (w *Workflow) WithWorkerID(id string) *Workflow

WithWorkerID sets a custom worker ID.

type WorkflowEvent added in v0.0.4

type WorkflowEvent struct {
	ID         int64           `json:"id"`
	WorkflowID string          `json:"workflow_id"`
	EventType  string          `json:"event_type"`
	Data       json.RawMessage `json:"data,omitempty"`
	CreatedAt  time.Time       `json:"created_at"`
}

WorkflowEvent represents an audit event for a workflow run.

type WorkflowExecutor

type WorkflowExecutor interface {
	// Execute runs the workflow and returns the result or error
	Execute(ctx context.Context, run *WorkflowRun) (any, error)
}

WorkflowExecutor is implemented by users to execute specific workflows

type WorkflowRun added in v0.0.3

type WorkflowRun struct {
	ID          string
	Type        string
	Payload     []byte // JSON
	Attempt     int
	MaxAttempts int

	// Functions for long-running workflows
	Heartbeat   HeartbeatFunc         // Extend the lease to prevent timeout
	IsCancelled CancellationCheckFunc // Check if workflow has been cancelled
}

WorkflowRun represents a claimed workflow run from the database (renamed from WorkflowIntent for consistency with new naming)

type WorkflowRunStatus added in v0.0.4

type WorkflowRunStatus struct {
	ID             string     `json:"id"`
	Type           string     `json:"type"`
	Payload        []byte     `json:"payload"`
	Status         string     `json:"status"`
	Priority       int        `json:"priority"`
	RunAt          time.Time  `json:"run_at"`
	IdempotencyKey *string    `json:"idempotency_key,omitempty"`
	Attempt        int        `json:"attempt"`
	MaxAttempts    int        `json:"max_attempts"`
	LeasedBy       *string    `json:"leased_by,omitempty"`
	LeaseUntil     *time.Time `json:"lease_until,omitempty"`
	LastError      *string    `json:"last_error,omitempty"`
	Result         []byte     `json:"result,omitempty"`
	CreatedAt      time.Time  `json:"created_at"`
	UpdatedAt      time.Time  `json:"updated_at"`
}

WorkflowRunStatus is the full read model for a workflow run, with JSON tags for API responses.

Directories

Path Synopsis
cmd
server command
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL