Documentation
¶
Index ¶
- Variables
- type InMemory
- func (q *InMemory[K, V]) Put(ctx context.Context, items ...WorkItem[K, V]) error
- func (q *InMemory[K, V]) PutOrUpdate(ctx context.Context, items ...WorkItem[K, V]) error
- func (q *InMemory[K, V]) PutOrUpdateConditional(ctx context.Context, ...) error
- func (q *InMemory[K, V]) Remove(ctx context.Context, key K) error
- func (q *InMemory[K, V]) Size(ctx context.Context) (SizeResult, error)
- func (q *InMemory[K, V]) Take(ctx context.Context) (WorkItem[K, V], error)
- func (q *InMemory[K, V]) TakeMany(ctx context.Context, n int) ([]WorkItem[K, V], error)
- func (q *InMemory[K, V]) TakeUpTo(ctx context.Context, maxItems int) ([]WorkItem[K, V], error)
- func (q *InMemory[K, V]) Update(ctx context.Context, items ...WorkItem[K, V]) error
- func (q *InMemory[K, V]) UpdateConditional(ctx context.Context, ...) error
- type Option
- type SizeResult
- type TimeProvider
- type WorkItem
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidItem is a base error for invalid work items. ErrInvalidItem = errors.New("workqueue: invalid item") // ErrExpiryBeforeDelay is returned when an item's expiry is before its delay. ErrExpiryBeforeDelay = fmt.Errorf("%w: expiry is before delay", ErrInvalidItem) // ErrItemExists is returned when an item with the same key already exists. ErrItemExists = errors.New("workqueue: item already exists") // ErrItemNotFound is returned when an item is not found in the queue. ErrItemNotFound = errors.New("workqueue: item not found") // ErrAtCapacity is returned when the queue is at capacity and cannot accept new items. ErrAtCapacity = errors.New("workqueue: queue is at capacity") )
Functions ¶
This section is empty.
Types ¶
type InMemory ¶
type InMemory[K comparable, V comparable] struct { // contains filtered or unexported fields }
InMemory is an in-memory priority/delay/expiry work queue.
func NewInMemory ¶
func NewInMemory[K comparable, V comparable](opts ...Option[K, V]) *InMemory[K, V]
NewInMemory constructs the queue with the provided options.
func (*InMemory[K, V]) Put ¶
Put synchronously adds new items to the queue. Returns an error if an item with the same key already exists. Returns ErrAtCapacity if the queue has a capacity limit and adding the items would exceed it.
func (*InMemory[K, V]) PutOrUpdate ¶
PutOrUpdate synchronously adds new items or updates existing items. For items with duplicate keys in the same batch, the last one wins.
func (*InMemory[K, V]) PutOrUpdateConditional ¶
func (q *InMemory[K, V]) PutOrUpdateConditional(ctx context.Context, shouldUpdate func(existing *WorkItem[K, V], new WorkItem[K, V]) bool, items ...WorkItem[K, V]) error
PutOrUpdateConditional synchronously adds new items or updates existing items conditionally. The shouldUpdate predicate receives a pointer to a copy of the existing item (nil if not found) and the new item. It should return true if the update should proceed. For new items (existing == nil), returning true will insert the item; returning false will skip it.
func (*InMemory[K, V]) Remove ¶
Remove synchronously removes an item from the queue by its key. Returns an error if no item with the given key exists. The removed item is returned if found.
func (*InMemory[K, V]) Size ¶
func (q *InMemory[K, V]) Size(ctx context.Context) (SizeResult, error)
Size runs GC and returns the number of non-expired items (pending, delayed).
func (*InMemory[K, V]) Take ¶
Take blocks until an item is available or ctx is canceled. On cancellation, returns the context error and the zero value of T.
func (*InMemory[K, V]) TakeMany ¶
TakeMany blocks until n items are available or the context is canceled. It waits until a full batch of n items is available before taking any, ensuring that items remain in the queue and are eligible for updates until the entire batch is returned. If the context is canceled, it returns immediately with a context error and no items.
func (*InMemory[K, V]) TakeUpTo ¶ added in v0.3.0
TakeUpTo blocks until at least one item is available, then takes up to maxItems. Unlike TakeMany which waits for exactly n items, TakeUpTo greedily takes whatever is available (up to maxItems) once at least one item is ready. If the context is canceled, it returns immediately with a context error and no items.
func (*InMemory[K, V]) Update ¶
Update synchronously updates existing items in the queue. Returns an error if any item with the given key does not exist. Always updates the item with new values, including heap reorganization if needed.
func (*InMemory[K, V]) UpdateConditional ¶
func (q *InMemory[K, V]) UpdateConditional(ctx context.Context, shouldUpdate func(existing WorkItem[K, V], new WorkItem[K, V]) bool, items ...WorkItem[K, V]) error
UpdateConditional synchronously updates existing items in the queue only if the shouldUpdate predicate returns true. Returns an error if any item with the given key does not exist. The predicate receives a copy of the existing item and the new item, and should return true if the update should proceed.
type Option ¶ added in v0.2.0
type Option[K comparable, V comparable] func(*InMemory[K, V])
Option configures an InMemory queue.
func WithCapacity ¶ added in v0.2.0
func WithCapacity[K comparable, V comparable](capacity int) Option[K, V]
WithCapacity sets the maximum number of items the queue can hold. When the queue is at capacity, Put will return ErrAtCapacity. PutOrUpdate will still succeed if it only results in an update (no new items). A capacity of 0 or negative means unlimited.
func WithTimeProvider ¶ added in v0.2.0
func WithTimeProvider[K comparable, V comparable](tp TimeProvider) Option[K, V]
WithTimeProvider sets a custom time provider for the queue.
type SizeResult ¶
type TimeProvider ¶
TimeProvider is an interface for getting the current time.
type WorkItem ¶
type WorkItem[K comparable, V comparable] struct { Key K Value V Priority int64 // zero means no priority differentiation ExpiresAt time.Time // if zero, no expiration DelayedUntil time.Time // if zero, effectively no delay }
WorkItem represents a unit of work in the queue.