taskqueue

package module
v0.0.0-...-ebd6e2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2025 License: MIT Imports: 7 Imported by: 0

README

TaskQueue: Go-based Distributed Task Queue

Test and Coverage codecov

TaskQueue is a Go library that provides a simple and efficient way to manage and execute asynchronous tasks. It's inspired by Celery and designed to be highly extensible, allowing you to easily distribute tasks across multiple workers.

⚠️ Warning: Not Production Ready

Features
  • Task Definition: Easily define tasks with customizable arguments and retry behavior.

  • Worker: Workers fetch tasks from a broker and execute them with optional retry and backoff strategies.

  • Manager: Manage multiple workers, task registration, and graceful shutdown handling.

  • Broker Interface: Abstracts task transport; easily extensible to different backends.

  • Redis Broker: Built-in Redis-based broker for task queueing.

  • Backoff Policy: Optional exponential backoff with jitter for retrying failed tasks.

Coming Soon
  • Scheduled Tasks: Ability to schedule tasks to run at specific times or intervals.

  • Custom Logger Support: Allow users to inject their own logging system.

  • Better Error Handling and Dead Letter Queues: Capture and manage tasks that permanently fail after retries.

  • New Broker Implementations: Add support for new brokers

    • RabbitMQ: Native support for RabbitMQ as a task transport backend.

    • GCP Pub/Sub: Experimental support for Google Cloud Pub/Sub as a broker option.

Trying it out

Since the library is still under development and not yet published, you can clone the repository and run the examples locally:

git clone [email protected]:KengoWada/taskqueue.git
cd taskqueue/example

# Assumes you have redis running with this address: localhost:6379
# Change the address value if neccessary
go run main.go

The example/ directory contains a simple usage demonstration to help you get started quickly.

Documentation

Index

Constants

View Source
const DefaultNumOfWorkers int = 1

Variables

View Source
var DefaultBackoffPolicy = BackoffPolicy{
	BaseDelay: 1 * time.Second,
	MaxDelay:  30 * time.Second,
}
View Source
var ErrEmptyTaskName = errors.New("task name must not be empty")

Functions

This section is empty.

Types

type Backoff

type Backoff interface {
	// Calculate returns the duration to wait before the next retry attempt.
	// The input parameter retries indicates how many times the task has already been retried.
	Calculate(retries uint) time.Duration
}

Backoff defines the interface for calculating backoff delays between retries. Implementations of this interface can provide custom logic for determining how long to wait before retrying a failed task based on the number of retries.

type BackoffPolicy

type BackoffPolicy struct {
	BaseDelay time.Duration
	MaxDelay  time.Duration
}

BackoffPolicy specifies the parameters for calculating exponential backoff with jitter. It defines the base delay and the maximum delay allowed between retries.

Fields:

  • BaseDelay: Specifies the initial delay duration.
  • MaxDelay: Sets the upper bound for the backoff delay, preventing unbounded wait times.

func (*BackoffPolicy) Calculate

func (b *BackoffPolicy) Calculate(retries uint) time.Duration

Calculate returns a jittered backoff duration based on the number of retries. It uses exponential backoff with full jitter, where the delay is randomly chosen between 0 and the calculated maximum delay. The maximum delay grows exponentially with each retry, capped by MaxDelay.

Example:

BaseDelay = 100ms, MaxDelay = 3s, retries = 3
Max calculated delay = min(100ms * 2^3, 3s) = 800ms
Returned delay = random duration in [0, 800ms)

type Broker

type Broker interface {
	// Publish sends a task to the broker.
	// Returns an error if the task could not be published.
	Publish(task Task) error

	// Consume returns a channel of tasks that the worker can consume.
	// It also returns an error if the broker fails to start consuming tasks.
	Consume() (<-chan Task, error)
}

type DefaultWorker

type DefaultWorker struct {
	// contains filtered or unexported fields
}

DefaultWorker is the standard implementation of the Worker interface.

It is responsible for consuming tasks from the provided Broker, executing them using registered handler functions, and retrying failed tasks based on a backoff policy.

Fields:

  • id: A unique identifier for this worker instance.
  • broker: The Broker used to fetch tasks for execution.
  • backoff: The BackoffPolicy used to delay retries on task failure.
  • handlers: A map of task names to their associated TaskHandlerFunc implementations.
  • wg: A shared WaitGroup used by the Manager to coordinate worker shutdown.

DefaultWorker is intended to be created using a WorkerFactory and managed by a Manager. It supports context-based cancellation for graceful shutdown.

func (*DefaultWorker) Register

func (w *DefaultWorker) Register(name string, handler TaskHandlerFunc)

Register registers a task handler for a specific task name.

The handler function will be associated with the provided task name and invoked when a task with that name is received by the worker.

Parameters:

  • name: The unique name of the task being registered.
  • handler: The TaskHandlerFunc that will handle the task when it is dispatched.

This method allows users to dynamically register multiple tasks for a worker, enabling the worker to handle a variety of task types.

Example:

worker.Register("send_email", sendEmailHandler)

func (*DefaultWorker) Start

func (w *DefaultWorker) Start(ctx context.Context)

Start begins processing tasks for the worker and continuously listens for new tasks from the broker until the provided context is cancelled or the task channel is closed.

It runs a task consumption loop that will:

  • Consume tasks from the broker
  • Attempt to handle each task using the corresponding registered handler
  • Retry failed tasks according to the worker's backoff policy (if provided)
  • Exit when the context is cancelled or the task channel is closed

Parameters:

  • ctx: The context used to control the lifetime of the worker. When the context is cancelled, the worker will shut down gracefully.

The worker will log information about task successes, failures, retries, and backoff delays.

Example:

worker.Start(ctx)

type HandlerRegistry

type HandlerRegistry map[string]TaskHandlerFunc

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is responsible for managing the lifecycle of workers, coordinating task consumption and processing, and gracefully stopping workers.

func NewManager

func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...ManagerOption) *Manager

NewManager creates a new Manager instance responsible for coordinating and supervising workers.

Parameters:

  • broker: The Broker used by all workers to fetch and process tasks.
  • wf: A WorkerFactory function used to create each Worker with a provided WorkerConfig.
  • numWorkers: The number of workers to spawn. If set to 0 or less, DefaultNumOfWorkers is used.
  • opts: Optional functional configuration parameters for customizing Manager behavior (e.g. backoff policy).

The Manager sets up a cancellable context for controlling the lifecycle of its workers, and assigns a shared WaitGroup to coordinate shutdowns. If no BackoffPolicy is provided in the options, a DefaultBackoffPolicy is used.

Each worker is created using the WorkerFactory and given a unique ID, shared broker, backoff policy, and reference to the Manager's WaitGroup.

Returns:

A pointer to a fully initialized Manager ready to register task handlers and start processing.

Example:

mgr := NewManager(broker, MyWorkerFactory(), 5, WithBackoff(customBackoff))
mgr.RegisterTask("send_email", emailHandler)
mgr.Start()

func (*Manager) BackoffPolicy

func (m *Manager) BackoffPolicy() Backoff

func (*Manager) PublishTask

func (m *Manager) PublishTask(taskName string, args TaskArgs, maxRetry uint) error

PublishTask publishes a task to the broker with the specified task name, arguments, and maximum retries.

This method creates a new task with the provided parameters and publishes it to the broker for consumption by workers.

Parameters:

  • taskName: The name of the task to publish. This is used to identify the task in the queue.
  • args: The arguments that will be passed to the task handler. These are passed as a map of key-value pairs.
  • maxRetry: The maximum number of retries allowed for the task in case of failure. If the task fails more than the specified number of times, it will not be retried again.

Returns:

  • An error if the task name is empty or there is an issue publishing the task to the broker. If no errors occur, the method will return nil.

Example usage:

err := manager.PublishTask("send_email", TaskArgs{"email": "[email protected]"}, 3)
if err != nil {
    log.Printf("Error publishing task: %v", err)
}

func (*Manager) RegisterTask

func (m *Manager) RegisterTask(taskName string, handler TaskHandlerFunc)

RegisterTask registers a task handler for the specified task name across all workers managed by the Manager. This allows each worker to handle tasks of the specified name by executing the provided handler function.

The handler function must have the signature: func(TaskArgs) error. If the task name is already registered, the handler will overwrite the existing one.

Parameters:

  • taskName: The name of the task to register. This name is used to identify tasks in the queue.
  • handler: The function that handles the task when it is consumed by the worker.

Example usage:

manager.RegisterTask("send_email", func(args TaskArgs) error {
    // Handle the task
    return nil
})

func (*Manager) Start

func (m *Manager) Start()

Start starts all the workers managed by the Manager in separate goroutines.

This method launches each worker's `Start` method concurrently, passing the Manager's context to each worker. The workers will begin consuming tasks from the broker and processing them based on the registered task handlers.

Example usage:

manager.Start()  // Starts all workers concurrently

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the Manager and all its workers gracefully.

This method cancels the context associated with the Manager, signaling all workers to stop their work. It then waits for all workers to complete their shutdown process using the WaitGroup.

It is important to call `Stop` to ensure that all workers have finished their tasks and the system shuts down cleanly.

Example usage:

manager.Stop()  // Stops all workers and waits for them to finish

func (*Manager) Workers

func (m *Manager) Workers() []Worker

type ManagerConfig

type ManagerConfig struct {
	BackoffPolicy Backoff
}

ManagerConfig holds configuration options for the Manager.

type ManagerOption

type ManagerOption func(*ManagerConfig)

ManagerOption is a function type that modifies the configuration of a ManagerConfig. It allows for functional configuration of the Manager, enabling users to customize various settings, such as the backoff policy, when creating a Manager.

func WithBackoffPolicy

func WithBackoffPolicy(bp Backoff) ManagerOption

WithBackoffPolicy is a functional option that sets the BackoffPolicy for a Manager. It allows the user to specify a custom backoff policy for retrying failed tasks in the Manager.

Example usage:

manager := NewManager(bp, numOfWorkers, WithBackoffPolicy(customBackoffPolicy))

type Task

type Task struct {
	Name      string    `json:"name"`      // The name of the task, used to identify which handler to invoke.
	Args      TaskArgs  `json:"args"`      // The arguments for the task, provided as a map of key-value pairs.
	Retry     uint      `json:"retry"`     // The current retry count for this task.
	MaxRetry  uint      `json:"max_retry"` // The maximum number of retries before the task is considered failed.
	Timestamp time.Time `json:"timestamp"` // The timestamp when the task was created or scheduled.
}

Task represents an individual task in the task queue.

type TaskArgs

type TaskArgs map[string]any

TaskArgs represents the arguments passed to a task handler. It is a map where the key is a string (usually a task argument name) and the value is of type `any` which allows flexibility in the type of data.

type TaskHandlerFunc

type TaskHandlerFunc func(TaskArgs) error

TaskHandlerFunc defines the signature of a function that handles tasks. It takes a TaskArgs object as input and returns an error if something goes wrong during task processing.

type Worker

type Worker interface {
	Register(name string, handler TaskHandlerFunc)
	Start(ctx context.Context)
}

Worker defines the contract for a task-processing worker.

Implementations of Worker are responsible for registering task handlers and starting the task execution loop, typically consuming tasks from a Broker.

Methods:

  • Register: Associates a task name with a handler function.
  • Start: Begins processing tasks using the provided context for cancellation.

This interface allows different worker implementations to be plugged into the Manager, enabling customizable behavior such as logging, metrics, or concurrency models.

Example usage:

type MyWorker struct { ... }
func (w *MyWorker) Register(name string, handler TaskHandlerFunc) { ... }
func (w *MyWorker) Start(ctx context.Context) { ... }

func DefaultWorkerFactory

func DefaultWorkerFactory(cfg WorkerConfig) Worker

DefaultWorkerFactory creates and returns a new instance of DefaultWorker using the provided WorkerConfig.

The worker is initialized with:

  • A unique ID
  • A broker for task consumption
  • A backoff policy for retrying failed tasks
  • A shared WaitGroup for graceful shutdown coordination
  • An empty handler map ready for task registration

This function returns a Worker interface, allowing the DefaultWorker to be used polymorphically.

Typically used within a WorkerFactory passed to the Manager.

type WorkerConfig

type WorkerConfig struct {
	ID      int
	Broker  Broker
	Backoff Backoff
	WG      *sync.WaitGroup
}

WorkerConfig provides the configuration necessary to initialize a Worker.

It is passed to the WorkerFactory function by the Manager to ensure that all workers have the required shared dependencies and context-specific data.

Fields:

  • ID: A unique identifier for the worker, typically assigned by the Manager.
  • Broker: The Broker instance used to retrieve and dispatch tasks.
  • Backoff: The policy used for retrying tasks on failure.
  • WG: A shared WaitGroup used by the Manager to coordinate worker shutdown.

This struct is designed to be extended if additional shared dependencies need to be passed to workers in the future.

type WorkerFactory

type WorkerFactory func(cfg WorkerConfig) Worker

WorkerFactory defines a function that creates a new Worker instance using the provided configuration.

This allows users to customize how Workers are constructed, enabling injection of custom dependencies (e.g., loggers, metrics, settings) without modifying the Manager.

The WorkerConfig contains common fields such as worker ID, broker, backoff policy, and a wait group.

Example:

func MyWorkerFactory(cfg WorkerConfig) Worker {
    return &MyCustomWorker{
        id:      cfg.ID,
        broker:  cfg.Broker,
        backoff: cfg.Backoff,
        wg:      cfg.WG,
        logger:  myLogger, // custom dependency
    }
}

Directories

Path Synopsis
brokers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL