Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClusterInterface ¶
type ClusterInterface interface {
// Run starts the cluster's execution engine. Workers begin listening to the queue and processing tasks.
Run()
// Stop performs a graceful shutdown. The cluster stops accepting new tasks and waits for active workers to finish.
// If the timeout elapses before all tasks finish, Stop returns an error.
Stop(timeout time.Duration) error
// AddTask is the entry point for submitting a task. It returns a builder for configuring scheduling, timeouts, and metadata.
// You must call .Submit() at the end of the chain to queue the task.
AddTask(t task.TaskInterface) ClusterTaskBuilderInterface
// Subscribe returns a channel that receives the task's result (val and err).
Subscribe(id string) (<-chan Result, error)
// Cancel performs an immediate shutdown. It stops all workers at once and cancels all active task contexts.
// Use it only when a graceful shutdown is not possible, as it may leave tasks in an incomplete state.
Cancel()
// CancelTask targets and cancels a specific task by its ID.
// This triggers the cancelled channel inside the TaskFunc and closes the task's context.
CancelTask(id string)
}
ClusterInterface is the public interface of a cluster. It encapsulates the methods for running, stopping, and submitting tasks to it.
func NewCluster ¶
func NewCluster(workers int, ctx context.Context) ClusterInterface
NewCluster is the constructor. It creates a new cluster instance.
type ClusterTaskBuilderInterface ¶
type ClusterTaskBuilderInterface interface {
// WithStartTime schedules the task to run at a specific time.
// If the time is in the past or time.Now(), the task will be executed as soon as a worker is available.
WithStartTime(st time.Time) ClusterTaskBuilderInterface
// WithTimeout sets a maximum execution time for the task.
// If the task exceeds this duration, its ctx will be cancelled, and the task will be marked as timed out.
WithTimeout(tm time.Duration) ClusterTaskBuilderInterface
// WithPriority sets the task's priority.
// Tasks with a higher value are usually moved toward the front of the queue; the exact ordering depends on the queue's heap implementation.
WithPriority(p int) ClusterTaskBuilderInterface
// WithRetry sets the maximum number of times a task will be retried upon failure.
// Default is 0 (no retries).
WithRetry(r int) ClusterTaskBuilderInterface
// WithBackoffStrategy defines the delay logic between retry attempts.
// Default is FixedBackoff with 0s delay if not specified.
WithBackoffStrategy(strategy RetryBackoffStrategy) ClusterTaskBuilderInterface
// WithJitter adds a small random variation to the retry delay to prevent thundering herd problems.
WithJitter() ClusterTaskBuilderInterface
// RetryIf registers a predicate function to decide whether a retry should be attempted based on the error.
// If the function returns true, the task is retried; otherwise, it fails immediately.
RetryIf(func(err error) bool) ClusterTaskBuilderInterface
// WithRetryMode defines where the retry delay happens.
// Default is Requeue (returns the task to the queue).
WithRetryMode(mode RetryMode) ClusterTaskBuilderInterface
// IsCacheable determines if the task result should be stored in memory after completion.
// Currently, cached results are stored for 5 minutes after the task completes.
// After this period, the result is purged from memory to prevent leaks.
// (Note: This duration may become configurable in future releases).
IsCacheable(v bool) ClusterTaskBuilderInterface
// OnComplete registers a callback that will be invoked once the task finishes its execution,
// regardless of whether it succeeded, failed, or was cancelled.
// The callback receives the task's unique ID, the resulting value, and any error encountered.
OnComplete(fn func(id string, val any, err error)) ClusterTaskBuilderInterface
// OnFailure registers a callback that will be invoked only if the task ends with an error,
// a panic, or is cancelled.
// The callback receives the task's unique ID and the error that caused the failure.
OnFailure(fn func(id string, err error)) ClusterTaskBuilderInterface
// Submit is the final method in the chain.
// It validates the task, generates an ID (if empty), and pushes the task into the scheduler.
// Returns an error if a task with the same ID is already running or managed by the cluster.
Submit() (string, error)
}
ClusterTaskBuilderInterface provides a fluent API for configuring and submitting a task to the cluster. It allows setting execution parameters such as start time, timeout, priority, and lifecycle callbacks. The configuration must be finalized by calling the Submit method.
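As an illustration of the predicate that RetryIf accepts, the following is a minimal sketch of a `func(err error) bool` that retries only on transient failures. The names `ErrTransient` and `shouldRetry` are hypothetical and not part of this package; which errors the cluster actually passes to the predicate is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrTransient is a hypothetical sentinel error marking failures worth retrying.
var ErrTransient = errors.New("transient failure")

// shouldRetry has the shape RetryIf expects: func(err error) bool.
// It returns true only for errors that wrap ErrTransient.
func shouldRetry(err error) bool {
	// Cancellation and deadline errors are unlikely to succeed on retry.
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return false
	}
	return errors.Is(err, ErrTransient)
}

func main() {
	wrapped := fmt.Errorf("fetch page: %w", ErrTransient)
	fmt.Println(shouldRetry(wrapped))                 // wraps ErrTransient
	fmt.Println(shouldRetry(context.Canceled))        // cancellation
	fmt.Println(shouldRetry(errors.New("bad input"))) // unrelated error
}
```

A predicate like this would be passed as `.RetryIf(shouldRetry)` somewhere in the builder chain before `.Submit()`.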
type ExponentialBackoff ¶
type ExponentialBackoff struct {
// Base is the starting delay duration.
Base time.Duration
// Multiplier is the factor by which the delay grows each attempt (e.g., 2.0).
Multiplier float64
// Max is the upper bound for the delay. If 0, no maximum is applied.
Max time.Duration
}
ExponentialBackoff increases the delay exponentially, optionally capped by a maximum value.
type FixedBackoff ¶
FixedBackoff provides a constant delay between all retry attempts.
type LinearBackoff ¶
type LinearBackoff struct {
// Base is the initial delay duration.
Base time.Duration
// Step is the duration added to the delay after each attempt.
Step time.Duration
}
LinearBackoff increases the delay by a constant step for each subsequent attempt.
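The arithmetic behind a linear strategy can be sketched as follows. The type `linearBackoff` is a hypothetical stand-in, not the package's implementation, and it assumes attempts are numbered from 1 so that the first retry waits exactly Base; the package may count attempts differently.

```go
package main

import (
	"fmt"
	"time"
)

// linearBackoff is a hypothetical stand-in with the same fields as LinearBackoff.
type linearBackoff struct {
	Base time.Duration
	Step time.Duration
}

// Next returns Base + (attempt-1)*Step.
// Assumption: attempts start at 1; values below 1 are treated as 1.
func (b linearBackoff) Next(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	return b.Base + time.Duration(attempt-1)*b.Step
}

func main() {
	b := linearBackoff{Base: 100 * time.Millisecond, Step: 50 * time.Millisecond}
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Println(attempt, b.Next(attempt))
	}
}
```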
type Result ¶
type Result struct {
// Data from the task.
Result any
// Error from the task, timeout, or panic.
Err error
}
Result is the value delivered on the channel returned by Subscribe.
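A consumer of a `<-chan Result` might look like the sketch below. The local `Result` type mirrors the documented fields so the example compiles on its own, and the helper `await` is hypothetical. Whether the cluster closes the channel after delivering a result is not stated here, so the sketch handles both cases.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Result mirrors the documented struct so this sketch is self-contained.
type Result struct {
	Result any
	Err    error
}

// await is a hypothetical helper: it waits for one Result or gives up after timeout.
func await(ch <-chan Result, timeout time.Duration) (any, error) {
	select {
	case r, ok := <-ch:
		if !ok {
			return nil, errors.New("channel closed without a result")
		}
		return r.Result, r.Err
	case <-time.After(timeout):
		return nil, errors.New("timed out waiting for result")
	}
}

func main() {
	// Stand-in for the channel that Subscribe would return.
	ch := make(chan Result, 1)
	ch <- Result{Result: 42}

	val, err := await(ch, time.Second)
	fmt.Println(val, err)
}
```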
type RetryBackoffStrategy ¶
type RetryBackoffStrategy interface {
// Next returns the duration to wait before the next attempt, given the current attempt number.
Next(attempt int) time.Duration
}
RetryBackoffStrategy defines the interface for calculating delays between retry attempts. Any custom backoff logic must implement this interface.
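To show what a custom strategy could look like, here is a minimal sketch that returns delays from a fixed schedule and repeats the last entry once the schedule runs out. The interface is redeclared locally so the example compiles on its own; `scheduleBackoff` and `newSchedule` are hypothetical names, and the 1-based attempt numbering is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// RetryBackoffStrategy mirrors the documented interface for this standalone sketch.
type RetryBackoffStrategy interface {
	Next(attempt int) time.Duration
}

// scheduleBackoff is a hypothetical custom strategy backed by a list of delays.
type scheduleBackoff struct {
	delays []time.Duration
}

// newSchedule builds a schedule from delays given in milliseconds.
func newSchedule(ms ...int) scheduleBackoff {
	delays := make([]time.Duration, len(ms))
	for i, m := range ms {
		delays[i] = time.Duration(m) * time.Millisecond
	}
	return scheduleBackoff{delays: delays}
}

// Next returns the delay for the given attempt (assumed to start at 1).
// Attempts beyond the schedule reuse the last delay.
func (s scheduleBackoff) Next(attempt int) time.Duration {
	if len(s.delays) == 0 {
		return 0
	}
	i := attempt - 1
	if i < 0 {
		i = 0
	}
	if i >= len(s.delays) {
		i = len(s.delays) - 1
	}
	return s.delays[i]
}

// Compile-time check that scheduleBackoff satisfies the interface.
var _ RetryBackoffStrategy = scheduleBackoff{}

func main() {
	s := newSchedule(100, 500, 2000)
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, s.Next(attempt))
	}
}
```

A value of such a type could then be handed to `WithBackoffStrategy`, since that method accepts any RetryBackoffStrategy.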
func NewExponentialBackoff ¶
func NewExponentialBackoff(base time.Duration, multiplier float64, max time.Duration) RetryBackoffStrategy
NewExponentialBackoff creates a strategy where delay grows by a multiplier. If multiplier is less than 1.0, it defaults to 2.0 to ensure growth.
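The documented behaviour (growth by a multiplier, a fallback to 2.0 for multipliers below 1.0, and an optional cap) can be sketched like this. The lowercase names are hypothetical stand-ins rather than the package's code, and the sketch assumes attempts start at 1 so that the first retry waits Base.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// exponentialBackoff is a hypothetical stand-in with the documented fields.
type exponentialBackoff struct {
	Base       time.Duration
	Multiplier float64
	Max        time.Duration
}

// newExponentialBackoff applies the documented default: a multiplier below 1.0 becomes 2.0.
func newExponentialBackoff(base time.Duration, multiplier float64, max time.Duration) exponentialBackoff {
	if multiplier < 1.0 {
		multiplier = 2.0
	}
	return exponentialBackoff{Base: base, Multiplier: multiplier, Max: max}
}

// Next returns Base * Multiplier^(attempt-1), capped at Max when Max > 0.
// Assumption: attempts start at 1; values below 1 are treated as 1.
func (b exponentialBackoff) Next(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := float64(b.Base) * math.Pow(b.Multiplier, float64(attempt-1))
	if b.Max > 0 && d > float64(b.Max) {
		return b.Max
	}
	// Guard against overflow when no cap is set and the delay grows very large.
	if d >= float64(math.MaxInt64) {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}

func main() {
	b := newExponentialBackoff(100*time.Millisecond, 0.5, time.Second)
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, b.Next(attempt))
	}
}
```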
func NewFixedBackoff ¶
func NewFixedBackoff(delay time.Duration) RetryBackoffStrategy
NewFixedBackoff creates a strategy that always returns the same delay.
func NewLinearBackoff ¶
func NewLinearBackoff(base time.Duration, step time.Duration) RetryBackoffStrategy
NewLinearBackoff creates a strategy that increases delay by a fixed step.
type RetryMode ¶
type RetryMode int
RetryMode defines how the task should be retried after a failure.
const (
// Requeue sends the task back to the priority queue after a failure.
// This frees up the current worker immediately, allowing other tasks to run
// during the backoff delay.
Requeue RetryMode = iota
// Immediate retries the task within the same worker goroutine.
// The worker will block and wait for the backoff delay duration before
// executing the task again. Use this for high-priority tasks where
// keeping the execution slot is more important than worker availability.
Immediate
)