uptask

package module
v0.5.0 Latest
Published: May 19, 2025 License: MIT Imports: 17 Imported by: 0

README

uptask

Task library wrapping the Upstash QStash APIs.

Heavily inspired by the River task library for Go.

URL schemes:

$Host/tasks/taskName
$Host/events/eventHandler/event
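
The two patterns can be read as path templates. The following is a minimal sketch, assuming `$Host` is the base URL of the service and that `taskName`, `eventHandler`, and `event` are plain path segments. The helpers `taskURL` and `eventURL` are hypothetical and not part of uptask.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// taskURL builds "$Host/tasks/taskName" (hypothetical helper, not part of uptask).
func taskURL(host, taskName string) string {
	return strings.TrimRight(host, "/") + "/tasks/" + url.PathEscape(taskName)
}

// eventURL builds "$Host/events/eventHandler/event" (hypothetical helper).
func eventURL(host, handler, event string) string {
	return strings.TrimRight(host, "/") + "/events/" +
		url.PathEscape(handler) + "/" + url.PathEscape(event)
}

func main() {
	fmt.Println(taskURL("https://example.com", "sort"))
	fmt.Println(eventURL("https://example.com", "audit", "user.created"))
}
```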

Documentation

Functions

func AddEventHandler added in v0.5.0

func AddEventHandler[E Event](service *TaskService, handlerName string, handler EventHandler[E])

func AddEventHandlerSafely added in v0.5.0

func AddEventHandlerSafely[E Event](service *TaskService, handlerName string, handler EventHandler[E]) error

func AddTaskHandler

func AddTaskHandler[T TaskArgs](service *TaskService, handler TaskHandler[T])

AddTaskHandler registers a TaskHandler on the provided TaskService bundle. Each TaskHandler must be registered so that the Service knows it should handle a specific kind of task (as returned by its `Kind()` method).

Use by explicitly specifying a TaskArgs type and then passing an instance of a task handler for the same type:

taskservice.AddTaskHandler[SortArgs](service, &SortTaskHandler{})

Note that AddTaskHandler can panic in some situations, such as if the task handler is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddTaskHandlerSafely instead.

func AddTaskHandlerSafely

func AddTaskHandlerSafely[T TaskArgs](service *TaskService, handler TaskHandler[T]) error

AddTaskHandlerSafely registers a task handler on the provided TaskService bundle. Unlike AddTaskHandler, AddTaskHandlerSafely does not panic and instead returns an error if the task handler is already registered or if its configuration is invalid.

Use by explicitly specifying a TaskArgs type and then passing an instance of a task handler for the same type:

taskservice.AddTaskHandlerSafely[SortArgs](service, &SortTaskHandler{})
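
The difference between the panicking and the error-returning variants can be illustrated without the library. This is a sketch only: `registry`, `add`, and `addSafely` are hypothetical stand-ins for TaskService, AddTaskHandler, and AddTaskHandlerSafely, and the validation rules shown (non-empty kind, no duplicates) are assumptions based on the descriptions above.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for TaskService, whose fields are unexported.
type registry struct {
	kinds map[string]bool
}

// addSafely follows the documented AddTaskHandlerSafely contract:
// it returns an error instead of panicking when registration is invalid.
func (r *registry) addSafely(kind string) error {
	if kind == "" {
		return errors.New("task kind must not be empty")
	}
	if r.kinds[kind] {
		return fmt.Errorf("handler for kind %q is already registered", kind)
	}
	r.kinds[kind] = true
	return nil
}

// add follows the AddTaskHandler contract: same checks, but failures panic.
func (r *registry) add(kind string) {
	if err := r.addSafely(kind); err != nil {
		panic(err)
	}
}

func main() {
	r := &registry{kinds: map[string]bool{}}
	r.add("sort") // first registration succeeds
	if err := r.addSafely("sort"); err != nil {
		fmt.Println("registration rejected:", err)
	}
}
```

The panicking form suits registrations hardcoded at startup. The error-returning form suits handlers that are registered dynamically, where a duplicate should be reported rather than crash the process.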

func JobSnooze

func JobSnooze(duration time.Duration) error

Types

type AnyTask

type AnyTask struct {
	Id        string
	CreatedAt time.Time
	//MaxRetries      int
	//ScheduledAt     time.Time
	//Queue           string
	//Tags            []string
	Retried         int
	Scheduled       bool
	ScheduleId      string
	QstashMessageId string
	Args            interface{}
}

type ClientOption

type ClientOption func(*TaskClient)

func WithClientLogger

func WithClientLogger(l Logger) ClientOption

func WithClientStore

func WithClientStore(s TaskStore) ClientOption

type Container added in v0.5.0

type Container[T any] struct {
	Id string

	CreatedAt  time.Time
	InsertOpts InsertOpts
	Retried    int
	Scheduled  bool
	Args       T
	// contains filtered or unexported fields
}

type ErrorCode added in v0.1.0

type ErrorCode string

ErrorCode represents specific error types

const (
	ErrInvalidRequest    ErrorCode = "INVALID_REQUEST"
	ErrTransportCreation ErrorCode = "TRANSPORT_CREATION_FAILED"
	ErrClientCreation    ErrorCode = "CLIENT_CREATION_FAILED"
	ErrDeliveryFailed    ErrorCode = "DELIVERY_FAILED"
	ErrInvalidSchedule   ErrorCode = "INVALID_SCHEDULE"
	ErrBadResponse       ErrorCode = "BAD_RESPONSE"
)

type Event added in v0.5.0

type Event TaskArgs

type EventFanoutArgs added in v0.5.0

type EventFanoutArgs struct {
	Handlers  []string
	EventType string
	Payload   any
}

func (EventFanoutArgs) Kind added in v0.5.0

func (e EventFanoutArgs) Kind() string

type EventFanoutWorker added in v0.5.0

type EventFanoutWorker struct {
	TaskHandlerDefaults[EventFanoutArgs]
	// contains filtered or unexported fields
}

func (*EventFanoutWorker) ProcessTask added in v0.5.0

func (w *EventFanoutWorker) ProcessTask(ctx context.Context, task *Container[EventFanoutArgs]) error

type EventHandler added in v0.5.0

type EventHandler[T Event] interface {

	// Timeout is the maximum amount of time the task is allowed to run before
	// its context is cancelled. A timeout of zero (the default) means the task
	// will inherit the Service-level timeout. A timeout of -1 means the task's
	// context will never time out.
	Timeout(task *Container[T]) time.Duration

	// ProcessEvent performs the task and returns an error if the task failed. The context
	// will be configured with a timeout according to the task handler settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, the task is assumed to have succeeded and will be
	// marked completed.
	//
	// It is important for any task handler to respect context cancellation to enable
	// the service to respond to shutdown requests; there is no way to cancel a
	// running task that does not respect context cancellation, other than
	// terminating the process.
	ProcessEvent(ctx context.Context, task *Container[T]) error
}

type Handler

type Handler interface {
	HandleEvent(context.Context, cloudevents.Event) error
}

type HandlerFunc

type HandlerFunc func(context.Context, cloudevents.Event) error

func (HandlerFunc) HandleEvent

func (f HandlerFunc) HandleEvent(ctx context.Context, event cloudevents.Event) error

type InsertOpts added in v0.5.0

type InsertOpts struct {
	MaxRetries  int
	Queue       string
	ScheduledAt time.Time
	Tags        []string
}

func (*InsertOpts) FromCloudEvent added in v0.5.0

func (o *InsertOpts) FromCloudEvent(ce cloudevents.Event) error

type Logger

type Logger interface {
	Debug(string, ...any)
	Warn(string, ...any)
	Info(string, ...any)
	Error(string, ...any)
}

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

Middleware defines a function that wraps a HandlerFunc

type MiddlewareFunc

type MiddlewareFunc func(handler Handler) Handler

type RedisConfig

type RedisConfig struct {
	Addr     string
	Username string
	Password string
	DB       int
	Secure   bool
}

RedisConfig holds configuration for Redis-based task store

type RedisTaskStore

type RedisTaskStore struct {
	// contains filtered or unexported fields
}

func NewRedisTaskStore

func NewRedisTaskStore(cfg RedisConfig) (*RedisTaskStore, error)

func (*RedisTaskStore) AddTaskError

func (s *RedisTaskStore) AddTaskError(ctx context.Context, taskID string, taskError TaskError) error

func (*RedisTaskStore) CleanupOldTaskExecutions

func (s *RedisTaskStore) CleanupOldTaskExecutions(ctx context.Context, olderThan time.Duration) error

func (*RedisTaskStore) CreateTaskExecution

func (s *RedisTaskStore) CreateTaskExecution(ctx context.Context, task *TaskExecution) error

func (*RedisTaskStore) DeleteTaskExecution

func (s *RedisTaskStore) DeleteTaskExecution(ctx context.Context, taskID string) error

func (*RedisTaskStore) GetMostRecentTaskExecutions

func (s *RedisTaskStore) GetMostRecentTaskExecutions(ctx context.Context, limit int) ([]*TaskExecution, error)

func (*RedisTaskStore) GetTaskExecution

func (s *RedisTaskStore) GetTaskExecution(ctx context.Context, taskID string) (*TaskExecution, error)

func (*RedisTaskStore) ListTaskExecutions

func (s *RedisTaskStore) ListTaskExecutions(ctx context.Context, filter TaskFilter) ([]*TaskExecution, error)

func (*RedisTaskStore) TaskExists added in v0.0.9

func (s *RedisTaskStore) TaskExists(ctx context.Context, taskID string) (bool, error)

func (*RedisTaskStore) UpdateTaskSnoozedTask added in v0.0.5

func (s *RedisTaskStore) UpdateTaskSnoozedTask(ctx context.Context, taskID string, scheduledAt time.Time) error

func (*RedisTaskStore) UpdateTaskStatus

func (s *RedisTaskStore) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus) error

type ServiceOption

type ServiceOption func(*TaskService)

func WithLogger

func WithLogger(l Logger) ServiceOption

func WithStore

func WithStore(s TaskStore) ServiceOption

type TaskArgs

type TaskArgs interface {
	// Kind is a string that uniquely identifies the type of task. This must be
	// provided on your task arguments struct.
	Kind() string
}

type TaskClient

type TaskClient struct {
	// contains filtered or unexported fields
}

func NewTaskClient

func NewTaskClient(transport Transport, opts ...ClientOption) *TaskClient

func (*TaskClient) StartTask

func (c *TaskClient) StartTask(ctx context.Context, args TaskArgs, opts *InsertOpts) (string, error)

func (*TaskClient) Use added in v0.3.6

func (t *TaskClient) Use(middlewares ...Middleware)

type TaskError

type TaskError struct {
	Message   string                 `json:"message"`
	Details   map[string]interface{} `json:"details,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
}

TaskError represents an error that occurred during task execution

type TaskEvent added in v0.5.0

type TaskEvent struct {
	PayloadData any
	EventType   string
}

func (TaskEvent) Kind added in v0.5.0

func (e TaskEvent) Kind() string

func (TaskEvent) Payload added in v0.5.0

func (e TaskEvent) Payload() any

type TaskEventGen added in v0.5.0

type TaskEventGen[T Event] struct {
	Event       T
	HandlerName string
}

func (TaskEventGen[T]) Kind added in v0.5.0

func (e TaskEventGen[T]) Kind() string

type TaskExecution

type TaskExecution struct {
	// Core fields
	ID       string      `json:"id"`
	TaskKind string      `json:"task_kind"`
	Status   TaskStatus  `json:"status"`
	Args     interface{} `json:"args"`

	// Attempt tracking
	AttemptID string `json:"attempt_id"`
	//Attempt     int    `json:"attempt"`
	Retried    int `json:"retried"`
	MaxRetries int `json:"max_retries"`

	// External references
	QstashMessageID string `json:"qstash_message_id,omitempty"`
	ScheduleID      string `json:"schedule_id,omitempty"`

	// Timing information
	CreatedAt   time.Time `json:"created_at"`
	AttemptedAt time.Time `json:"attempted_at,omitempty"`
	ScheduledAt time.Time `json:"scheduled_at"`
	FinalizedAt time.Time `json:"finalized_at,omitempty"`

	// Error tracking
	Errors []TaskError `json:"errors,omitempty"`
	Queue  string      `json:"queue"`
}

TaskExecution represents a single execution attempt of a task

type TaskFilter

type TaskFilter struct {
	Status   *TaskStatus
	Queue    string
	FromDate time.Time
	ToDate   time.Time
	Limit    int
}

TaskFilter provides options for filtering task lists

type TaskHandler

type TaskHandler[T TaskArgs] interface {

	// Timeout is the maximum amount of time the task is allowed to run before
	// its context is cancelled. A timeout of zero (the default) means the task
	// will inherit the Service-level timeout. A timeout of -1 means the task's
	// context will never time out.
	Timeout(task *Container[T]) time.Duration

	// ProcessTask performs the task and returns an error if the task failed. The context
	// will be configured with a timeout according to the task handler settings and may
	// be cancelled for other reasons.
	//
	// If no error is returned, the task is assumed to have succeeded and will be
	// marked completed.
	//
	// It is important for any task handler to respect context cancellation to enable
	// the service to respond to shutdown requests; there is no way to cancel a
	// running task that does not respect context cancellation, other than
	// terminating the process.
	ProcessTask(ctx context.Context, task *Container[T]) error
}

TaskHandler is an interface that can perform a task with args of type T. A typical implementation will be a JSON-serializable `TaskArgs` struct that implements `Kind()`, along with a TaskHandler that embeds TaskHandlerDefaults and implements `ProcessTask()`. TaskHandlers may optionally override other methods to provide task-specific configuration for all tasks of that type:

type SleepArgs struct {
	Duration time.Duration `json:"duration"`
}

func (SleepArgs) Kind() string { return "sleep" }

type SleepTaskHandler struct {
	TaskHandlerDefaults[SleepArgs]
}

func (w *SleepTaskHandler) ProcessTask(ctx context.Context, task *Container[SleepArgs]) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(task.Args.Duration):
		return nil
	}
}

In addition to fulfilling the TaskHandler interface, task handlers must be registered with the service using the AddTaskHandler function.

func ProcessTaskFunc

func ProcessTaskFunc[T TaskArgs](f func(context.Context, *Container[T]) error) TaskHandler[T]

ProcessTaskFunc wraps a function to implement the TaskHandler interface. A task args struct implementing TaskArgs will still be required to specify a Kind.

For example:

taskservice.AddTaskHandler(service, taskservice.ProcessTaskFunc(func(ctx context.Context, task *taskservice.Container[ProcessTaskFuncArgs]) error {
	fmt.Printf("Message: %s", task.Args.Message)
	return nil
}))

type TaskHandlerDefaults

type TaskHandlerDefaults[T TaskArgs] struct{}

TaskHandlerDefaults is an empty struct that can be embedded in your task handler struct to make it fulfill the TaskHandler interface with default values.

func (TaskHandlerDefaults[T]) NextRetry

func (w TaskHandlerDefaults[T]) NextRetry(*Container[T]) time.Time

NextRetry returns an empty time.Time{} to avoid setting any task or TaskHandler-specific overrides on the next retry time. This means that the Service-level retry policy schedule will be used instead.

func (TaskHandlerDefaults[T]) Timeout

func (w TaskHandlerDefaults[T]) Timeout(*Container[T]) time.Duration

Timeout returns the task-specific timeout. Override this method to set a task-specific timeout, otherwise the Service-level timeout will be applied.

type TaskService

type TaskService struct {
	// contains filtered or unexported fields
}

func NewTaskService

func NewTaskService(transport Transport, opts ...ServiceOption) *TaskService

NewTaskService initializes a new registry of available task handlers.

Use the top-level AddTaskHandler function combined with a TaskService registry to register each available task handler.

func (*TaskService) HandleEvent

func (w *TaskService) HandleEvent(ctx context.Context, ce cloudevents.Event) error

HandleEvent processes a CloudEvent with all registered middleware

func (*TaskService) PublishEvent added in v0.5.0

func (c *TaskService) PublishEvent(ctx context.Context, event Event, opts *InsertOpts) (string, error)

func (*TaskService) StartTask added in v0.5.0

func (c *TaskService) StartTask(ctx context.Context, args TaskArgs, opts *InsertOpts) (string, error)

func (*TaskService) Use

func (t *TaskService) Use(middlewares ...Middleware)

func (*TaskService) UseSend added in v0.5.0

func (t *TaskService) UseSend(middlewares ...Middleware)

type TaskStatus

type TaskStatus string

TaskStatus represents the current state of a task execution

const (
	TaskStatusPending TaskStatus = "PENDING"
	TaskStatusRunning TaskStatus = "RUNNING"
	TaskStatusSuccess TaskStatus = "SUCCESS"
	TaskStatusFailed  TaskStatus = "FAILED"
)

type TaskStore

type TaskStore interface {
	// Core operations
	TaskExists(ctx context.Context, taskID string) (bool, error)
	CreateTaskExecution(ctx context.Context, task *TaskExecution) error
	GetTaskExecution(ctx context.Context, taskID string) (*TaskExecution, error)
	DeleteTaskExecution(ctx context.Context, taskID string) error
	UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus) error
	UpdateTaskSnoozedTask(ctx context.Context, taskID string, scheduledAt time.Time) error

	AddTaskError(ctx context.Context, taskID string, err TaskError) error

	// Query operations
	ListTaskExecutions(ctx context.Context, filter TaskFilter) ([]*TaskExecution, error)
	GetMostRecentTaskExecutions(ctx context.Context, limit int) ([]*TaskExecution, error)

	// Optional: Cleanup operation if needed
	CleanupOldTaskExecutions(ctx context.Context, olderThan time.Duration) error
}

TaskStore defines the interface for task storage operations

type Transport

type Transport interface {
	Send(ctx context.Context, event cloudevents.Event, opts *InsertOpts) error
}

type UpstashClientOpts added in v0.5.0

type UpstashClientOpts func(c *UpstashTransport)

func WithDlq added in v0.5.0

func WithDlq(dlq string) UpstashClientOpts

func WithUpstashLogger added in v0.5.0

func WithUpstashLogger(logger Logger) UpstashClientOpts

type UpstashTaskError added in v0.1.0

type UpstashTaskError struct {
	Code      ErrorCode
	Operation string
	Message   string
	Cause     error
	Event     *v2.Event
	Metadata  map[string]interface{}
}

UpstashTaskError provides detailed information about task operation errors

func NewUpstashTaskError added in v0.1.0

func NewUpstashTaskError(code ErrorCode, operation string, message string, cause error) *UpstashTaskError

NewUpstashTaskError creates a new UpstashTaskError

func (*UpstashTaskError) Error added in v0.1.0

func (e *UpstashTaskError) Error() string

Error implements the error interface

func (*UpstashTaskError) Unwrap added in v0.1.0

func (e *UpstashTaskError) Unwrap() error

Unwrap provides access to the underlying error

func (*UpstashTaskError) WithEvent added in v0.1.0

func (e *UpstashTaskError) WithEvent(event v2.Event) *UpstashTaskError

WithEvent adds event data to the error

func (*UpstashTaskError) WithMetadata added in v0.1.0

func (e *UpstashTaskError) WithMetadata(key string, value interface{}) *UpstashTaskError

WithMetadata adds additional context to the error

type UpstashTransport

type UpstashTransport struct {
	// contains filtered or unexported fields
}

UpstashTransport handles communication with Upstash QStash

func NewUpstashTransport

func NewUpstashTransport(qstashToken, targetUrl string, opts ...UpstashClientOpts) (*UpstashTransport, error)

NewUpstashTransport creates a new UpstashTransport instance

func (*UpstashTransport) Send

func (c *UpstashTransport) Send(ctx context.Context, ce v2.Event, opts *InsertOpts) error

Send dispatches a CloudEvent to Upstash

Directories

Path Synopsis
cmd
demo/server command
tq command
uptask command
internal
