Documentation
¶
Index ¶
- Constants
- Variables
- func ParseConnString(connString, defaultSchema string) (string, string, error)
- func RewritePlaceholders(query string) string
- type CancellationCheckFunc
- type Client
- func (c *Client) AutoMigrate(ctx context.Context) error
- func (c *Client) Cancel(ctx context.Context, runID string) error
- func (c *Client) Close() error
- func (c *Client) DB() *sql.DB
- func (c *Client) DeleteSchedule(ctx context.Context, scheduleID string) error
- func (c *Client) Dialect() Dialect
- func (c *Client) GetWorkflowEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)
- func (c *Client) GetWorkflowRun(ctx context.Context, id string) (*WorkflowRunStatus, error)
- func (c *Client) ListSchedules(ctx context.Context) ([]Schedule, error)
- func (c *Client) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
- func (c *Client) PauseSchedule(ctx context.Context, scheduleID string) error
- func (c *Client) ResumeSchedule(ctx context.Context, scheduleID string) error
- func (c *Client) Schedule(workflowType string, payload any) *ScheduleBuilder
- func (c *Client) Submit(workflowType string, payload any) *SubmitBuilder
- type Dialect
- type DueSchedule
- type HeartbeatFunc
- type Intent
- type ListOptions
- type MetricsCollector
- type Poller
- func (p *Poller) AutoMigrate(ctx context.Context) error
- func (p *Poller) Close() error
- func (p *Poller) Handle(workflowType string, executor WorkflowExecutor) *Poller
- func (p *Poller) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Poller
- func (p *Poller) SetMetrics(m MetricsCollector)
- func (p *Poller) Start(ctx context.Context)
- func (p *Poller) Stop()
- func (p *Poller) WithAutoMigrate() *Poller
- func (p *Poller) WithLeaseDuration(d time.Duration) *Poller
- func (p *Poller) WithPollInterval(d time.Duration) *Poller
- func (p *Poller) WithScheduleTickInterval(d time.Duration) *Poller
- func (p *Poller) WithScheduleTicker() *Poller
- func (p *Poller) WithTypePrefixes(prefixes ...string) *Poller
- func (p *Poller) WithWorkerID(id string) *Poller
- type PollerConfig
- type PostgresDialect
- func (d *PostgresDialect) ClaimRunQuery(typeCondition string, leaseSec int) string
- func (d *PostgresDialect) ClaimSchedulesQuery() string
- func (d *PostgresDialect) DriverName() string
- func (d *PostgresDialect) IntervalParam(n int, seconds int) (string, any)
- func (d *PostgresDialect) MigrateSQL() string
- func (d *PostgresDialect) Now() string
- func (d *PostgresDialect) OpenDB(dsn string) (*sql.DB, error)
- func (d *PostgresDialect) Placeholder(n int) string
- func (d *PostgresDialect) TimestampAfterNow(seconds int) string
- type PrometheusMetrics
- func (m *PrometheusMetrics) RecordFailedAttempt(workflowType, workerID string, attemptNumber int)
- func (m *PrometheusMetrics) RecordIntentClaimed(workflowType, workerID string)
- func (m *PrometheusMetrics) RecordIntentCompleted(workflowType, workerID, status string, duration time.Duration)
- func (m *PrometheusMetrics) RecordIntentDeadletter(workflowType, workerID string)
- func (m *PrometheusMetrics) RecordPollCycle(workerID string)
- func (m *PrometheusMetrics) RecordPollError(workerID string, errorType string)
- func (m *PrometheusMetrics) RecordQueueDepth(workflowType string, status string, depth int)
- type RunRepository
- func (r *RunRepository) Cancel(ctx context.Context, runID string) error
- func (r *RunRepository) CheckCancelled(ctx context.Context, runID string) (bool, error)
- func (r *RunRepository) Claim(ctx context.Context, workerID string, typePrefixes []string, ...) (*WorkflowRun, error)
- func (r *RunRepository) CountPending(ctx context.Context, typePrefix string) (int, error)
- func (r *RunRepository) Create(ctx context.Context, intent Intent) (string, error)
- func (r *RunRepository) CreateFromSchedule(ctx context.Context, typ string, payload json.RawMessage, ...) (string, error)
- func (r *RunRepository) ExtendLease(ctx context.Context, runID string, duration time.Duration) error
- func (r *RunRepository) Get(ctx context.Context, id string) (*WorkflowRunStatus, error)
- func (r *RunRepository) GetEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)
- func (r *RunRepository) List(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
- func (r *RunRepository) MarkFailed(ctx context.Context, run *WorkflowRun, execErr error) string
- func (r *RunRepository) MarkSucceeded(ctx context.Context, runID string, result any) error
- func (r *RunRepository) WithTx(tx *sql.Tx) *RunRepository
- type SQLiteDialect
- func (d *SQLiteDialect) ClaimRunQuery(typeCondition string, leaseSec int) string
- func (d *SQLiteDialect) ClaimSchedulesQuery() string
- func (d *SQLiteDialect) DriverName() string
- func (d *SQLiteDialect) IntervalParam(n int, seconds int) (string, any)
- func (d *SQLiteDialect) MigrateSQL() string
- func (d *SQLiteDialect) Now() string
- func (d *SQLiteDialect) OpenDB(dsn string) (*sql.DB, error)
- func (d *SQLiteDialect) Placeholder(n int) string
- func (d *SQLiteDialect) TimestampAfterNow(seconds int) string
- type Schedule
- type ScheduleBuilder
- func (s *ScheduleBuilder) Create(ctx context.Context) (string, error)
- func (s *ScheduleBuilder) Cron(expr string) *ScheduleBuilder
- func (s *ScheduleBuilder) InTimezone(tz string) *ScheduleBuilder
- func (s *ScheduleBuilder) WithMaxAttempts(n int) *ScheduleBuilder
- func (s *ScheduleBuilder) WithPriority(p int) *ScheduleBuilder
- type ScheduleRepository
- func (r *ScheduleRepository) AdvanceNextRun(ctx context.Context, scheduleID string, nextRun time.Time) error
- func (r *ScheduleRepository) ClaimDue(ctx context.Context) ([]DueSchedule, error)
- func (r *ScheduleRepository) Create(ctx context.Context, workflowType, cronExpr, timezone string, payload any, ...) (string, error)
- func (r *ScheduleRepository) List(ctx context.Context) ([]Schedule, error)
- func (r *ScheduleRepository) SetEnabled(ctx context.Context, scheduleID string, enabled bool) error
- func (r *ScheduleRepository) SoftDelete(ctx context.Context, scheduleID string) error
- func (r *ScheduleRepository) WithTx(tx *sql.Tx) *ScheduleRepository
- type ScheduleTicker
- func (t *ScheduleTicker) Close() error
- func (t *ScheduleTicker) SetMetrics(m MetricsCollector)
- func (t *ScheduleTicker) Start(ctx context.Context)
- func (t *ScheduleTicker) Stop()
- func (t *ScheduleTicker) Tick(ctx context.Context) error
- func (t *ScheduleTicker) WithTickInterval(d time.Duration) *ScheduleTicker
- type SubmitBuilder
- func (s *SubmitBuilder) Execute(ctx context.Context) (string, error)
- func (s *SubmitBuilder) RunAfter(t time.Time) *SubmitBuilder
- func (s *SubmitBuilder) RunIn(d time.Duration) *SubmitBuilder
- func (s *SubmitBuilder) WithIdempotency(key string) *SubmitBuilder
- func (s *SubmitBuilder) WithMaxAttempts(attempts int) *SubmitBuilder
- func (s *SubmitBuilder) WithPriority(priority int) *SubmitBuilder
- type Workflow
- func (w *Workflow) AutoMigrate(ctx context.Context) error
- func (w *Workflow) Cancel(ctx context.Context, runID string) error
- func (w *Workflow) Close() error
- func (w *Workflow) DB() *sql.DB
- func (w *Workflow) DeleteSchedule(ctx context.Context, scheduleID string) error
- func (w *Workflow) Dialect() Dialect
- func (w *Workflow) GetWorkflowEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)
- func (w *Workflow) GetWorkflowRun(ctx context.Context, id string) (*WorkflowRunStatus, error)
- func (w *Workflow) Handle(workflowType string, executor WorkflowExecutor) *Workflow
- func (w *Workflow) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Workflow
- func (w *Workflow) ListSchedules(ctx context.Context) ([]Schedule, error)
- func (w *Workflow) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
- func (w *Workflow) PauseSchedule(ctx context.Context, scheduleID string) error
- func (w *Workflow) ResumeSchedule(ctx context.Context, scheduleID string) error
- func (w *Workflow) Schedule(workflowType string, payload any) *ScheduleBuilder
- func (w *Workflow) Start(ctx context.Context)
- func (w *Workflow) Stop()
- func (w *Workflow) Submit(workflowType string, payload any) *SubmitBuilder
- func (w *Workflow) WithAutoMigrate() *Workflow
- func (w *Workflow) WithLeaseDuration(d time.Duration) *Workflow
- func (w *Workflow) WithPollInterval(d time.Duration) *Workflow
- func (w *Workflow) WithScheduleTicker() *Workflow
- func (w *Workflow) WithTypePrefixes(prefixes ...string) *Workflow
- func (w *Workflow) WithWorkerID(id string) *Workflow
- type WorkflowEvent
- type WorkflowExecutor
- type WorkflowRun
- type WorkflowRunStatus
Constants ¶
const DefaultSchema = "workflow"
DefaultSchema is the default schema name if not specified
Variables ¶
var ( // ErrNotFound is returned when a resource does not exist or is not in the expected state. ErrNotFound = errors.New("not found") // ErrAlreadyComplete is returned when a workflow run cannot be cancelled because it is already completed. ErrAlreadyComplete = errors.New("workflow run not found or already completed") )
Functions ¶
func ParseConnString ¶ added in v0.0.3
ParseConnString extracts schema information from a connection string and returns a modified connection string with search_path parameter.
Supports multiple formats:
- postgres://user:pass@host/db?schema=myschema
- postgres://user:pass@host/db?search_path=myschema
- postgres://user:pass@host/db (uses defaultSchema)
Returns: (modifiedConnString, schemaName, error)
func RewritePlaceholders ¶ added in v0.0.4
RewritePlaceholders converts PostgreSQL-style $N placeholders to ? placeholders. Example: "WHERE id = $1 AND status = $2" → "WHERE id = ? AND status = ?"
Types ¶
type CancellationCheckFunc ¶ added in v0.0.3
CancellationCheckFunc checks if the workflow run has been cancelled Returns true if cancelled, false otherwise
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages workflow runs in the database (Producer API)
func NewClient ¶
NewClient creates a new client from a connection string. The connection string determines the dialect (PostgreSQL or SQLite).
Examples:
- NewClient("postgres://user:pass@localhost/db?schema=workflow")
- NewClient("sqlite:///path/to/db.sqlite")
- NewClient("sqlite://:memory:")
The client will manage the database connection and Close() must be called.
func NewClientWithDB ¶ added in v0.0.4
NewClientWithDB creates a new client from an existing *sql.DB and Dialect. Useful for testing or when you manage the connection yourself.
func (*Client) AutoMigrate ¶ added in v0.0.4
AutoMigrate runs the embedded DDL for the current dialect. This creates all required tables if they don't exist. For PostgreSQL users who prefer goose, this is optional. For SQLite users, this is the recommended way to set up the schema.
func (*Client) Cancel ¶ added in v0.0.3
Cancel marks a workflow run as cancelled (cooperative cancellation)
func (*Client) DB ¶ added in v0.0.4
DB returns the underlying database connection. Useful for the REST API layer that needs to pass the db to handler functions.
func (*Client) DeleteSchedule ¶ added in v0.0.4
DeleteSchedule soft-deletes a schedule.
func (*Client) GetWorkflowEvents ¶ added in v0.0.4
GetWorkflowEvents returns the audit event log for a workflow run.
func (*Client) GetWorkflowRun ¶ added in v0.0.4
GetWorkflowRun retrieves a single workflow run by ID.
func (*Client) ListSchedules ¶ added in v0.0.4
ListSchedules returns all active (non-deleted) schedules.
func (*Client) ListWorkflowRuns ¶ added in v0.0.4
func (c *Client) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
ListWorkflowRuns returns workflow runs matching the given options.
func (*Client) PauseSchedule ¶ added in v0.0.4
PauseSchedule disables a schedule so it won't fire.
func (*Client) ResumeSchedule ¶ added in v0.0.4
ResumeSchedule re-enables a paused schedule.
func (*Client) Schedule ¶ added in v0.0.4
func (c *Client) Schedule(workflowType string, payload any) *ScheduleBuilder
Schedule starts building a new workflow schedule.
Example:
id, err := client.Schedule("report.daily.v1", payload).
Cron("0 9 * * 1").
InTimezone("America/New_York").
Create(ctx)
func (*Client) Submit ¶ added in v0.0.3
func (c *Client) Submit(workflowType string, payload any) *SubmitBuilder
Submit creates a new workflow run with fluent configuration. Returns a SubmitBuilder for chaining configuration methods.
Example:
runID, err := client.Submit("billing.invoice.v1", payload).
WithIdempotency("invoice:123").
WithPriority(10).
Execute(ctx)
type Dialect ¶ added in v0.0.4
type Dialect interface {
// DriverName returns the database/sql driver name (e.g. "postgres", "sqlite").
DriverName() string
// Placeholder returns a positional parameter placeholder.
// For PostgreSQL: Placeholder(1) → "$1"
// For SQLite: Placeholder(1) → "?"
Placeholder(n int) string
// Now returns the SQL expression for the current timestamp.
// PostgreSQL: "NOW()"
// SQLite: "datetime('now')"
Now() string
// TimestampAfterNow returns a SQL expression for NOW() + seconds.
// PostgreSQL: "NOW() + $1::interval" (with value "N seconds")
// SQLite: "datetime('now', '+N seconds')"
// The literal parameter indicates whether to embed the value literally
// or use a placeholder.
TimestampAfterNow(seconds int) string
// IntervalParam returns the SQL fragment and argument for adding a duration.
// PostgreSQL: returns ("$N::interval", "30 seconds")
// SQLite: not used (TimestampAfterNow embeds the value)
IntervalParam(n int, seconds int) (sqlFragment string, arg any)
// ClaimRunQuery returns the SQL for atomically claiming a workflow run.
// typeCondition is an already-formatted "AND (type LIKE ...)" fragment.
// leaseSec is the lease duration in seconds.
// For PostgreSQL, caller provides args: [workerID, intervalString, ...typePrefixes].
// For SQLite, caller provides args: [workerID, ...typePrefixes] (lease embedded in SQL).
ClaimRunQuery(typeCondition string, leaseSec int) string
// ClaimSchedulesQuery returns the SQL for claiming due schedules.
ClaimSchedulesQuery() string
// MigrateSQL returns the DDL statements for creating all tables.
MigrateSQL() string
// OpenDB opens a database connection with dialect-specific settings.
OpenDB(dsn string) (*sql.DB, error)
}
Dialect abstracts SQL differences between PostgreSQL and SQLite. All dialect-specific SQL generation flows through this interface.
func DetectDialect ¶ added in v0.0.4
DetectDialect examines a connection string and returns the appropriate Dialect, the (possibly modified) DSN to pass to the driver, and any error.
Supported schemes:
- postgres:// or postgresql:// → PostgresDialect (DSN includes search_path)
- sqlite:// or sqlite3:// → SQLiteDialect (DSN is the file path with query params)
- key=value format (no scheme) → PostgresDialect (legacy PostgreSQL format)
type DueSchedule ¶ added in v0.0.4
type DueSchedule struct {
ID string
Type string
Payload json.RawMessage
CronExpr string
Timezone string
NextRunAt time.Time
Priority int
MaxAttempts int
}
DueSchedule holds the data for a schedule that is due to fire.
type HeartbeatFunc ¶ added in v0.0.3
HeartbeatFunc extends the lease on a workflow run duration: how long to extend the lease by
type Intent ¶
type Intent struct {
Type string // e.g. "content.thumbnail.v1" (renamed from Name for semantic clarity)
Payload any // Will be JSON-encoded
Priority int // Lower executes first (default: 100)
RunAfter time.Time // Schedule for future (default: now)
IdempotencyKey string // Optional: deduplication key
MaxAttempts int // Default: 3
}
Intent represents a workflow intent to be executed
type ListOptions ¶ added in v0.0.4
type ListOptions struct {
Type string // Filter by workflow type (exact match)
Status string // Filter by status
IdempotencyKey string // Filter by idempotency key (exact match)
Limit int // Max results (default: 50)
Offset int // Pagination offset
}
ListOptions configures filtering and pagination for listing workflow runs.
type MetricsCollector ¶ added in v0.0.2
type MetricsCollector interface {
// RecordIntentClaimed tracks when a workflow run is claimed by a worker
RecordIntentClaimed(workflowType, workerID string)
// RecordIntentCompleted tracks workflow completion with status and duration
// Status can be: "succeeded", "failed", "cancelled"
RecordIntentCompleted(workflowType, workerID, status string, duration time.Duration)
// RecordIntentDeadletter tracks workflows that permanently failed (PRIORITY METRIC)
RecordIntentDeadletter(workflowType, workerID string)
// RecordFailedAttempt tracks individual workflow execution failures before permanent failure
RecordFailedAttempt(workflowType, workerID string, attemptNumber int)
// RecordPollCycle tracks polling activity
RecordPollCycle(workerID string)
// RecordPollError tracks polling errors by type
RecordPollError(workerID string, errorType string)
// RecordQueueDepth updates the current queue depth gauge
RecordQueueDepth(workflowType string, status string, depth int)
}
MetricsCollector interface allows optional metrics collection Implementations can track workflow execution metrics for observability
type Poller ¶
type Poller struct {
// contains filtered or unexported fields
}
Poller polls workflow_run table and executes workflows (Worker API)
func NewPoller ¶
NewPoller creates a new workflow run poller from a PostgreSQL connection string. Type prefixes are auto-detected from registered handlers. Use fluent methods to configure: WithWorkerID(), WithLeaseDuration(), etc.
Example:
poller, err := NewPoller("postgres://user:pass@localhost/db?schema=workflow")
poller.HandleFunc("billing.invoice.v1", handler)
poller.Start(ctx)
func NewPollerWithDB ¶ added in v0.0.4
NewPollerWithDB creates a new workflow run poller from an existing *sql.DB and Dialect. Useful for testing or when you manage the connection yourself.
func (*Poller) AutoMigrate ¶ added in v0.0.4
AutoMigrate runs the embedded DDL for the current dialect. This creates all required tables if they don't exist.
func (*Poller) Handle ¶ added in v0.0.3
func (p *Poller) Handle(workflowType string, executor WorkflowExecutor) *Poller
Handle registers a WorkflowExecutor for a workflow type. Use this for advanced cases where you need a stateful executor.
func (*Poller) HandleFunc ¶ added in v0.0.3
func (p *Poller) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Poller
HandleFunc registers a function handler for a workflow type. The function receives the WorkflowRun and returns a result or error.
Example:
poller.HandleFunc("billing.invoice.v1", func(ctx context.Context, run *WorkflowRun) (any, error) {
// Process invoice
return result, nil
})
func (*Poller) SetMetrics ¶ added in v0.0.2
func (p *Poller) SetMetrics(m MetricsCollector)
SetMetrics sets the metrics collector (optional, pass nil to disable metrics)
func (*Poller) WithAutoMigrate ¶ added in v0.0.4
WithAutoMigrate enables automatic schema migration when Start() is called.
func (*Poller) WithLeaseDuration ¶ added in v0.0.3
WithLeaseDuration sets the lease duration for claimed workflow runs. Default: 30 seconds
func (*Poller) WithPollInterval ¶ added in v0.0.3
WithPollInterval sets how often to poll for new workflow runs. Default: 2 seconds
func (*Poller) WithScheduleTickInterval ¶ added in v0.0.4
WithScheduleTickInterval sets the tick interval for the embedded schedule ticker. Default: 15 seconds. Only effective if WithScheduleTicker() is also called.
func (*Poller) WithScheduleTicker ¶ added in v0.0.4
WithScheduleTicker enables the schedule ticker inside the poller. The ticker converts due workflow_schedule rows into workflow_run rows.
func (*Poller) WithTypePrefixes ¶ added in v0.0.3
WithTypePrefixes explicitly sets type prefixes to watch. This overrides auto-detection from registered handlers. Example: WithTypePrefixes("billing.%", "notify.%")
func (*Poller) WithWorkerID ¶ added in v0.0.3
WithWorkerID sets a custom worker ID. Default: hostname-pid
type PollerConfig ¶ added in v0.0.3
type PollerConfig struct {
TypePrefixes []string // e.g. ["billing.%", "media.%"] for type-prefix routing
LeaseDuration time.Duration // Lease duration (default: 30s)
PollInterval time.Duration // Poll interval (default: 2s)
WorkerID string // Worker identifier (default: "go-worker")
}
PollerConfig configures the workflow poller
type PostgresDialect ¶ added in v0.0.4
type PostgresDialect struct{}
PostgresDialect implements Dialect for PostgreSQL.
func (*PostgresDialect) ClaimRunQuery ¶ added in v0.0.4
func (d *PostgresDialect) ClaimRunQuery(typeCondition string, leaseSec int) string
func (*PostgresDialect) ClaimSchedulesQuery ¶ added in v0.0.4
func (d *PostgresDialect) ClaimSchedulesQuery() string
func (*PostgresDialect) DriverName ¶ added in v0.0.4
func (d *PostgresDialect) DriverName() string
func (*PostgresDialect) IntervalParam ¶ added in v0.0.4
func (d *PostgresDialect) IntervalParam(n int, seconds int) (string, any)
func (*PostgresDialect) MigrateSQL ¶ added in v0.0.4
func (d *PostgresDialect) MigrateSQL() string
func (*PostgresDialect) Now ¶ added in v0.0.4
func (d *PostgresDialect) Now() string
func (*PostgresDialect) OpenDB ¶ added in v0.0.4
func (d *PostgresDialect) OpenDB(dsn string) (*sql.DB, error)
func (*PostgresDialect) Placeholder ¶ added in v0.0.4
func (d *PostgresDialect) Placeholder(n int) string
func (*PostgresDialect) TimestampAfterNow ¶ added in v0.0.4
func (d *PostgresDialect) TimestampAfterNow(seconds int) string
type PrometheusMetrics ¶ added in v0.0.2
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements MetricsCollector using Prometheus
func NewPrometheusMetrics ¶ added in v0.0.2
func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics
NewPrometheusMetrics creates a new Prometheus metrics collector Pass nil for registry to use the default Prometheus registry
func (*PrometheusMetrics) RecordFailedAttempt ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordFailedAttempt(workflowType, workerID string, attemptNumber int)
RecordFailedAttempt tracks individual failure attempts
func (*PrometheusMetrics) RecordIntentClaimed ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordIntentClaimed(workflowType, workerID string)
RecordIntentClaimed increments the claimed counter
func (*PrometheusMetrics) RecordIntentCompleted ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordIntentCompleted(workflowType, workerID, status string, duration time.Duration)
RecordIntentCompleted tracks workflow completion with status and duration
func (*PrometheusMetrics) RecordIntentDeadletter ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordIntentDeadletter(workflowType, workerID string)
RecordIntentDeadletter increments the failed counter (CRITICAL for alerts)
func (*PrometheusMetrics) RecordPollCycle ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordPollCycle(workerID string)
RecordPollCycle increments poll cycle counter and updates worker health gauges
func (*PrometheusMetrics) RecordPollError ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordPollError(workerID string, errorType string)
RecordPollError increments poll error counter
func (*PrometheusMetrics) RecordQueueDepth ¶ added in v0.0.2
func (m *PrometheusMetrics) RecordQueueDepth(workflowType string, status string, depth int)
RecordQueueDepth sets the current queue depth gauge
type RunRepository ¶ added in v0.0.4
type RunRepository struct {
// contains filtered or unexported fields
}
RunRepository encapsulates all data access for the workflow_run table.
func NewRunRepository ¶ added in v0.0.4
func NewRunRepository(db *sql.DB, dialect Dialect) *RunRepository
NewRunRepository creates a RunRepository from a db and dialect.
func (*RunRepository) Cancel ¶ added in v0.0.4
func (r *RunRepository) Cancel(ctx context.Context, runID string) error
Cancel marks a workflow run as cancelled.
func (*RunRepository) CheckCancelled ¶ added in v0.0.4
CheckCancelled checks if a workflow run has been cancelled.
func (*RunRepository) Claim ¶ added in v0.0.4
func (r *RunRepository) Claim(ctx context.Context, workerID string, typePrefixes []string, leaseDuration time.Duration) (*WorkflowRun, error)
Claim atomically claims a pending workflow run. Returns nil if no work is available.
func (*RunRepository) CountPending ¶ added in v0.0.4
CountPending counts pending runs matching a type prefix.
func (*RunRepository) Create ¶ added in v0.0.4
Create inserts a new workflow run and returns the ID. Returns "" if idempotency conflict.
func (*RunRepository) CreateFromSchedule ¶ added in v0.0.4
func (r *RunRepository) CreateFromSchedule(ctx context.Context, typ string, payload json.RawMessage, priority, maxAttempts int, idempotencyKey string) (string, error)
CreateFromSchedule inserts a workflow run from a schedule (within a transaction). Returns the created run ID, or "" if deduplicated by idempotency key.
func (*RunRepository) ExtendLease ¶ added in v0.0.4
func (r *RunRepository) ExtendLease(ctx context.Context, runID string, duration time.Duration) error
ExtendLease extends the lease on a workflow run.
func (*RunRepository) Get ¶ added in v0.0.4
func (r *RunRepository) Get(ctx context.Context, id string) (*WorkflowRunStatus, error)
Get retrieves a single workflow run by ID.
func (*RunRepository) GetEvents ¶ added in v0.0.4
func (r *RunRepository) GetEvents(ctx context.Context, runID string) ([]WorkflowEvent, error)
GetEvents returns all audit events for a workflow run, ordered by creation time.
func (*RunRepository) List ¶ added in v0.0.4
func (r *RunRepository) List(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
List returns workflow runs matching the given options.
func (*RunRepository) MarkFailed ¶ added in v0.0.4
func (r *RunRepository) MarkFailed(ctx context.Context, run *WorkflowRun, execErr error) string
MarkFailed marks a workflow run as failed or retries it. Returns the new status.
func (*RunRepository) MarkSucceeded ¶ added in v0.0.4
MarkSucceeded marks a workflow run as succeeded with the given result.
func (*RunRepository) WithTx ¶ added in v0.0.4
func (r *RunRepository) WithTx(tx *sql.Tx) *RunRepository
WithTx returns a copy of the repository that executes queries within the given transaction.
type SQLiteDialect ¶ added in v0.0.4
type SQLiteDialect struct{}
SQLiteDialect implements Dialect for SQLite (via modernc.org/sqlite).
func (*SQLiteDialect) ClaimRunQuery ¶ added in v0.0.4
func (d *SQLiteDialect) ClaimRunQuery(typeCondition string, leaseSec int) string
func (*SQLiteDialect) ClaimSchedulesQuery ¶ added in v0.0.4
func (d *SQLiteDialect) ClaimSchedulesQuery() string
func (*SQLiteDialect) DriverName ¶ added in v0.0.4
func (d *SQLiteDialect) DriverName() string
func (*SQLiteDialect) IntervalParam ¶ added in v0.0.4
func (d *SQLiteDialect) IntervalParam(n int, seconds int) (string, any)
func (*SQLiteDialect) MigrateSQL ¶ added in v0.0.4
func (d *SQLiteDialect) MigrateSQL() string
func (*SQLiteDialect) Now ¶ added in v0.0.4
func (d *SQLiteDialect) Now() string
func (*SQLiteDialect) OpenDB ¶ added in v0.0.4
func (d *SQLiteDialect) OpenDB(dsn string) (*sql.DB, error)
func (*SQLiteDialect) Placeholder ¶ added in v0.0.4
func (d *SQLiteDialect) Placeholder(n int) string
func (*SQLiteDialect) TimestampAfterNow ¶ added in v0.0.4
func (d *SQLiteDialect) TimestampAfterNow(seconds int) string
type Schedule ¶ added in v0.0.4
type Schedule struct {
ID string
Type string
Payload any
CronExpr string
Timezone string
NextRunAt time.Time
LastRunAt *time.Time
Enabled bool
Priority int
MaxAttempts int
}
Schedule represents a recurring workflow schedule.
type ScheduleBuilder ¶ added in v0.0.4
type ScheduleBuilder struct {
// contains filtered or unexported fields
}
ScheduleBuilder provides a fluent API for creating workflow schedules.
func (*ScheduleBuilder) Create ¶ added in v0.0.4
func (s *ScheduleBuilder) Create(ctx context.Context) (string, error)
Create validates the cron expression, computes the next run time, and inserts the schedule. Returns the schedule ID.
func (*ScheduleBuilder) Cron ¶ added in v0.0.4
func (s *ScheduleBuilder) Cron(expr string) *ScheduleBuilder
Cron sets the cron expression (5-field standard format). Examples: "0 9 * * 1" (every Monday at 9am), "*/5 * * * *" (every 5 minutes)
func (*ScheduleBuilder) InTimezone ¶ added in v0.0.4
func (s *ScheduleBuilder) InTimezone(tz string) *ScheduleBuilder
InTimezone sets the IANA timezone for the schedule. Default: "UTC"
func (*ScheduleBuilder) WithMaxAttempts ¶ added in v0.0.4
func (s *ScheduleBuilder) WithMaxAttempts(n int) *ScheduleBuilder
WithMaxAttempts sets the max attempts inherited by created runs. Default: 3
func (*ScheduleBuilder) WithPriority ¶ added in v0.0.4
func (s *ScheduleBuilder) WithPriority(p int) *ScheduleBuilder
WithPriority sets the priority inherited by created runs. Default: 100
type ScheduleRepository ¶ added in v0.0.4
type ScheduleRepository struct {
// contains filtered or unexported fields
}
ScheduleRepository encapsulates all data access for the workflow_schedule table.
func NewScheduleRepository ¶ added in v0.0.4
func NewScheduleRepository(db *sql.DB, dialect Dialect) *ScheduleRepository
NewScheduleRepository creates a ScheduleRepository from a db and dialect.
func (*ScheduleRepository) AdvanceNextRun ¶ added in v0.0.4
func (r *ScheduleRepository) AdvanceNextRun(ctx context.Context, scheduleID string, nextRun time.Time) error
AdvanceNextRun updates a schedule's next run time and last run time.
func (*ScheduleRepository) ClaimDue ¶ added in v0.0.4
func (r *ScheduleRepository) ClaimDue(ctx context.Context) ([]DueSchedule, error)
ClaimDue returns schedules that are due to fire (within a transaction).
func (*ScheduleRepository) Create ¶ added in v0.0.4
func (r *ScheduleRepository) Create(ctx context.Context, workflowType, cronExpr, timezone string, payload any, priority, maxAttempts int) (string, error)
Create validates and inserts a new schedule. Returns the schedule ID.
func (*ScheduleRepository) List ¶ added in v0.0.4
func (r *ScheduleRepository) List(ctx context.Context) ([]Schedule, error)
List returns all active (non-deleted) schedules.
func (*ScheduleRepository) SetEnabled ¶ added in v0.0.4
SetEnabled enables or disables a schedule.
func (*ScheduleRepository) SoftDelete ¶ added in v0.0.4
func (r *ScheduleRepository) SoftDelete(ctx context.Context, scheduleID string) error
SoftDelete soft-deletes a schedule.
func (*ScheduleRepository) WithTx ¶ added in v0.0.4
func (r *ScheduleRepository) WithTx(tx *sql.Tx) *ScheduleRepository
WithTx returns a copy of the repository that executes queries within the given transaction.
type ScheduleTicker ¶ added in v0.0.4
type ScheduleTicker struct {
// contains filtered or unexported fields
}
ScheduleTicker converts due schedules into workflow_run rows. It can run standalone or be embedded in a Poller via WithScheduleTicker().
func NewScheduleTicker ¶ added in v0.0.4
func NewScheduleTicker(connString string) (*ScheduleTicker, error)
NewScheduleTicker creates a new ScheduleTicker from a connection string.
func (*ScheduleTicker) Close ¶ added in v0.0.4
func (t *ScheduleTicker) Close() error
Close closes the database connection (only for standalone tickers).
func (*ScheduleTicker) SetMetrics ¶ added in v0.0.4
func (t *ScheduleTicker) SetMetrics(m MetricsCollector)
SetMetrics sets the metrics collector for the schedule ticker (optional).
func (*ScheduleTicker) Start ¶ added in v0.0.4
func (t *ScheduleTicker) Start(ctx context.Context)
Start begins the schedule tick loop. It blocks until Stop() is called or ctx is cancelled.
func (*ScheduleTicker) Stop ¶ added in v0.0.4
func (t *ScheduleTicker) Stop()
Stop stops the ticker loop. Safe to call multiple times.
func (*ScheduleTicker) Tick ¶ added in v0.0.4
func (t *ScheduleTicker) Tick(ctx context.Context) error
Tick performs a single tick: finds due schedules and creates workflow_run rows. Exported for testing.
func (*ScheduleTicker) WithTickInterval ¶ added in v0.0.4
func (t *ScheduleTicker) WithTickInterval(d time.Duration) *ScheduleTicker
WithTickInterval sets how often the ticker checks for due schedules. Default: 15 seconds
type SubmitBuilder ¶ added in v0.0.3
type SubmitBuilder struct {
// contains filtered or unexported fields
}
SubmitBuilder provides a fluent API for configuring and submitting workflow runs.
func (*SubmitBuilder) Execute ¶ added in v0.0.3
func (s *SubmitBuilder) Execute(ctx context.Context) (string, error)
Execute submits the workflow run and returns its ID. Returns empty string if idempotency key conflict (run already exists).
func (*SubmitBuilder) RunAfter ¶ added in v0.0.3
func (s *SubmitBuilder) RunAfter(t time.Time) *SubmitBuilder
RunAfter schedules the workflow to run after a specific time.
func (*SubmitBuilder) RunIn ¶ added in v0.0.3
func (s *SubmitBuilder) RunIn(d time.Duration) *SubmitBuilder
RunIn schedules the workflow to run after a duration from now.
func (*SubmitBuilder) WithIdempotency ¶ added in v0.0.3
func (s *SubmitBuilder) WithIdempotency(key string) *SubmitBuilder
WithIdempotency sets an idempotency key for deduplication. If a workflow run with this key already exists, submission will be skipped.
func (*SubmitBuilder) WithMaxAttempts ¶ added in v0.0.3
func (s *SubmitBuilder) WithMaxAttempts(attempts int) *SubmitBuilder
WithMaxAttempts sets the maximum number of retry attempts. Default: 3
func (*SubmitBuilder) WithPriority ¶ added in v0.0.3
func (s *SubmitBuilder) WithPriority(priority int) *SubmitBuilder
WithPriority sets the priority (lower number = higher priority). Default: 100
type Workflow ¶ added in v0.0.4
type Workflow struct {
// contains filtered or unexported fields
}
Workflow combines Client (producer) and Poller (consumer) into a single entry point for applications that both submit and process workflows.
func New ¶ added in v0.0.4
New creates a Workflow from a connection string, sharing one DB connection between the client and poller.
func (*Workflow) AutoMigrate ¶ added in v0.0.4
AutoMigrate runs the embedded DDL for the current dialect.
func (*Workflow) DeleteSchedule ¶ added in v0.0.4
DeleteSchedule soft-deletes a schedule.
func (*Workflow) GetWorkflowEvents ¶ added in v0.0.4
GetWorkflowEvents returns the audit event log for a workflow run.
func (*Workflow) GetWorkflowRun ¶ added in v0.0.4
GetWorkflowRun retrieves a single workflow run by ID.
func (*Workflow) Handle ¶ added in v0.0.4
func (w *Workflow) Handle(workflowType string, executor WorkflowExecutor) *Workflow
Handle registers a WorkflowExecutor for a workflow type.
func (*Workflow) HandleFunc ¶ added in v0.0.4
func (w *Workflow) HandleFunc(workflowType string, fn func(context.Context, *WorkflowRun) (any, error)) *Workflow
HandleFunc registers a function handler for a workflow type.
func (*Workflow) ListSchedules ¶ added in v0.0.4
ListSchedules returns all active (non-deleted) schedules.
func (*Workflow) ListWorkflowRuns ¶ added in v0.0.4
func (w *Workflow) ListWorkflowRuns(ctx context.Context, opts ListOptions) ([]WorkflowRunStatus, error)
ListWorkflowRuns returns workflow runs matching the given options.
func (*Workflow) PauseSchedule ¶ added in v0.0.4
PauseSchedule disables a schedule so it won't fire.
func (*Workflow) ResumeSchedule ¶ added in v0.0.4
ResumeSchedule re-enables a paused schedule.
func (*Workflow) Schedule ¶ added in v0.0.4
func (w *Workflow) Schedule(workflowType string, payload any) *ScheduleBuilder
Schedule starts building a new workflow schedule.
func (*Workflow) Start ¶ added in v0.0.4
Start begins polling for workflow runs. Blocks until stopped or context is cancelled.
func (*Workflow) Stop ¶ added in v0.0.4
func (w *Workflow) Stop()
Stop stops the poller. Safe to call multiple times.
func (*Workflow) Submit ¶ added in v0.0.4
func (w *Workflow) Submit(workflowType string, payload any) *SubmitBuilder
Submit creates a new workflow run with fluent configuration.
func (*Workflow) WithAutoMigrate ¶ added in v0.0.4
WithAutoMigrate enables automatic schema migration when Start() is called.
func (*Workflow) WithLeaseDuration ¶ added in v0.0.4
WithLeaseDuration sets the lease duration for claimed workflow runs.
func (*Workflow) WithPollInterval ¶ added in v0.0.4
WithPollInterval sets how often to poll for new workflow runs.
func (*Workflow) WithScheduleTicker ¶ added in v0.0.4
WithScheduleTicker enables the schedule ticker inside the poller.
func (*Workflow) WithTypePrefixes ¶ added in v0.0.4
WithTypePrefixes explicitly sets type prefixes to watch.
func (*Workflow) WithWorkerID ¶ added in v0.0.4
WithWorkerID sets a custom worker ID.
type WorkflowEvent ¶ added in v0.0.4
type WorkflowEvent struct {
ID int64 `json:"id"`
WorkflowID string `json:"workflow_id"`
EventType string `json:"event_type"`
Data json.RawMessage `json:"data,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
WorkflowEvent represents an audit event for a workflow run.
type WorkflowExecutor ¶
type WorkflowExecutor interface {
// Execute runs the workflow and returns the result or error
Execute(ctx context.Context, run *WorkflowRun) (any, error)
}
WorkflowExecutor is implemented by users to execute specific workflows
type WorkflowRun ¶ added in v0.0.3
type WorkflowRun struct {
ID string
Type string
Payload []byte // JSON
Attempt int
MaxAttempts int
// Functions for long-running workflows
Heartbeat HeartbeatFunc // Extend the lease to prevent timeout
IsCancelled CancellationCheckFunc // Check if workflow has been cancelled
}
WorkflowRun represents a claimed workflow run from the database (renamed from WorkflowIntent for consistency with new naming)
type WorkflowRunStatus ¶ added in v0.0.4
type WorkflowRunStatus struct {
ID string `json:"id"`
Type string `json:"type"`
Payload []byte `json:"payload"`
Status string `json:"status"`
Priority int `json:"priority"`
RunAt time.Time `json:"run_at"`
IdempotencyKey *string `json:"idempotency_key,omitempty"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
LeasedBy *string `json:"leased_by,omitempty"`
LeaseUntil *time.Time `json:"lease_until,omitempty"`
LastError *string `json:"last_error,omitempty"`
Result []byte `json:"result,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
WorkflowRunStatus is the full read model for a workflow run, with JSON tags for API responses.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
server
command
|
|
|
examples
|
|
|
go/scheduled-report
command
|
|
|
go/sqlite-worker
command
|