oncetask

package
v0.0.0-...-4074229 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// CollectionOnceTasks is the Firestore collection name for storing once tasks.
	CollectionOnceTasks string = "onceTasks"
	// EnvVariable is the environment variable name for task environment separation.
	EnvVariable string = "ONCE_TASK_ENV"
	// DefaultEnv is the default environment name when EnvVariable is not set.
	DefaultEnv string = "DEFAULT"
	// NoWait is the zero value for waitUntil, representing immediate execution (epoch time).
	// Use this constant instead of calling time.Time{}.Format(time.RFC3339) everywhere.
	NoWait string = "0001-01-01T00:00:00Z"
)

Variables

View Source
var ErrHandlerAlreadyExists = errors.New("handler for this task type already exists")

ErrHandlerAlreadyExists is returned when trying to register a handler for a task type that already has a handler

Functions

func GetCurrentTaskID

func GetCurrentTaskID(ctx context.Context) string

GetCurrentTaskID returns the task ID stored in the context, or an empty string if not present. This is useful for debugging or when you need to access the current task ID within a handler.

func GetCurrentTaskResourceKey

func GetCurrentTaskResourceKey(ctx context.Context) string

GetCurrentTaskResourceKey returns the resource key stored in the context, or an empty string if not present. This is useful for debugging or when you need to access the current resource key within a handler.

func SafeExecute

func SafeExecute[P any, R any](ctx context.Context, fn func(context.Context, P) (R, error), p P) (result R, err error)

SafeExecute wraps a function execution with panic recovery. If the function panics, the panic is recovered and converted to an error. The stack trace is logged via slog.ErrorContext for debugging.

Example usage:

result, err := SafeExecute(ctx, handler, task)

Returns:

  • (result, nil) if fn completes successfully
  • (nil, error) if fn returns an error
  • (nil, error) if fn panics (panic converted to error)

Types

type ContextHandler

type ContextHandler struct {
	// contains filtered or unexported fields
}

ContextHandler is a slog.Handler that automatically extracts the task ID and resource key from context and adds them as attributes to all log records.

Usage:

handler := oncetask.NewContextHandler(slog.NewJSONHandler(os.Stdout, nil))
slog.SetDefault(slog.New(handler))

func NewContextHandler

func NewContextHandler(h slog.Handler) *ContextHandler

NewContextHandler creates a new ContextHandler that wraps another handler

func (*ContextHandler) Enabled

func (h *ContextHandler) Enabled(ctx context.Context, level slog.Level) bool

Enabled implements slog.Handler

func (*ContextHandler) Handle

func (h *ContextHandler) Handle(ctx context.Context, r slog.Record) error

Handle implements slog.Handler and automatically adds task ID and resource key from context

func (*ContextHandler) WithAttrs

