Documentation ¶
Constants ¶
const DefaultNumOfWorkers int = 1
Variables ¶
var DefaultBackoffPolicy = BackoffPolicy{ BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, }
var ErrEmptyTaskName = errors.New("task name must not be empty")
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
type Backoff interface {
	// Calculate returns the duration to wait before the next retry attempt.
	// The input parameter retries indicates how many times the task has already been retried.
	Calculate(retries uint) time.Duration
}
Backoff defines the interface for calculating backoff delays between retries. Implementations of this interface can provide custom logic for determining how long to wait before retrying a failed task based on the number of retries.
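As an illustration of a custom implementation, here is a minimal sketch of a linear backoff. The LinearBackoff type and its Step and Max fields are hypothetical and not part of this package; only the Backoff interface is taken from the documentation above.

```go
package main

import (
	"fmt"
	"time"
)

// Backoff mirrors the interface documented above.
type Backoff interface {
	Calculate(retries uint) time.Duration
}

// LinearBackoff is a hypothetical implementation (not part of the package):
// the delay grows by Step on every retry and is capped at Max.
type LinearBackoff struct {
	Step time.Duration
	Max  time.Duration
}

// Calculate returns Step * (retries + 1), capped at Max.
func (l LinearBackoff) Calculate(retries uint) time.Duration {
	if l.Step <= 0 {
		return 0
	}
	// Compare step counts before multiplying, so a very large retry
	// count cannot overflow the duration.
	if uint64(retries) >= uint64(l.Max/l.Step) {
		return l.Max
	}
	return l.Step * time.Duration(retries+1)
}

func main() {
	var b Backoff = LinearBackoff{Step: 500 * time.Millisecond, Max: 2 * time.Second}
	for retries := uint(0); retries < 5; retries++ {
		fmt.Printf("retries=%d delay=%v\n", retries, b.Calculate(retries))
	}
}
```

Because ManagerConfig.BackoffPolicy is typed as Backoff, a value like this could presumably be passed through WithBackoffPolicy in place of a BackoffPolicy.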
type BackoffPolicy ¶
BackoffPolicy specifies the parameters for calculating exponential backoff with jitter. It defines the base delay and the maximum delay allowed between retries.
Fields:
- BaseDelay: Specifies the initial delay duration.
- MaxDelay: Sets the upper bound for the backoff delay, preventing unbounded wait times.
func (*BackoffPolicy) Calculate ¶
func (b *BackoffPolicy) Calculate(retries uint) time.Duration
Calculate returns a jittered backoff duration based on the number of retries. It uses exponential backoff with full jitter, where the delay is randomly chosen between 0 and the calculated maximum delay. The maximum delay grows exponentially with each retry, capped by MaxDelay.
Example:
BaseDelay = 100ms, MaxDelay = 3s, retries = 3
Max calculated delay = min(100ms * 2^3, 3s) = 800ms
Returned delay = random duration in [0, 800ms)
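The calculation described above can be sketched as follows. This is an illustrative reimplementation under stated assumptions, not the package's source: the ceiling helper is hypothetical, and the use of math/rand is a guess at how the jitter is drawn.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// BackoffPolicy mirrors the two documented fields.
type BackoffPolicy struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
}

// ceiling is a hypothetical helper returning min(BaseDelay * 2^retries, MaxDelay).
// It doubles step by step and stops early, so it cannot overflow.
func (b *BackoffPolicy) ceiling(retries uint) time.Duration {
	if b.BaseDelay <= 0 || b.MaxDelay <= 0 {
		return 0
	}
	d := b.BaseDelay
	for i := uint(0); i < retries; i++ {
		if d > b.MaxDelay/2 {
			return b.MaxDelay // doubling again would exceed the cap
		}
		d *= 2
	}
	if d > b.MaxDelay {
		return b.MaxDelay
	}
	return d
}

// Calculate applies full jitter: a uniform random duration in [0, ceiling).
func (b *BackoffPolicy) Calculate(retries uint) time.Duration {
	c := b.ceiling(retries)
	if c <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(c)))
}

func main() {
	p := &BackoffPolicy{BaseDelay: 100 * time.Millisecond, MaxDelay: 3 * time.Second}
	for retries := uint(0); retries <= 6; retries++ {
		fmt.Printf("retries=%d ceiling=%v sample=%v\n",
			retries, p.ceiling(retries), p.Calculate(retries))
	}
}
```

Full jitter spreads retries from many failing tasks across the whole window, so they do not all retry at the same instant.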
type Broker ¶
type Broker interface {
	// Publish sends a task to the broker.
	// Returns an error if the task could not be published.
	Publish(task Task) error
	// Consume returns a channel of tasks that the worker can consume.
	// It also returns an error if the broker fails to start consuming tasks.
	Consume() (<-chan Task, error)
}
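A minimal in-memory implementation can help when testing handlers without a real queue. The sketch below is an assumption-laden illustration: memoryBroker and newMemoryBroker are hypothetical names, and Task is trimmed to a single field to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is trimmed to one field for this sketch; the real type has more.
type Task struct {
	Name string
}

// Broker mirrors the interface documented above.
type Broker interface {
	Publish(task Task) error
	Consume() (<-chan Task, error)
}

// memoryBroker is a hypothetical Broker backed by a buffered channel.
type memoryBroker struct {
	tasks chan Task
}

func newMemoryBroker(capacity int) *memoryBroker {
	return &memoryBroker{tasks: make(chan Task, capacity)}
}

// Publish enqueues the task, or fails immediately when the buffer is full.
func (m *memoryBroker) Publish(task Task) error {
	select {
	case m.tasks <- task:
		return nil
	default:
		return errors.New("memory broker: queue is full")
	}
}

// Consume exposes the queue as a receive-only channel.
func (m *memoryBroker) Consume() (<-chan Task, error) {
	return m.tasks, nil
}

func main() {
	var b Broker = newMemoryBroker(1)
	if err := b.Publish(Task{Name: "send_email"}); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	// The queue holds one task, so a second publish is rejected.
	fmt.Println("second publish:", b.Publish(Task{Name: "send_email"}))

	tasks, err := b.Consume()
	if err != nil {
		fmt.Println("consume failed:", err)
		return
	}
	fmt.Println("received:", (<-tasks).Name)
}
```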
type DefaultWorker ¶
type DefaultWorker struct {
	// contains filtered or unexported fields
}
DefaultWorker is the standard implementation of the Worker interface.
It is responsible for consuming tasks from the provided Broker, executing them using registered handler functions, and retrying failed tasks based on a backoff policy.
Fields:
- id: A unique identifier for this worker instance.
- broker: The Broker used to fetch tasks for execution.
- backoff: The BackoffPolicy used to delay retries on task failure.
- handlers: A map of task names to their associated TaskHandlerFunc implementations.
- wg: A shared WaitGroup used by the Manager to coordinate worker shutdown.
DefaultWorker is intended to be created using a WorkerFactory and managed by a Manager. It supports context-based cancellation for graceful shutdown.
func (*DefaultWorker) Register ¶
func (w *DefaultWorker) Register(name string, handler TaskHandlerFunc)
Register registers a task handler for a specific task name.
The handler function will be associated with the provided task name and invoked when a task with that name is received by the worker.
Parameters:
- name: The unique name of the task being registered.
- handler: The TaskHandlerFunc that will handle the task when it is dispatched.
This method can be called multiple times with different names, so a single worker can handle a variety of task types.
Example:
worker.Register("send_email", sendEmailHandler)
func (*DefaultWorker) Start ¶
func (w *DefaultWorker) Start(ctx context.Context)
Start begins processing tasks for the worker and continuously listens for new tasks from the broker until the provided context is cancelled or the task channel is closed.
It runs a task consumption loop that will:
- Consume tasks from the broker
- Attempt to handle each task using the corresponding registered handler
- Retry failed tasks according to the worker's backoff policy (if provided)
- Exit when the context is cancelled or the task channel is closed
Parameters:
- ctx: The context used to control the lifetime of the worker. When the context is cancelled, the worker will shut down gracefully.
The worker will log information about task successes, failures, retries, and backoff delays.
Example:
worker.Start(ctx)
type HandlerRegistry ¶
type HandlerRegistry map[string]TaskHandlerFunc
type Manager ¶
type Manager struct {
	// contains filtered or unexported fields
}
Manager is responsible for managing the lifecycle of workers, coordinating task consumption and processing, and gracefully stopping workers.
func NewManager ¶
func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...ManagerOption) *Manager
NewManager creates a new Manager instance responsible for coordinating and supervising workers.
Parameters:
- broker: The Broker used by all workers to fetch and process tasks.
- wf: A WorkerFactory function used to create each Worker with a provided WorkerConfig.
- numWorkers: The number of workers to spawn. If set to 0 or less, DefaultNumOfWorkers is used.
- opts: Optional functional configuration parameters for customizing Manager behavior (e.g. backoff policy).
The Manager sets up a cancellable context for controlling the lifecycle of its workers, and assigns a shared WaitGroup to coordinate shutdown. If no backoff policy is provided in the options, DefaultBackoffPolicy is used.
Each worker is created using the WorkerFactory and given a unique ID, shared broker, backoff policy, and reference to the Manager's WaitGroup.
Returns:
A pointer to a fully initialized Manager ready to register task handlers and start processing.
Example:
mgr := NewManager(broker, MyWorkerFactory, 5, WithBackoffPolicy(customBackoff))
mgr.RegisterTask("send_email", emailHandler)
mgr.Start()
func (*Manager) BackoffPolicy ¶
func (*Manager) PublishTask ¶
PublishTask publishes a task to the broker with the specified task name, arguments, and maximum retries.
This method creates a new task with the provided parameters and publishes it to the broker for consumption by workers.
Parameters:
- taskName: The name of the task to publish. This is used to identify the task in the queue.
- args: The arguments that will be passed to the task handler. These are passed as a map of key-value pairs.
- maxRetry: The maximum number of retries allowed for the task in case of failure. If the task fails more than the specified number of times, it will not be retried again.
Returns:
- An error if the task name is empty or there is an issue publishing the task to the broker. If no errors occur, the method will return nil.
Example usage:
err := manager.PublishTask("send_email", TaskArgs{"email": "[email protected]"}, 3)
if err != nil {
	log.Printf("Error publishing task: %v", err)
}
func (*Manager) RegisterTask ¶
func (m *Manager) RegisterTask(taskName string, handler TaskHandlerFunc)
RegisterTask registers a task handler for the specified task name across all workers managed by the Manager. This allows each worker to handle tasks of the specified name by executing the provided handler function.
The handler function must have the signature func(TaskArgs) error. If a handler is already registered under the same task name, the new handler replaces it.
Parameters:
- taskName: The name of the task to register. This name is used to identify tasks in the queue.
- handler: The function that handles the task when it is consumed by the worker.
Example usage:
manager.RegisterTask("send_email", func(args TaskArgs) error {
	// Handle the task
	return nil
})
func (*Manager) Start ¶
func (m *Manager) Start()
Start starts all the workers managed by the Manager in separate goroutines.
This method launches each worker's `Start` method concurrently, passing the Manager's context to each worker. The workers will begin consuming tasks from the broker and processing them based on the registered task handlers.
Example usage:
manager.Start() // Starts all workers concurrently
func (*Manager) Stop ¶
func (m *Manager) Stop()
Stop stops the Manager and all its workers gracefully.
This method cancels the context associated with the Manager, signaling all workers to stop their work. It then waits for all workers to complete their shutdown process using the WaitGroup.
It is important to call `Stop` to ensure that all workers have finished their tasks and the system shuts down cleanly.
Example usage:
manager.Stop() // Stops all workers and waits for them to finish
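The cancel-then-wait shutdown described above can be sketched with the standard context and sync packages. The pool type below is a hypothetical stand-in for the Manager, and where Add and Done are called is an assumption; the real Manager and workers may divide that responsibility differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical stand-in for the Manager's lifecycle handling.
type pool struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	exited int32 // number of workers that have shut down
}

func newPool() *pool {
	ctx, cancel := context.WithCancel(context.Background())
	return &pool{ctx: ctx, cancel: cancel}
}

// start launches n workers, each in its own goroutine.
func (p *pool) start(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// A real worker would process tasks until the context is cancelled.
			<-p.ctx.Done()
			atomic.AddInt32(&p.exited, 1)
		}()
	}
}

// stop cancels the shared context, then blocks until every worker has returned.
func (p *pool) stop() {
	p.cancel()
	p.wg.Wait()
}

func main() {
	p := newPool()
	p.start(3)
	p.stop()
	fmt.Println("workers exited:", atomic.LoadInt32(&p.exited))
}
```

Because stop blocks on the WaitGroup, a worker stuck in a long-running handler delays shutdown; handlers that may run long should themselves be bounded.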
type ManagerConfig ¶
type ManagerConfig struct {
	BackoffPolicy Backoff
}
ManagerConfig holds configuration options for the Manager.
type ManagerOption ¶
type ManagerOption func(*ManagerConfig)
ManagerOption is a function that modifies a ManagerConfig. It enables functional configuration of the Manager, so that settings such as the backoff policy can be customized when creating a Manager.
func WithBackoffPolicy ¶
func WithBackoffPolicy(bp Backoff) ManagerOption
WithBackoffPolicy is a functional option that sets the BackoffPolicy for a Manager. It allows the user to specify a custom backoff policy for retrying failed tasks in the Manager.
Example usage:
manager := NewManager(broker, DefaultWorkerFactory, numOfWorkers, WithBackoffPolicy(customBackoffPolicy))
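For readers unfamiliar with the functional options pattern, the sketch below shows how such options are typically applied. The buildConfig function and the fixed type are hypothetical; how NewManager actually applies its options is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// Backoff, ManagerConfig and ManagerOption mirror the documented types.
type Backoff interface {
	Calculate(retries uint) time.Duration
}

type ManagerConfig struct {
	BackoffPolicy Backoff
}

type ManagerOption func(*ManagerConfig)

// WithBackoffPolicy returns an option that overrides the backoff policy.
func WithBackoffPolicy(bp Backoff) ManagerOption {
	return func(c *ManagerConfig) { c.BackoffPolicy = bp }
}

// fixed is a hypothetical Backoff that always returns the same delay.
type fixed time.Duration

func (f fixed) Calculate(uint) time.Duration { return time.Duration(f) }

// buildConfig is a hypothetical helper: it starts from a default policy
// and applies each option in order, so later options win.
func buildConfig(def Backoff, opts ...ManagerOption) ManagerConfig {
	cfg := ManagerConfig{BackoffPolicy: def}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	def := fixed(time.Second)

	withDefault := buildConfig(def)
	fmt.Println("default:", withDefault.BackoffPolicy.Calculate(0))

	custom := buildConfig(def, WithBackoffPolicy(fixed(5*time.Second)))
	fmt.Println("custom:", custom.BackoffPolicy.Calculate(0))
}
```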
type Task ¶
type Task struct {
	Name      string    `json:"name"`      // The name of the task, used to identify which handler to invoke.
	Args      TaskArgs  `json:"args"`      // The arguments for the task, provided as a map of key-value pairs.
	Retry     uint      `json:"retry"`     // The current retry count for this task.
	MaxRetry  uint      `json:"max_retry"` // The maximum number of retries before the task is considered failed.
	Timestamp time.Time `json:"timestamp"` // The timestamp when the task was created or scheduled.
}
Task represents an individual task in the task queue.
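The struct tags suggest that tasks are serialized as JSON, presumably by Broker implementations. The sketch below round-trips a Task through encoding/json; the roundTrip helper is hypothetical, and TaskArgs is assumed to be map[string]any as described in its documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TaskArgs is assumed to be a map of string keys to arbitrary values.
type TaskArgs map[string]any

// Task mirrors the documented struct, including its JSON tags.
type Task struct {
	Name      string    `json:"name"`
	Args      TaskArgs  `json:"args"`
	Retry     uint      `json:"retry"`
	MaxRetry  uint      `json:"max_retry"`
	Timestamp time.Time `json:"timestamp"`
}

// roundTrip is a hypothetical helper that encodes a Task and decodes it again.
func roundTrip(in Task) (Task, []byte, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return Task{}, nil, err
	}
	var out Task
	if err := json.Unmarshal(data, &out); err != nil {
		return Task{}, nil, err
	}
	return out, data, nil
}

func main() {
	in := Task{
		Name:      "send_email",
		Args:      TaskArgs{"attempts": 2},
		MaxRetry:  3,
		Timestamp: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
	out, data, err := roundTrip(in)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(string(data))
	// Numbers stored in a map[string]any decode as float64, not int.
	fmt.Printf("%T\n", out.Args["attempts"])
}
```

Handlers that read numeric arguments after a JSON round trip therefore need to assert float64 rather than int.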
type TaskArgs ¶
TaskArgs represents the arguments passed to a task handler. It is a map where the key is a string (usually a task argument name) and the value is of type `any` which allows flexibility in the type of data.
type TaskHandlerFunc ¶
TaskHandlerFunc defines the signature of a function that handles tasks. It takes a TaskArgs object as input and returns an error if something goes wrong during task processing.
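Because argument values are untyped, a handler usually validates them with type assertions before use. The sketch below assumes TaskArgs is map[string]any and TaskHandlerFunc is func(TaskArgs) error, as the descriptions state; sendEmail is a hypothetical handler.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed definitions, following the descriptions in this documentation.
type TaskArgs map[string]any
type TaskHandlerFunc func(TaskArgs) error

// sendEmail is a hypothetical handler that validates its arguments first.
func sendEmail(args TaskArgs) error {
	to, ok := args["email"].(string)
	if !ok || to == "" {
		return errors.New(`send_email: missing or non-string "email" argument`)
	}
	fmt.Println("sending email to", to)
	return nil
}

func main() {
	var h TaskHandlerFunc = sendEmail
	fmt.Println("valid args:", h(TaskArgs{"email": "user@example.com"}))
	fmt.Println("wrong type:", h(TaskArgs{"email": 42}))
}
```

Returning an error for malformed arguments means the task goes through the normal failure path; whether retrying such a task is useful is a decision for the caller.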
type Worker ¶
type Worker interface {
	Register(name string, handler TaskHandlerFunc)
	Start(ctx context.Context)
}
Worker defines the contract for a task-processing worker.
Implementations of Worker are responsible for registering task handlers and starting the task execution loop, typically consuming tasks from a Broker.
Methods:
- Register: Associates a task name with a handler function.
- Start: Begins processing tasks using the provided context for cancellation.
This interface allows different worker implementations to be plugged into the Manager, enabling customizable behavior such as logging, metrics, or concurrency models.
Example usage:
type MyWorker struct { ... }
func (w *MyWorker) Register(name string, handler TaskHandlerFunc) { ... }
func (w *MyWorker) Start(ctx context.Context) { ... }
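One way to add behavior such as metrics without reimplementing the execution loop is to wrap an existing Worker. The sketch below is illustrative only: countingWorker, stubWorker, and newCountingWorker are hypothetical, and the alias types are assumed from the descriptions in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type TaskArgs map[string]any
type TaskHandlerFunc func(TaskArgs) error

// Worker mirrors the documented interface.
type Worker interface {
	Register(name string, handler TaskHandlerFunc)
	Start(ctx context.Context)
}

// stubWorker is a minimal hypothetical Worker that only stores handlers.
type stubWorker struct {
	handlers map[string]TaskHandlerFunc
}

func (s *stubWorker) Register(name string, h TaskHandlerFunc) { s.handlers[name] = h }
func (s *stubWorker) Start(ctx context.Context)               { <-ctx.Done() }

// countingWorker is a hypothetical decorator: it embeds another Worker,
// delegates Start unchanged, and counts how often each handler runs.
type countingWorker struct {
	Worker
	mu    sync.Mutex
	calls map[string]int
}

var _ Worker = (*countingWorker)(nil) // compile-time interface check

func newCountingWorker(inner Worker) *countingWorker {
	return &countingWorker{Worker: inner, calls: make(map[string]int)}
}

// Register wraps the handler with a counter before passing it on.
func (c *countingWorker) Register(name string, h TaskHandlerFunc) {
	c.Worker.Register(name, func(args TaskArgs) error {
		c.mu.Lock()
		c.calls[name]++
		c.mu.Unlock()
		return h(args)
	})
}

// count returns how many times the named handler has been invoked.
func (c *countingWorker) count(name string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.calls[name]
}

func main() {
	inner := &stubWorker{handlers: make(map[string]TaskHandlerFunc)}
	w := newCountingWorker(inner)
	w.Register("send_email", func(TaskArgs) error { return nil })

	// Simulate the inner worker dispatching the task twice.
	for i := 0; i < 2; i++ {
		if err := inner.handlers["send_email"](nil); err != nil {
			fmt.Println("handler failed:", err)
		}
	}
	fmt.Println("send_email calls:", w.count("send_email"))
}
```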
func DefaultWorkerFactory ¶
func DefaultWorkerFactory(cfg WorkerConfig) Worker
DefaultWorkerFactory creates and returns a new instance of DefaultWorker using the provided WorkerConfig.
The worker is initialized with:
- A unique ID
- A broker for task consumption
- A backoff policy for retrying failed tasks
- A shared WaitGroup for graceful shutdown coordination
- An empty handler map ready for task registration
This function returns a Worker interface, allowing the DefaultWorker to be used polymorphically.
Typically used within a WorkerFactory passed to the Manager.
type WorkerConfig ¶
WorkerConfig provides the configuration necessary to initialize a Worker.
It is passed to the WorkerFactory function by the Manager to ensure that all workers have the required shared dependencies and context-specific data.
Fields:
- ID: A unique identifier for the worker, typically assigned by the Manager.
- Broker: The Broker instance used to retrieve and dispatch tasks.
- Backoff: The policy used for retrying tasks on failure.
- WG: A shared WaitGroup used by the Manager to coordinate worker shutdown.
This struct is designed to be extended if additional shared dependencies need to be passed to workers in the future.
type WorkerFactory ¶
type WorkerFactory func(cfg WorkerConfig) Worker
WorkerFactory defines a function that creates a new Worker instance using the provided configuration.
This allows users to customize how Workers are constructed, enabling injection of custom dependencies (e.g., loggers, metrics, settings) without modifying the Manager.
The WorkerConfig contains common fields such as worker ID, broker, backoff policy, and a wait group.
Example:
func MyWorkerFactory(cfg WorkerConfig) Worker {
	return &MyCustomWorker{
		id:      cfg.ID,
		broker:  cfg.Broker,
		backoff: cfg.Backoff,
		wg:      cfg.WG,
		logger:  myLogger, // custom dependency
	}
}