cluster

package
v0.0.3-alpha
Published: Feb 5, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Types

type ClusterInterface

type ClusterInterface interface {
	// Run starts the cluster's execution engine. Workers begin listening to the queue and processing tasks.
	Run()

	// Stop performs a graceful shutdown. The cluster stops accepting new tasks and waits for active workers to finish.
	// If the timeout elapses before all tasks finish, Stop returns an error.
	Stop(timeout time.Duration) error

	// AddTask is the entry point for submitting a task. It returns a builder for configuring scheduling, timeouts, and metadata.
	// Call Submit at the end of the chain to queue the task.
	AddTask(t task.TaskInterface) ClusterTaskBuilderInterface

	// Subscribe returns a channel that receives the task's result (val and err).
	Subscribe(id string) (<-chan Result, error)

	// Cancel performs an immediate shutdown: it stops all workers at once and cancels all active task contexts.
	// Use it only when a graceful shutdown is not possible, because it may leave tasks in an incomplete state.
	Cancel()

	// CancelTask targets and cancels a specific task by its ID.
	// This triggers the cancelled channel inside the TaskFunc and closes the task's context.
	CancelTask(id string)
}

ClusterInterface is the public interface of the cluster. It encapsulates the methods for running and stopping the cluster and for submitting, observing, and cancelling tasks.
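The graceful-shutdown contract described for Stop (wait for active workers, return an error once the timeout elapses) can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `waitWithTimeout`, `demoStop`, and `errStopTimeout` are hypothetical names introduced here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errStopTimeout is a hypothetical error value; the error actually
// returned by Stop is not documented on this page.
var errStopTimeout = errors.New("stop: timed out waiting for workers")

// waitWithTimeout blocks until wg is done or timeout elapses,
// whichever comes first.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errStopTimeout
	}
}

// demoStop starts one worker that runs for the given work duration and
// then waits for it for at most timeout.
func demoStop(work, timeout time.Duration) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	return waitWithTimeout(&wg, timeout)
}

func main() {
	// The worker finishes well before the deadline.
	fmt.Println(demoStop(10*time.Millisecond, time.Second))
	// The deadline is reached while the worker is still running.
	fmt.Println(demoStop(500*time.Millisecond, time.Millisecond))
}
```

In the timeout case the worker goroutine keeps running after the call returns, which is one reason a caller might follow a failed Stop with Cancel.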

func NewCluster

func NewCluster(workers int, ctx context.Context) ClusterInterface

NewCluster creates a new cluster instance with the given number of workers and context.

type ClusterTaskBuilderInterface

type ClusterTaskBuilderInterface interface {
	// WithStartTime schedules the task to run at a specific time.
	// If the time is in the past or time.Now(), the task will be executed as soon as a worker is available.
	WithStartTime(st time.Time) ClusterTaskBuilderInterface

	// WithTimeout sets a maximum execution time for the task.
	// If the task exceeds this duration, its ctx will be cancelled, and the task will be marked as timed out.
	WithTimeout(tm time.Duration) ClusterTaskBuilderInterface

	// WithPriority sets the task's priority.
	// Tasks with a higher value usually move to the front of the queue, although the exact ordering depends on the heap logic.
	WithPriority(p int) ClusterTaskBuilderInterface

	// WithRetry sets the maximum number of times a task will be retried upon failure.
	// Default is 0 (no retries).
	WithRetry(r int) ClusterTaskBuilderInterface

	// WithBackoffStrategy defines the delay logic between retry attempts.
	// Default is FixedBackoff with 0s delay if not specified.
	WithBackoffStrategy(strategy RetryBackoffStrategy) ClusterTaskBuilderInterface

	// WithJitter adds a small random variation to the retry delay to prevent thundering herd problems.
	WithJitter() ClusterTaskBuilderInterface

	// RetryIf registers a predicate function to decide whether a retry should be attempted based on the error.
	// If the function returns true, the task is retried; otherwise, it fails immediately.
	RetryIf(func(err error) bool) ClusterTaskBuilderInterface

	// WithRetryMode defines where the retry delay happens.
	// Default is Requeue (returns the task to the queue).
	WithRetryMode(mode RetryMode) ClusterTaskBuilderInterface

	// IsCacheable determines if the task result should be stored in memory after completion.
	// Currently, cached results are stored for 5 minutes after the task completes.
	// After this period, the result is purged from memory to prevent leaks.
	// (Note: This duration may become configurable in future releases).
	IsCacheable(v bool) ClusterTaskBuilderInterface

	// OnComplete registers a callback that will be invoked once the task finishes its execution,
	// regardless of whether it succeeded, failed, or was cancelled.
	// The callback receives the task's unique ID, the resulting value, and any error encountered.
	OnComplete(fn func(id string, val any, err error)) ClusterTaskBuilderInterface

	// OnFailure registers a callback that will be invoked only if the task ends with an error,
	// a panic, or is cancelled.
	// The callback receives the task's unique ID and the error that caused the failure.
	OnFailure(fn func(id string, err error)) ClusterTaskBuilderInterface

	// Submit is the final method in the chain.
	// It validates the task, generates an ID (if empty), and pushes the task into the scheduler.
	// Returns an error if a task with the same ID is already running or managed by the cluster.
	Submit() (string, error)
}

ClusterTaskBuilderInterface provides a fluent API for configuring and submitting a task to the cluster. It allows setting execution parameters such as start time, timeout, priority, and lifecycle callbacks. The configuration must be finalized by calling the Submit method.
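To make the interaction between WithRetry, RetryIf, and the backoff strategy concrete, here is a small standalone retry loop. It is a sketch under stated assumptions, not the cluster's scheduler: `runWithRetry`, `backoff`, `fixed`, and `errTransient` are hypothetical names, attempts are assumed to be numbered from 0, and the loop blocks between attempts (closer in spirit to the Immediate retry mode than to Requeue).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used to mark retryable failures.
var errTransient = errors.New("transient failure")

// backoff mirrors the method set documented for RetryBackoffStrategy.
type backoff interface {
	Next(attempt int) time.Duration
}

// fixed is a constant-delay strategy used only in this sketch.
type fixed time.Duration

func (f fixed) Next(int) time.Duration { return time.Duration(f) }

// runWithRetry calls fn once and then up to maxRetries more times.
// It stops early when fn succeeds or when retryIf reports that the
// error is not retryable. It returns the number of calls made and the
// last error.
func runWithRetry(fn func() error, maxRetries int, b backoff, retryIf func(error) bool) (int, error) {
	for attempt := 0; ; attempt++ {
		err := fn()
		if err == nil {
			return attempt + 1, nil
		}
		if attempt >= maxRetries || (retryIf != nil && !retryIf(err)) {
			return attempt + 1, err
		}
		time.Sleep(b.Next(attempt))
	}
}

func main() {
	calls := 0
	// flaky fails twice with a transient error, then succeeds.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}

	n, err := runWithRetry(flaky, 3, fixed(time.Millisecond), func(err error) bool {
		return errors.Is(err, errTransient)
	})
	fmt.Println(n, err)
}
```

With a retry limit of 0, which the builder documents as the default, the function is called exactly once.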

type ExponentialBackoff

type ExponentialBackoff struct {
	// Base is the starting delay duration.
	Base time.Duration

	// Multiplier is the factor by which the delay grows each attempt (e.g., 2.0).
	Multiplier float64

	// Max is the upper bound for the delay. If 0, no maximum is applied.
	Max time.Duration
}

ExponentialBackoff increases the delay exponentially, optionally capped by a maximum value.

func (ExponentialBackoff) Next

func (b ExponentialBackoff) Next(attempt int) time.Duration

Next returns a delay that grows exponentially: Base * (Multiplier ^ attempt).
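The formula above, together with the Max cap, can be reproduced in a few lines. This is an illustrative sketch rather than the package's source: `expBackoff` is a hypothetical stand-in type, and attempts are assumed to start at 0.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// expBackoff is a hypothetical stand-in that mirrors the documented
// fields of ExponentialBackoff.
type expBackoff struct {
	Base       time.Duration
	Multiplier float64
	Max        time.Duration
}

// Next computes Base * Multiplier^attempt and applies Max when it is
// non-zero.
func (b expBackoff) Next(attempt int) time.Duration {
	d := time.Duration(float64(b.Base) * math.Pow(b.Multiplier, float64(attempt)))
	if b.Max > 0 && d > b.Max {
		return b.Max
	}
	return d
}

func main() {
	b := expBackoff{Base: 100 * time.Millisecond, Multiplier: 2.0, Max: time.Second}
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, b.Next(attempt))
	}
}
```

With these values the delays are 100ms, 200ms, 400ms, and 800ms for attempts 0 through 3; attempt 4 would be 1.6s and is capped at 1s.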

type FixedBackoff

type FixedBackoff struct {
	// Delay is the duration to wait between retries.
	Delay time.Duration
}

FixedBackoff provides a constant delay between all retry attempts.

func (FixedBackoff) Next

func (b FixedBackoff) Next(_ int) time.Duration

Next returns the fixed delay regardless of the attempt number.

type LinearBackoff

type LinearBackoff struct {
	// Base is the initial delay duration.
	Base time.Duration

	// Step is the duration added to the delay after each attempt.
	Step time.Duration
}

LinearBackoff increases the delay by a constant step for each subsequent attempt.

func (LinearBackoff) Next

func (b LinearBackoff) Next(attempt int) time.Duration

Next returns a delay that grows linearly: Base + (Step * attempt).
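The linear formula is simple enough to check by hand. The sketch below uses a hypothetical `linBackoff` type that mirrors the documented fields, and assumes attempts start at 0.

```go
package main

import (
	"fmt"
	"time"
)

// linBackoff is a hypothetical stand-in that mirrors the documented
// fields of LinearBackoff.
type linBackoff struct {
	Base time.Duration
	Step time.Duration
}

// Next computes Base + Step*attempt.
func (b linBackoff) Next(attempt int) time.Duration {
	return b.Base + b.Step*time.Duration(attempt)
}

func main() {
	b := linBackoff{Base: time.Second, Step: 500 * time.Millisecond}
	for attempt := 0; attempt < 3; attempt++ {
		fmt.Println(attempt, b.Next(attempt))
	}
}
```

With a Base of 1s and a Step of 500ms, attempts 0, 1, and 2 yield 1s, 1.5s, and 2s.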

type Result

type Result struct {
	// Data from the task.
	Result any

	// Error from the task, timeout, or panic.
	Err error
}

Result is the value delivered on the channel returned by Subscribe.
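A caller typically wants to wait for the result without blocking forever. The sketch below shows one way to do that with a local `result` type that mirrors the documented fields. The `await` helper and both error values are hypothetical, and whether the cluster closes the channel after delivering a result is not stated on this page, so the closed-channel case is handled defensively.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// result mirrors the documented fields of Result.
type result struct {
	Result any
	Err    error
}

// Hypothetical error values used only in this sketch.
var (
	errAwaitTimeout = errors.New("await: timed out")
	errChanClosed   = errors.New("await: channel closed without a result")
)

// await waits for one value on ch for at most timeout and unpacks it
// into a (value, error) pair.
func await(ch <-chan result, timeout time.Duration) (any, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case r, ok := <-ch:
		if !ok {
			return nil, errChanClosed
		}
		return r.Result, r.Err
	case <-timer.C:
		return nil, errAwaitTimeout
	}
}

func main() {
	// Simulate a subscription channel that already holds a result.
	ch := make(chan result, 1)
	ch <- result{Result: 42}

	val, err := await(ch, time.Second)
	fmt.Println(val, err)
}
```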

type RetryBackoffStrategy

type RetryBackoffStrategy interface {
	// Next returns the duration to wait before the next attempt, given the current attempt number.
	Next(attempt int) time.Duration
}

RetryBackoffStrategy defines the interface for calculating delays between retry attempts. Any custom backoff logic must implement this interface.
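As an illustration of custom backoff logic, the sketch below implements the single-method interface with a lookup table of delays. The `backoffStrategy` interface is a local copy of the documented method set, and `tableBackoff` is a hypothetical type, not something the package provides. Attempts are assumed to start at 0.

```go
package main

import (
	"fmt"
	"time"
)

// backoffStrategy is a local copy of the documented
// RetryBackoffStrategy method set.
type backoffStrategy interface {
	Next(attempt int) time.Duration
}

// tableBackoff returns delays from a fixed table and repeats the last
// entry once the table is exhausted.
type tableBackoff []time.Duration

func (t tableBackoff) Next(attempt int) time.Duration {
	if len(t) == 0 {
		return 0
	}
	if attempt < 0 {
		attempt = 0
	}
	if attempt >= len(t) {
		attempt = len(t) - 1
	}
	return t[attempt]
}

// Compile-time check that tableBackoff satisfies the interface.
var _ backoffStrategy = tableBackoff(nil)

func main() {
	var s backoffStrategy = tableBackoff{time.Second, 5 * time.Second, 30 * time.Second}
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt, s.Next(attempt))
	}
}
```

A value of such a type could then be passed to WithBackoffStrategy, provided it satisfies the package's own interface.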

func NewExponentialBackoff

func NewExponentialBackoff(base time.Duration, multiplier float64, max time.Duration) RetryBackoffStrategy

NewExponentialBackoff creates a strategy where delay grows by a multiplier. If multiplier is less than 1.0, it defaults to 2.0 to ensure growth.

func NewFixedBackoff

func NewFixedBackoff(delay time.Duration) RetryBackoffStrategy

NewFixedBackoff creates a strategy that always returns the same delay.

func NewLinearBackoff

func NewLinearBackoff(base time.Duration, step time.Duration) RetryBackoffStrategy

NewLinearBackoff creates a strategy that increases delay by a fixed step.

type RetryMode

type RetryMode int

RetryMode defines how the task should be retried after a failure.

const (
	// Requeue sends the task back to the priority queue after a failure.
	// This frees up the current worker immediately, allowing other tasks to run
	// during the backoff delay.
	Requeue RetryMode = iota

	// Immediate retries the task within the same worker goroutine.
	// The worker will block and wait for the backoff delay duration before
	// executing the task again. Use this for high-priority tasks where
	// keeping the execution slot is more important than worker availability.
	Immediate
)