func (h *ContextHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs implements slog.Handler

func (*ContextHandler) WithGroup

func (h *ContextHandler) WithGroup(name string) slog.Handler

WithGroup implements slog.Handler

type Data

type Data[TaskKind comparable] interface {
	GetType() TaskKind

	// Generate a deterministic, idempotent ID based on the task's natural key.
	// This ensures that reprocessing the same task produces the same task ID,
	// allowing for safe overwrites instead of creating duplicates.
	GenerateIdempotentID() string
}

Data defines the interface for task-specific data that can be stored in OnceTask. Each implementation represents a specific type of once-execution task with its own data structure.

type ExponentialBackoffPolicy

type ExponentialBackoffPolicy struct {
	MaxAttempts int           // Maximum attempts (0 = unlimited)
	BaseDelay   time.Duration // Initial delay (default: 1 second)
	MaxDelay    time.Duration // Maximum delay cap (default: 5 minutes)
	Multiplier  float64       // Multiplier per attempt (default: 2.0)
}

ExponentialBackoffPolicy retries with exponential backoff. Delay = BaseDelay * (Multiplier ^ (attempts - 1)), capped at MaxDelay.

func (ExponentialBackoffPolicy) NextRetryDelay

func (p ExponentialBackoffPolicy) NextRetryDelay(attempts int, err error) time.Duration

NextRetryDelay calculates the exponential backoff delay for the next retry.

func (ExponentialBackoffPolicy) ShouldRetry

func (p ExponentialBackoffPolicy) ShouldRetry(attempts int, err error) bool

ShouldRetry returns true if the task should be retried based on attempt count.

type FixedDelayPolicy

type FixedDelayPolicy struct {
	MaxAttempts int           // Maximum attempts (0 = unlimited)
	Delay       time.Duration // Delay between retries
}

FixedDelayPolicy retries with a constant delay between attempts.

func (FixedDelayPolicy) NextRetryDelay

func (p FixedDelayPolicy) NextRetryDelay(attempts int, err error) time.Duration

NextRetryDelay returns the fixed delay for the next retry.

func (FixedDelayPolicy) ShouldRetry

func (p FixedDelayPolicy) ShouldRetry(attempts int, err error) bool

ShouldRetry returns true if the task should be retried based on attempt count.

type Handler

type Handler[TaskKind ~string] func(ctx context.Context, task *OnceTask[TaskKind]) (any, error)

Handler processes a single pending task. Returns (result, nil) on success. Result can be nil if no output is needed. Returns (nil, error) on failure; task will be retried according to config.

One task kind can either have a single handler OR a resource key handler, but not both.

This handler supports two execution strategies (determined dynamically by the task's ResourceKey):

  1. Concurrent (ResourceKey is empty): Multiple tasks of the same kind execute concurrently. No locking constraints between tasks.

  2. OnePerResourceKey (ResourceKey is non-empty): Only one task with the same ResourceKey executes at a time. Execution is serialized (queued) for that specific ResourceKey - ordering not guaranteed.

func NoResult

func NoResult[TaskKind ~string](fn func(ctx context.Context, task *OnceTask[TaskKind]) error) Handler[TaskKind]

NoResult adapts a single-task handler that doesn't return a result. Use this for handlers that only need to return an error on failure.

type HandlerOption

type HandlerOption func(*handlerConfig)

HandlerOption is a functional option for configuring handlers.

func WithCancellationHandler

func WithCancellationHandler[TaskKind ~string](handler Handler[TaskKind]) HandlerOption

WithCancellationHandler sets a cleanup handler for cancelled tasks. The handler uses the same signature as OnceTaskHandler[TaskKind]. Cancelled tasks are always processed one at a time, even if the normal handler is a resource-key handler. When a task is cancelled:

  • If handler is set: Handler is invoked to perform cleanup, retried per CancellationRetryPolicy.
  • If handler is nil: Task is immediately marked as done-cancelled without executing any handler.

Example:

oncetask.WithCancellationHandler(func(ctx context.Context, task *oncetask.OnceTask[TaskKind]) (any, error) {
    // Perform cleanup operations
    return nil, nil
})

func WithCancellationRetryPolicy

func WithCancellationRetryPolicy(policy RetryPolicy) HandlerOption

WithCancellationRetryPolicy sets the retry policy for cancellation handlers.

func WithConcurrency

func WithConcurrency(n int) HandlerOption

WithConcurrency sets the number of concurrent workers processing tasks. Values less than 1 are ignored; default is 1 (serial execution).

func WithLeaseDuration

func WithLeaseDuration(d time.Duration) HandlerOption

WithLeaseDuration sets the lease duration for task execution.

func WithNoRetry

func WithNoRetry() HandlerOption

WithNoRetry disables retries - tasks fail permanently on first error.

func WithRetryPolicy

func WithRetryPolicy(policy RetryPolicy) HandlerOption

WithRetryPolicy sets a custom retry policy for the handler.

type Manager

type Manager[TaskKind ~string] interface {
	// CreateTask creates a once task to Firestore.
	// The task parameter should be created using fs_models/once.NewOnceTask().
	// If a task with the same ID already exists, logs and returns nil (idempotent).
	// Returns true if the task was created, false if it already existed.
	// Returns false if a task with the same ID already exists or error.
	CreateTask(ctx context.Context, taskData Data[TaskKind]) (bool, error)

	// CreateTasks creates multiple once tasks using Firestore BulkWriter.
	//
	// This method is NON-ATOMIC: individual task creations can succeed or fail independently.
	// This is intentional - it allows partial success when some tasks already exist or fail,
	// rather than failing the entire batch.
	//
	// Idempotency: Tasks that already exist (same ID) are silently skipped and do not
	// contribute to the returned error. This makes it safe to retry the entire batch.
	//
	// Returns:
	//   - (created count, nil) if all tasks were created or already existed
	//   - (created count, error) if at least one task failed for a reason other than already-existing
	//
	// The returned error aggregates all non-AlreadyExists failures using errors.Join.
	CreateTasks(ctx context.Context, taskDataList []Data[TaskKind]) (int, error)

	// RegisterTaskHandler listens for new tasks and executes the handler function for each task.
	// Handler returns (result, nil) on success or (nil, error) on failure.
	// Use NoResult() adapter for handlers that don't produce a result.
	//
	// Configuration options (see HandlerOption for details and examples):
	//   - WithRetryPolicy: Configure retry behavior for task execution
	//   - WithCancellationHandler: Register a cleanup handler for cancelled tasks (optional)
	//   - WithCancellationRetryPolicy: Configure retry behavior for cancellation handlers
	//   - WithLeaseDuration: Set how long a task is leased during execution
	//   - WithConcurrency: Set number of concurrent workers
	RegisterTaskHandler(taskType TaskKind, handler Handler[TaskKind], opts ...HandlerOption) error

	// RegisterResourceKeyHandler listens for new tasks and executes the handler for all tasks with the same resource key.
	// All pending tasks with the same resource key are grouped together and ordered by WaitUntil.
	// The handler is responsible for any additional ordering logic.
	// If the handler returns nil, all tasks for that resource key are marked as done.
	// If the handler returns an error, all tasks for that resource key will be retried.
	// Tasks without a resource key are processed individually.
	// Handler returns (result, nil) on success or (nil, error) on failure.
	// Use NoResultResourceKey() adapter for handlers that don't produce a result.
	//
	// Configuration options (see HandlerOption for details and examples):
	//   - WithRetryPolicy: Configure retry behavior for task execution
	//   - WithCancellationHandler: Register a cleanup handler for cancelled tasks (optional)
	//   - WithCancellationRetryPolicy: Configure retry behavior for cancellation handlers
	//   - WithLeaseDuration: Set how long a task is leased during execution
	//   - WithConcurrency: Set number of concurrent workers
	RegisterResourceKeyHandler(taskType TaskKind, handler ResourceKeyHandler[TaskKind], opts ...HandlerOption) error

	// GetTasksByResourceKey retrieves all tasks with the given resource key.
	// Returns tasks ordered by CreatedAt (oldest first).
	// The tasks must belong to the current environment (from ONCE_TASK_ENV).
	GetTasksByResourceKey(ctx context.Context, resourceKey string) ([]OnceTask[TaskKind], error)

	// GetTasksByIds retrieves tasks by their IDs.
	// Returns tasks in no guaranteed order.
	// Tasks not found are omitted from the result (no error).
	// The tasks must belong to the current environment (from ONCE_TASK_ENV).
	GetTasksByIds(ctx context.Context, ids []string) ([]OnceTask[TaskKind], error)

	// CancelTask marks a single task as cancelled.
	// Idempotent: no-op if task is already done or cancelled.
	// Sets isCancelled=true, cancelledAt=now, waitUntil=NoWait (immediate execution).
	CancelTask(ctx context.Context, taskID string) error

	// CancelTasksByResourceKey marks all non-done tasks with resourceKey as cancelled.
	// Returns count of tasks cancelled.
	CancelTasksByResourceKey(ctx context.Context, taskType TaskKind, resourceKey string) (int, error)

	// CancelTasksByIds marks multiple tasks as cancelled (bulk operation via BulkWriter).
	// Returns count of tasks cancelled. Partial failures return both count and aggregated error.
	// Returns error if task belongs to a different environment.
	// Idempotent: Tasks already done or cancelled are skipped (no-op).
	CancelTasksByIds(ctx context.Context, taskIDs []string) (int, error)

	// DeleteTask permanently removes a single task from Firestore.
	// Returns error if task belongs to a different environment.
	// Idempotent: no error if task doesn't exist.
	// WARNING: If the task is currently being executed (leased), the handler will
	// continue running but fail when attempting to update task status on completion.
	DeleteTask(ctx context.Context, taskID string) error

	// DeleteTasksByIds permanently removes multiple tasks from Firestore (bulk operation via BulkWriter).
	// Returns count of tasks deleted. Partial failures return both count and aggregated error.
	// Returns error if task belongs to a different environment.
	// Idempotent: non-existent tasks are silently skipped.
	// WARNING: Deleting leased tasks will cause running handlers to fail on completion.
	DeleteTasksByIds(ctx context.Context, taskIDs []string) (int, error)

	// ResetTask resets a single task back to pending state for re-execution.
	// Only applies to tasks in terminal states (doneAt != "").
	// Returns error if task belongs to a different environment.
	// Idempotent: no-op if task is already pending/running.
	// Clears all execution state (Attempts, Errors, Result) and cancellation state (IsCancelled).
	// Sets WaitUntil=NoWait for immediate execution.
	ResetTask(ctx context.Context, taskID string) error

	// ResetTasksByIds resets multiple tasks back to pending state (bulk operation via BulkWriter).
	// Returns count of tasks reset. Partial failures return both count and aggregated error.
	// Only resets tasks in terminal states (doneAt != "").
	// Returns error if task belongs to a different environment.
	// Idempotent: Tasks already in non-terminal states are skipped (no-op).
	ResetTasksByIds(ctx context.Context, taskIDs []string) (int, error)
}

Manager defines the interface for managing once-execution tasks.

func NewFirestoreOnceTaskManager

func NewFirestoreOnceTaskManager[TaskKind ~string](ctx context.Context, client *firestore.Client) (m Manager[TaskKind], cleanup func())

NewFirestoreOnceTaskManager creates a new firestore once task manager. The provided context is used as the parent for all background task processing goroutines. Context values (trace IDs, tenant IDs, etc.) will be inherited by task handlers. Returns the manager and a cleanup function that cancels all running goroutines.

type NoRetryPolicy

type NoRetryPolicy struct{}

NoRetryPolicy never retries - tasks fail permanently on first error.

func (NoRetryPolicy) NextRetryDelay

func (p NoRetryPolicy) NextRetryDelay(attempts int, err error) time.Duration

NextRetryDelay always returns 0 - no retry delay is needed.

func (NoRetryPolicy) ShouldRetry

func (p NoRetryPolicy) ShouldRetry(attempts int, err error) bool

ShouldRetry always returns false - no retries are performed.

type OnceTask

type OnceTask[TaskKind ~string] struct {
	Id   string                 `json:"id" firestore:"id"` // Also the idempotency key.
	Type TaskKind               `json:"type" firestore:"type"`
	Data map[string]interface{} `json:"data" firestore:"data"`

	// Optional - identifies a resource that requires serialization (e.g., calendarId, conversationId)
	// When set, lease acquisition checks for active leases on other tasks with the same ResourceKey
	ResourceKey string `json:"resourceKey" firestore:"resourceKey"`

	// Environment identifier for logical separation of tasks (e.g., "dev", "staging", "prod")
	// Read from `EnvVariable` (ONCE_TASK_ENV) environment variable, defaults to "DEFAULT"
	Env string `json:"env" firestore:"env"`

	WaitUntil   string `json:"waitUntil" firestore:"waitUntil"`     // ISO 8601 - wait until this time to execute the task.
	LeasedUntil string `json:"leasedUntil" firestore:"leasedUntil"` // ISO 8601 - lease expiration for the task executor.
	CreatedAt   string `json:"createdAt" firestore:"createdAt"`     // ISO 8601
	DoneAt      string `json:"doneAt" firestore:"doneAt"`           // ISO 8601

	// Retry and result tracking fields
	Attempts int         `json:"attempts" firestore:"attempts"` // Number of execution attempts (incremented when lease is acquired)
	Errors   []TaskError `json:"errors" firestore:"errors"`     // Failure history (one entry per failed attempt)
	Result   any         `json:"result" firestore:"result"`     // Handler output (optional, can be nil on success)

	// Recurrence fields
	// For recurrence tasks (generators): Recurrence is set, ParentRecurrenceID is empty
	// For occurrence tasks (instances): Recurrence is nil, ParentRecurrenceID points to generator
	Recurrence          *Recurrence `json:"recurrence" firestore:"recurrence"`                   // Recurrence config (nil = one-time or occurrence task)
	ParentRecurrenceID  string      `json:"parentRecurrenceId" firestore:"parentRecurrenceId"`   // ID of parent recurrence task (empty = standalone or recurrence task)
	OccurrenceTimestamp string      `json:"occurrenceTimestamp" firestore:"occurrenceTimestamp"` // Scheduled time of this occurrence (ISO 8601)

	// Cancellation fields
	IsCancelled bool   `json:"isCancelled" firestore:"isCancelled"` // Cancellation flag (false = not cancelled)
	CancelledAt string `json:"cancelledAt" firestore:"cancelledAt"` // ISO 8601 timestamp when task was cancelled (audit trail)
}

OnceTask represents a task to be executed exactly once, asynchronously.

func (*OnceTask[TaskKind]) GetStatus

func (t *OnceTask[TaskKind]) GetStatus(now time.Time) (TaskStatus, error)

GetStatus computes and returns the complete execution status of the task. The status is derived from the task's timestamp fields and retry tracking relative to the provided current time.

This method combines the semantics of both transient states (pending, leased, waiting, cancellation_pending) and terminal states (completed, failed, cancelled).

Returns an error if any timestamp field contains invalid RFC3339 format.

func (*OnceTask[TaskKind]) ReadInto

func (t *OnceTask[TaskKind]) ReadInto(v Data[TaskKind]) error

ReadInto allows for reading the data field into a specific type.

type Recurrence

type Recurrence struct {
	RRule   string   `json:"rrule" firestore:"rrule"`     // RFC 5545 rule, e.g., "FREQ=WEEKLY;BYDAY=MO"
	DTStart string   `json:"dtstart" firestore:"dtstart"` // Recurrence anchor time (ISO 8601)
	ExDates []string `json:"exdates" firestore:"exdates"` // Exception dates to skip
}

Recurrence defines a recurring task schedule using RFC 5545 RRULE. When set on a task, the task becomes a generator that creates occurrence tasks.

Architecture:

  • Recurrence Task (Generator): Holds the RRULE and spawns occurrence tasks. Has Recurrence set, never marked as done (unless RRULE exhausted).
  • Occurrence Task (Instance): A regular one-time task spawned by the generator. Has ParentRecurrenceID set, has its own lifecycle (attempts, errors, result, doneAt).

After 3 weeks of weekly runs, there are 4 tasks total:

  • 1 recurrence task (the generator, still active)
  • 3 occurrence tasks (each with their own execution history)

type RecurrenceProvider

type RecurrenceProvider interface {
	GetRecurrence() *Recurrence
}

RecurrenceProvider is an optional interface that Data implementations can implement to define recurring task schedules. When implemented, the task becomes a generator that spawns occurrence tasks according to the RRULE.

type ResourceKeyHandler

type ResourceKeyHandler[TaskKind ~string] func(ctx context.Context, tasks []OnceTask[TaskKind]) (any, error)

ResourceKeyHandler processes all pending tasks with the same non-empty resource key in a single execution (Pipelined Execution).

This handler corresponds to the "AllPerResourceKey" strategy:

  • All pending tasks for a specific ResourceKey are claimed and executed together.
  • Tasks are ordered by WaitUntil timestamp. The handler is responsible for any additional ordering logic.

If the ResourceKey is empty, the handler behaves like a single task handler (Concurrent strategy).

Return Values:

  • (result, nil): All tasks in the batch are marked as successfully completed. Result is stored in each task's Result field.
  • (nil, error): All tasks in the batch are failed and will be retried.

func NoResultResourceKey

func NoResultResourceKey[TaskKind ~string](fn func(ctx context.Context, tasks []OnceTask[TaskKind]) error) ResourceKeyHandler[TaskKind]

NoResultResourceKey adapts a resource-key handler that doesn't return a result. Use this for handlers that only need to return an error on failure.

type ResourceKeyProvider

type ResourceKeyProvider interface {
	GetResourceKey() string
}

ResourceKeyProvider is an optional interface that Data implementations can implement to enable resource-level serialization. When GetResourceKey() returns a non-empty string, lease acquisition will check for active leases on other tasks with the same ResourceKey, ensuring only one task executes at a time per resource (e.g., per calendarId or conversationId). If GetResourceKey() returns an empty string or the interface is not implemented, task-level leasing is used (only one handler processes the specific task).

type RetryPolicy

type RetryPolicy interface {
	// ShouldRetry returns true if the task should be retried after failure.
	// attempts is the current attempt count (1 = first attempt, 2 = first retry, etc.)
	ShouldRetry(attempts int, err error) bool

	// NextRetryDelay returns the duration to wait before the next retry.
	// attempts is the current attempt count.
	// err is the error returned by the handler, which can be used to determine the delay.
	NextRetryDelay(attempts int, err error) time.Duration
}

RetryPolicy defines retry behavior for task execution failures. Users can implement this interface for custom retry logic.

type ScheduledTask

type ScheduledTask interface {
	GetScheduledTime() time.Time
}

ScheduledTask is an optional interface that Data implementations can implement to specify a scheduled time for the task. When GetScheduledTime() returns a non-empty time, the task will not be executed until the specified time.

type TaskError

type TaskError struct {
	At    string `json:"at" firestore:"at"`       // ISO 8601 timestamp of failure
	Error string `json:"error" firestore:"error"` // Failure reason
}

TaskError represents a single failed execution attempt.

type TaskStatus

type TaskStatus string

TaskStatus represents the complete execution status of a task. Status is derived from the task's timestamp fields and retry tracking, combining both transient (pending, leased, waiting) and terminal (completed, failed, cancelled) states.

| Status | doneAt | isCancelled | leasedUntil | waitUntil | Description | |---------------------|--------|-------------|--------------|--------------|------------------------------------------| | cancellationPending | "" | true | "" or past | "" or past | Cancelled, waiting for handler | | cancelled | set | true | "" | unchanged | Cancelled and marked done | | pending | "" | false | "" or past | "" or past | Ready to execute | | waiting | "" | false | future | "" or past | Scheduled for future / backoff | | leased | "" | false | future | unchanged | Currently being executed | | completed | set | false | "" | unchanged | Last attempt succeeded | | failed | set | false | "" | unchanged | All attempts failed |

const (
	// TaskStatusWaiting indicates the task is scheduled for future execution (waitUntil > now)
	// or is in backoff period after a failed attempt.
	TaskStatusWaiting TaskStatus = "waiting"

	// TaskStatusPending indicates the task is ready to execute immediately
	// (waitUntil <= now, not leased, not done, not cancelled).
	TaskStatusPending TaskStatus = "pending"

	// TaskStatusLeased indicates the task is currently being executed by a worker
	// (leasedUntil > now, not cancelled).
	TaskStatusLeased TaskStatus = "leased"

	// TaskStatusCancellationPending indicates the task was cancelled but not yet marked as done
	// (isCancelled is true, doneAt is not set). The cancellation handler will run.
	TaskStatusCancellationPending TaskStatus = "cancellationPending"

	// TaskStatusCompleted indicates the task has finished successfully
	// (doneAt is set, isCancelled is false, attempts > len(errors)).
	TaskStatusCompleted TaskStatus = "completed"

	// TaskStatusFailed indicates all retry attempts were exhausted without success
	// (doneAt is set, isCancelled is false, attempts == len(errors)).
	TaskStatusFailed TaskStatus = "failed"

	// TaskStatusCancelled indicates the task was cancelled and marked as done
	// (doneAt is set, isCancelled is true).
	TaskStatusCancelled TaskStatus = "cancelled"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL