workqueue

Version: v0.3.0
Published: Dec 29, 2025 License: MIT Imports: 7 Imported by: 0

README

workqueue is an in-memory task queue for Go applications. It supports task expiration, delayed execution, deduplication by key, and prioritization, all without external dependencies. Because all state is held in memory, operations avoid network and disk round trips, which keeps latency low and throughput high. It was designed as a crawl frontier.

Features

  • Fast Processing: All operations (put, update, take, insert, expiration, delay) run in O(n*log(n)) time or better, using several priority queues to maintain state.
  • Task Expiration: Set an expiration time for items to ensure they are processed within a specific timeframe. Expired items are automatically removed.
  • Delayed Execution: Schedule items to become available for processing only after a certain delay.
  • Prioritization: Assign priorities to items to control the order of execution. Higher priority items are processed first.
  • FIFO Tie-Breaking: Items with the same priority are processed in a first-in, first-out (FIFO) order.
  • Goroutine-Safe: Safe for concurrent producers and consumers.
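
The prioritization and FIFO tie-breaking rules above can be illustrated with a small standalone sketch built on the standard library's container/heap. This is not the library's implementation; the entry type, the seq counter, and the drainOrder helper are hypothetical names chosen for the example.

```go
package main

import (
	"container/heap"
	"fmt"
)

// entry is a hypothetical queue element, not a type from workqueue.
type entry struct {
	key      string
	priority int64
	seq      uint64 // insertion order, used to break priority ties
}

// entryHeap orders entries by priority (highest first), then by seq (oldest first).
type entryHeap []entry

func (h entryHeap) Len() int { return len(h) }
func (h entryHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)   { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// drainOrder pushes the entries in slice order and returns their keys in pop order.
func drainOrder(items []entry) []string {
	h := &entryHeap{}
	for i, it := range items {
		it.seq = uint64(i) // record arrival order
		heap.Push(h, it)
	}
	var out []string
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(entry).key)
	}
	return out
}

func main() {
	order := drainOrder([]entry{
		{key: "low", priority: 1},
		{key: "first-high", priority: 10},
		{key: "second-high", priority: 10},
	})
	fmt.Println(order) // [first-high second-high low]
}
```

Each push and pop on a binary heap costs O(log n), which is one common way to reach the per-operation bounds listed under Performance.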

Usage

Creating a Work Queue

To get started, create a new in-memory work queue. The queue is generic and can handle any comparable key and value types.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/andrewortman/workqueue"
)

func main() {
	// Create a new work queue for string keys and values
	queue := workqueue.NewInMemory[string, string]()

	// The rest of the examples assume this `queue` instance
	// and a `context.Context` variable named `ctx` are available.
	ctx := context.Background()
	_ = queue
	_ = ctx
}
Configuration Options

The queue can be configured with functional options:

	// With a custom time provider (useful for testing).
	// myTimeProvider is any value that implements workqueue.TimeProvider.
	testQueue := workqueue.NewInMemory[string, string](
		workqueue.WithTimeProvider[string, string](myTimeProvider),
	)

	// With a capacity limit
	boundedQueue := workqueue.NewInMemory[string, string](
		workqueue.WithCapacity[string, string](1000),
	)
WithTimeProvider

Injects a custom TimeProvider for controlling time in tests or other scenarios.
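
As a rough illustration, a manually advanced clock is one way to satisfy the documented TimeProvider interface (a single Now() time.Time method) in tests. The fakeClock type, its Advance method, and the demo helper below are hypothetical and not part of the package. The interface is redeclared locally so the sketch runs without importing workqueue.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TimeProvider mirrors the interface documented by the package.
type TimeProvider interface {
	Now() time.Time
}

// fakeClock is a hypothetical test helper whose time only moves when told to.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

// Compile-time check that fakeClock satisfies TimeProvider.
var _ TimeProvider = (*fakeClock)(nil)

func newFakeClock(start time.Time) *fakeClock { return &fakeClock{now: start} }

// Now returns the manually controlled current time.
func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves the clock forward by d.
func (c *fakeClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// demo reports whether a 30-second delay is still pending before and after
// the clock is advanced by one minute.
func demo() (before, after bool) {
	clock := newFakeClock(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC))
	delayedUntil := clock.Now().Add(30 * time.Second)
	before = clock.Now().Before(delayedUntil)
	clock.Advance(time.Minute)
	after = clock.Now().Before(delayedUntil)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // true false
}
```

A clock like this could be passed as workqueue.WithTimeProvider[string, string](clock), so a test can step past a delay or an expiry without sleeping.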

WithCapacity

Sets a maximum number of items the queue can hold. When at capacity:

  • Put returns ErrAtCapacity and performs no mutations
  • PutOrUpdate returns ErrAtCapacity and performs no mutations if any new items would exceed capacity; a batch containing only updates to existing keys will succeed
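
The capacity rule can be modeled as an all-or-nothing check made before any mutation. The sketch below is a hypothetical model of that rule, not the library's code. It assumes that repeated keys within one batch count as a single new item, and it treats a capacity of 0 or less as unlimited, as documented for WithCapacity.

```go
package main

import "fmt"

// wouldExceed is a hypothetical model of the capacity rule.
// existing holds the keys already queued; batch holds the keys being written.
func wouldExceed(existing map[string]struct{}, batch []string, capacity int) bool {
	if capacity <= 0 {
		return false // 0 or negative means unlimited
	}
	newKeys := make(map[string]struct{})
	for _, k := range batch {
		if _, ok := existing[k]; !ok {
			newKeys[k] = struct{}{} // assumption: a repeated key counts once
		}
	}
	return len(existing)+len(newKeys) > capacity
}

func main() {
	existing := map[string]struct{}{"a": {}, "b": {}}

	// A batch of updates only never grows the queue, so it fits.
	fmt.Println(wouldExceed(existing, []string{"a", "b"}, 2)) // false

	// One new key would push a full queue past its limit.
	fmt.Println(wouldExceed(existing, []string{"a", "c"}, 2)) // true
}
```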
Public API
Put

Adds one or more new items to the queue. It returns an error if an item with the same key already exists.

	item := workqueue.WorkItem[string, string]{
		Key:   "task1",
		Value: "process this data",
	}

	if err := queue.Put(ctx, item); err != nil {
		fmt.Println("Error adding item:", err)
	}
Update

Updates one or more existing items. It returns an error if an item's key is not found in the queue.

This can be used to update any field of the work item, e.g. priority, delay, or expiration.

	updatedItem := workqueue.WorkItem[string, string]{
		Key:   "task1",
		Value: "updated data",
	}

	if err := queue.Update(ctx, updatedItem); err != nil {
		fmt.Println("Error updating item:", err)
	}
PutOrUpdate

Adds new items or updates existing ones. This is useful when you want to ensure an item is in the queue, regardless of whether it was there before.

	newItem := workqueue.WorkItem[string, string]{
		Key:   "task2",
		Value: "new or updated data",
	}
	if err := queue.PutOrUpdate(ctx, newItem); err != nil {
		fmt.Println("Error with PutOrUpdate:", err)
	}
Remove

Removes an item from the queue by its key. It returns an error if the key is not found.

	if err := queue.Remove(ctx, "task1"); err != nil {
		fmt.Println("Error removing item:", err)
	}
Take

Blocks until an item is available, then returns it. Take is the primary method for consuming items from the queue. It respects context cancellation.

	// This will block until an item is available
	takenItem, err := queue.Take(ctx)
	if err != nil {
		fmt.Println("Error taking item:", err)
		return
	}
	fmt.Println("Processing item:", takenItem.Key)
TakeMany

Blocks until n items are available, and returns them as a slice. Like Take, it respects context cancellation.

	// This will block until 2 items are available
	items, err := queue.TakeMany(ctx, 2)
	if err != nil {
		fmt.Println("Error taking many items:", err)
		return
	}
	fmt.Println("Processing batch of", len(items), "items")
Context Handling

All blocking operations (Take, TakeMany, TakeUpTo) and synchronous operations respect context cancellation. If the context is canceled while an operation is in progress, the function unblocks and returns a context error.

	// Create a context that will be canceled after 1 second
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// If no item is available within 1 second, Take will return a context error.
	_, err := queue.Take(ctx)
	if err != nil {
		// This will print "context deadline exceeded"
		fmt.Println(err)
	}
Size

Returns the number of pending and delayed items in the queue.

	size, err := queue.Size(ctx)
	if err != nil {
		fmt.Println("Error getting size:", err)
		return
	}
	fmt.Printf("Queue size: %d pending, %d delayed\n", size.Pending, size.Delayed)
UpdateConditional

Updates existing items only when a predicate returns true. The predicate receives a copy of the existing item and the new item.

	shouldUpdate := func(existing workqueue.WorkItem[string, string], new workqueue.WorkItem[string, string]) bool {
		// Only update if the new priority is higher or the value changed
		return new.Priority > existing.Priority || new.Value != existing.Value
	}

	err := queue.UpdateConditional(ctx, shouldUpdate,
		workqueue.WorkItem[string, string]{Key: "task1", Value: "maybe update", Priority: 50},
	)
	if err != nil {
		fmt.Println("Error conditional update:", err)
	}
PutOrUpdateConditional

Inserts new items or updates existing items only when a predicate returns true. For existing items, the predicate receives a pointer to a copy of the existing item; for new items, the pointer is nil.

	shouldPutOrUpdate := func(existing *workqueue.WorkItem[string, string], new workqueue.WorkItem[string, string]) bool {
		if existing == nil {
			// Only insert if priority >= 10
			return new.Priority >= 10
		}
		// For existing items, only update if expiry or delay changed
		return !existing.ExpiresAt.Equal(new.ExpiresAt) || !existing.DelayedUntil.Equal(new.DelayedUntil)
	}

	err := queue.PutOrUpdateConditional(ctx, shouldPutOrUpdate,
		workqueue.WorkItem[string, string]{Key: "task2", Value: "maybe insert/update", Priority: 20},
	)
	if err != nil {
		fmt.Println("Error conditional putOrUpdate:", err)
	}

Performance

Complexity
Method        Time Complexity             Space Complexity
Put           O(log n)                    O(1)
Update        O(log n)                    O(1)
PutOrUpdate   O(log n)                    O(1)
Remove        O(log n)                    O(1)
Take          O(log n)                    O(1)
TakeMany      O(k * log n) for k items    O(k) for k items
Size          O(log n)                    O(1)

n = number of total items in the queue

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidItem is a base error for invalid work items.
	ErrInvalidItem = errors.New("workqueue: invalid item")
	// ErrExpiryBeforeDelay is returned when an item's expiry is before its delay.
	ErrExpiryBeforeDelay = fmt.Errorf("%w: expiry is before delay", ErrInvalidItem)
	// ErrItemExists is returned when an item with the same key already exists.
	ErrItemExists = errors.New("workqueue: item already exists")
	// ErrItemNotFound is returned when an item is not found in the queue.
	ErrItemNotFound = errors.New("workqueue: item not found")
	// ErrAtCapacity is returned when the queue is at capacity and cannot accept new items.
	ErrAtCapacity = errors.New("workqueue: queue is at capacity")
)
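
Because ErrExpiryBeforeDelay wraps ErrInvalidItem with %w, callers can match either the specific error or the whole family with errors.Is. The sketch below redeclares three of the sentinels locally so that it runs without importing workqueue. The classify helper and its return strings are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, declared the same way as above.
var (
	ErrInvalidItem       = errors.New("workqueue: invalid item")
	ErrExpiryBeforeDelay = fmt.Errorf("%w: expiry is before delay", ErrInvalidItem)
	ErrAtCapacity        = errors.New("workqueue: queue is at capacity")
)

// classify is a hypothetical helper that maps an error to a handling decision.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrInvalidItem):
		// Also matches ErrExpiryBeforeDelay, which wraps ErrInvalidItem.
		return "drop"
	case errors.Is(err, ErrAtCapacity):
		return "retry-later"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(ErrExpiryBeforeDelay))                   // drop
	fmt.Println(classify(fmt.Errorf("put: %w", ErrAtCapacity)))   // retry-later
	fmt.Println(errors.Is(ErrInvalidItem, ErrExpiryBeforeDelay)) // false
}
```

The last line shows that the match only runs one way: the base error does not match the more specific one.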

Functions

This section is empty.

Types

type InMemory

type InMemory[K comparable, V comparable] struct {
	// contains filtered or unexported fields
}

InMemory is an in-memory priority/delay/expiry work queue.

func NewInMemory

func NewInMemory[K comparable, V comparable](opts ...Option[K, V]) *InMemory[K, V]

NewInMemory constructs the queue with the provided options.

func (*InMemory[K, V]) Put

func (q *InMemory[K, V]) Put(ctx context.Context, items ...WorkItem[K, V]) error

Put synchronously adds new items to the queue. Returns an error if an item with the same key already exists. Returns ErrAtCapacity if the queue has a capacity limit and adding the items would exceed it.

func (*InMemory[K, V]) PutOrUpdate

func (q *InMemory[K, V]) PutOrUpdate(ctx context.Context, items ...WorkItem[K, V]) error

PutOrUpdate synchronously adds new items or updates existing items. For items with duplicate keys in the same batch, the last one wins.

func (*InMemory[K, V]) PutOrUpdateConditional

func (q *InMemory[K, V]) PutOrUpdateConditional(ctx context.Context, shouldUpdate func(existing *WorkItem[K, V], new WorkItem[K, V]) bool, items ...WorkItem[K, V]) error

PutOrUpdateConditional synchronously adds new items or updates existing items conditionally. The shouldUpdate predicate receives a pointer to a copy of the existing item (nil if not found) and the new item. It should return true if the update should proceed. For new items (existing == nil), returning true will insert the item; returning false will skip it.

func (*InMemory[K, V]) Remove

func (q *InMemory[K, V]) Remove(ctx context.Context, key K) error

Remove synchronously removes an item from the queue by its key. Returns an error if no item with the given key exists.

func (*InMemory[K, V]) Size

func (q *InMemory[K, V]) Size(ctx context.Context) (SizeResult, error)

Size garbage-collects expired items, then returns the number of non-expired items (pending and delayed).

func (*InMemory[K, V]) Take

func (q *InMemory[K, V]) Take(ctx context.Context) (WorkItem[K, V], error)

Take blocks until an item is available or ctx is canceled. On cancellation, returns the context error and the zero value of WorkItem[K, V].

func (*InMemory[K, V]) TakeMany

func (q *InMemory[K, V]) TakeMany(ctx context.Context, n int) ([]WorkItem[K, V], error)

TakeMany blocks until n items are available or the context is canceled. It waits until a full batch of n items is available before taking any, ensuring that items remain in the queue and are eligible for updates until the entire batch is returned. If the context is canceled, it returns immediately with a context error and no items.

func (*InMemory[K, V]) TakeUpTo added in v0.3.0

func (q *InMemory[K, V]) TakeUpTo(ctx context.Context, maxItems int) ([]WorkItem[K, V], error)

TakeUpTo blocks until at least one item is available, then takes up to maxItems. Unlike TakeMany which waits for exactly n items, TakeUpTo greedily takes whatever is available (up to maxItems) once at least one item is ready. If the context is canceled, it returns immediately with a context error and no items.
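
The difference between the two batch methods comes down to when each one is allowed to return. The following sketch models only that decision, given a count of currently available items. Both helper functions are hypothetical and assume a positive batch size.

```go
package main

import "fmt"

// readyForTakeMany models TakeMany(n): it yields n items once at least n are
// available, and 0 otherwise (the real call would keep blocking).
func readyForTakeMany(available, n int) int {
	if available >= n {
		return n
	}
	return 0
}

// readyForTakeUpTo models TakeUpTo(maxItems): it yields whatever is available,
// capped at maxItems, as soon as at least one item is ready.
func readyForTakeUpTo(available, maxItems int) int {
	if available < maxItems {
		return available // 0 here means the real call would keep blocking
	}
	return maxItems
}

func main() {
	for _, available := range []int{0, 2, 5} {
		fmt.Printf("available=%d TakeMany(3)=%d TakeUpTo(3)=%d\n",
			available, readyForTakeMany(available, 3), readyForTakeUpTo(available, 3))
	}
	// available=0 TakeMany(3)=0 TakeUpTo(3)=0
	// available=2 TakeMany(3)=0 TakeUpTo(3)=2
	// available=5 TakeMany(3)=3 TakeUpTo(3)=3
}
```

TakeUpTo tends to suit workers that should start on partial batches. TakeMany suits consumers that need fixed-size batches.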

func (*InMemory[K, V]) Update

func (q *InMemory[K, V]) Update(ctx context.Context, items ...WorkItem[K, V]) error

Update synchronously updates existing items in the queue. Returns an error if any item with the given key does not exist. Always updates the item with new values, including heap reorganization if needed.

func (*InMemory[K, V]) UpdateConditional

func (q *InMemory[K, V]) UpdateConditional(ctx context.Context, shouldUpdate func(existing WorkItem[K, V], new WorkItem[K, V]) bool, items ...WorkItem[K, V]) error

UpdateConditional synchronously updates existing items in the queue only if the shouldUpdate predicate returns true. Returns an error if any item with the given key does not exist. The predicate receives a copy of the existing item and the new item, and should return true if the update should proceed.

type Option added in v0.2.0

type Option[K comparable, V comparable] func(*InMemory[K, V])

Option configures an InMemory queue.

func WithCapacity added in v0.2.0

func WithCapacity[K comparable, V comparable](capacity int) Option[K, V]

WithCapacity sets the maximum number of items the queue can hold. When the queue is at capacity, Put will return ErrAtCapacity. PutOrUpdate will still succeed if it only results in an update (no new items). A capacity of 0 or negative means unlimited.

func WithTimeProvider added in v0.2.0

func WithTimeProvider[K comparable, V comparable](tp TimeProvider) Option[K, V]

WithTimeProvider sets a custom time provider for the queue.

type SizeResult

type SizeResult struct {
	Pending int
	Delayed int
}

type TimeProvider

type TimeProvider interface {
	Now() time.Time
}

TimeProvider is an interface for getting the current time.

type WorkItem

type WorkItem[K comparable, V comparable] struct {
	Key   K
	Value V

	Priority     int64     // zero means no priority differentiation
	ExpiresAt    time.Time // if zero, no expiration
	DelayedUntil time.Time // if zero, effectively no delay
}

WorkItem represents a unit of work in the queue.
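
To show how the three scheduling fields interact, the sketch below mirrors the WorkItem struct locally and classifies one item at different points in time. The stateAt and demoStates helpers are hypothetical. Treating an item as expired at exactly ExpiresAt is an assumption; the package may draw that boundary differently.

```go
package main

import (
	"fmt"
	"time"
)

// WorkItem mirrors the documented struct so the example runs standalone.
type WorkItem[K comparable, V comparable] struct {
	Key   K
	Value V

	Priority     int64
	ExpiresAt    time.Time
	DelayedUntil time.Time
}

// stateAt is a hypothetical helper that reports how the field comments
// suggest an item is treated at time now.
func stateAt[K, V comparable](it WorkItem[K, V], now time.Time) string {
	switch {
	case !it.ExpiresAt.IsZero() && !now.Before(it.ExpiresAt):
		return "expired" // assumption: expired at or after ExpiresAt
	case !it.DelayedUntil.IsZero() && now.Before(it.DelayedUntil):
		return "delayed"
	default:
		return "pending"
	}
}

// demoStates classifies one item at three points in time.
func demoStates() [3]string {
	base := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	item := WorkItem[string, string]{
		Key:          "page-1",
		Value:        "https://example.com/",
		Priority:     10,
		DelayedUntil: base.Add(time.Minute), // not available for the first minute
		ExpiresAt:    base.Add(time.Hour),   // dropped if not taken within the hour
	}
	return [3]string{
		stateAt(item, base),
		stateAt(item, base.Add(2*time.Minute)),
		stateAt(item, base.Add(2*time.Hour)),
	}
}

func main() {
	fmt.Println(demoStates()) // [delayed pending expired]
}
```

An item whose ExpiresAt falls before its DelayedUntil could never become available, which is presumably why the package rejects it with ErrExpiryBeforeDelay.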
