Documentation
¶
Index ¶
- func AddEventHandler[E Event](service *TaskService, handlerName string, handler EventHandler[E])
- func AddEventHandlerSafely[E Event](service *TaskService, handlerName string, handler EventHandler[E]) error
- func AddTaskHandler[T TaskArgs](service *TaskService, handler TaskHandler[T])
- func AddTaskHandlerSafely[T TaskArgs](service *TaskService, handler TaskHandler[T]) error
- func JobSnooze(duration time.Duration) error
- type AnyTask
- type ClientOption
- type Container
- type ErrorCode
- type Event
- type EventFanoutArgs
- type EventFanoutWorker
- type EventHandler
- type Handler
- type HandlerFunc
- type InsertOpts
- type Logger
- type Middleware
- type MiddlewareFunc
- type RedisConfig
- type RedisTaskStore
- func (s *RedisTaskStore) AddTaskError(ctx context.Context, taskID string, taskError TaskError) error
- func (s *RedisTaskStore) CleanupOldTaskExecutions(ctx context.Context, olderThan time.Duration) error
- func (s *RedisTaskStore) CreateTaskExecution(ctx context.Context, task *TaskExecution) error
- func (s *RedisTaskStore) DeleteTaskExecution(ctx context.Context, taskID string) error
- func (s *RedisTaskStore) GetMostRecentTaskExecutions(ctx context.Context, limit int) ([]*TaskExecution, error)
- func (s *RedisTaskStore) GetTaskExecution(ctx context.Context, taskID string) (*TaskExecution, error)
- func (s *RedisTaskStore) ListTaskExecutions(ctx context.Context, filter TaskFilter) ([]*TaskExecution, error)
- func (s *RedisTaskStore) TaskExists(ctx context.Context, taskID string) (bool, error)
- func (s *RedisTaskStore) UpdateTaskSnoozedTask(ctx context.Context, taskID string, scheduledAt time.Time) error
- func (s *RedisTaskStore) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus) error
- type ServiceOption
- type TaskArgs
- type TaskClient
- type TaskError
- type TaskEvent
- type TaskEventGen
- type TaskExecution
- type TaskFilter
- type TaskHandler
- type TaskHandlerDefaults
- type TaskService
- func (w *TaskService) HandleEvent(ctx context.Context, ce cloudevents.Event) error
- func (c *TaskService) PublishEvent(ctx context.Context, event Event, opts *InsertOpts) (string, error)
- func (c *TaskService) StartTask(ctx context.Context, args TaskArgs, opts *InsertOpts) (string, error)
- func (t *TaskService) Use(middlewares ...Middleware)
- func (t *TaskService) UseSend(middlewares ...Middleware)
- type TaskStatus
- type TaskStore
- type Transport
- type UpstashClientOpts
- type UpstashTaskError
- type UpstashTransport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddEventHandler ¶ added in v0.5.0
func AddEventHandler[E Event](service *TaskService, handlerName string, handler EventHandler[E])
func AddEventHandlerSafely ¶ added in v0.5.0
func AddEventHandlerSafely[E Event](service *TaskService, handlerName string, handler EventHandler[E]) error
func AddTaskHandler ¶
func AddTaskHandler[T TaskArgs](service *TaskService, handler TaskHandler[T])
AddTaskHandler registers a TaskHandler on the provided TaskService. Each TaskHandler must be registered so that the service knows it should handle a specific kind of task (as returned by its `Kind()` method).
Use by explicitly specifying a TaskArgs type and then passing an instance of a task handler for the same type:
taskservice.AddTaskHandler[SortArgs](service, &SortTaskHandler{})
Note that AddTaskHandler can panic in some situations, such as if the task handler is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddTaskHandlerSafely instead.
func AddTaskHandlerSafely ¶
func AddTaskHandlerSafely[T TaskArgs](service *TaskService, handler TaskHandler[T]) error
AddTaskHandlerSafely registers a task handler on the provided TaskService. Unlike AddTaskHandler, AddTaskHandlerSafely does not panic and instead returns an error if the task handler is already registered or if its configuration is invalid.
Use by explicitly specifying a TaskArgs type and then passing an instance of a task handler for the same type:
taskservice.AddTaskHandlerSafely[SortArgs](service, &SortTaskHandler{})
Types ¶
type ClientOption ¶
type ClientOption func(*TaskClient)
func WithClientLogger ¶
func WithClientLogger(l Logger) ClientOption
func WithClientStore ¶
func WithClientStore(s TaskStore) ClientOption
type ErrorCode ¶ added in v0.1.0
type ErrorCode string
ErrorCode represents specific error types
const ( ErrInvalidRequest ErrorCode = "INVALID_REQUEST" ErrTransportCreation ErrorCode = "TRANSPORT_CREATION_FAILED" ErrClientCreation ErrorCode = "CLIENT_CREATION_FAILED" ErrDeliveryFailed ErrorCode = "DELIVERY_FAILED" ErrInvalidSchedule ErrorCode = "INVALID_SCHEDULE" ErrBadResponse ErrorCode = "BAD_RESPONSE" )
type EventFanoutArgs ¶ added in v0.5.0
func (EventFanoutArgs) Kind ¶ added in v0.5.0
func (e EventFanoutArgs) Kind() string
type EventFanoutWorker ¶ added in v0.5.0
type EventFanoutWorker struct {
TaskHandlerDefaults[EventFanoutArgs]
// contains filtered or unexported fields
}
func (*EventFanoutWorker) ProcessTask ¶ added in v0.5.0
func (w *EventFanoutWorker) ProcessTask(ctx context.Context, task *Container[EventFanoutArgs]) error
type EventHandler ¶ added in v0.5.0
type EventHandler[T Event] interface { // Timeout is the maximum amount of time the task is allowed to run before // its context is cancelled. A timeout of zero (the default) means the task // will inherit the Service-level timeout. A timeout of -1 means the task's // context will never time out. Timeout(task *Container[T]) time.Duration // ProcessEvent performs the task and returns an error if the task failed. The context // will be configured with a timeout according to the task handler settings and may // be cancelled for other reasons. // // If no error is returned, the task is assumed to have succeeded and will be // marked completed. // // It is important for any task handler to respect context cancellation to enable // the service to respond to shutdown requests; there is no way to cancel a // running task that does not respect context cancellation, other than // terminating the process. ProcessEvent(ctx context.Context, task *Container[T]) error }
type HandlerFunc ¶
type HandlerFunc func(context.Context, cloudevents.Event) error
func (HandlerFunc) HandleEvent ¶
func (f HandlerFunc) HandleEvent(ctx context.Context, event cloudevents.Event) error
type InsertOpts ¶ added in v0.5.0
func (*InsertOpts) FromCloudEvent ¶ added in v0.5.0
func (o *InsertOpts) FromCloudEvent(ce cloudevents.Event) error
type Middleware ¶
type Middleware func(HandlerFunc) HandlerFunc
Middleware defines a function that wraps a HandlerFunc
type MiddlewareFunc ¶
type RedisConfig ¶
RedisConfig holds configuration for Redis-based task store
type RedisTaskStore ¶
type RedisTaskStore struct {
// contains filtered or unexported fields
}
func NewRedisTaskStore ¶
func NewRedisTaskStore(cfg RedisConfig) (*RedisTaskStore, error)
func (*RedisTaskStore) AddTaskError ¶
func (*RedisTaskStore) CleanupOldTaskExecutions ¶
func (*RedisTaskStore) CreateTaskExecution ¶
func (s *RedisTaskStore) CreateTaskExecution(ctx context.Context, task *TaskExecution) error
func (*RedisTaskStore) DeleteTaskExecution ¶
func (s *RedisTaskStore) DeleteTaskExecution(ctx context.Context, taskID string) error
func (*RedisTaskStore) GetMostRecentTaskExecutions ¶
func (s *RedisTaskStore) GetMostRecentTaskExecutions(ctx context.Context, limit int) ([]*TaskExecution, error)
func (*RedisTaskStore) GetTaskExecution ¶
func (s *RedisTaskStore) GetTaskExecution(ctx context.Context, taskID string) (*TaskExecution, error)
func (*RedisTaskStore) ListTaskExecutions ¶
func (s *RedisTaskStore) ListTaskExecutions(ctx context.Context, filter TaskFilter) ([]*TaskExecution, error)
func (*RedisTaskStore) TaskExists ¶ added in v0.0.9
func (*RedisTaskStore) UpdateTaskSnoozedTask ¶ added in v0.0.5
func (*RedisTaskStore) UpdateTaskStatus ¶
func (s *RedisTaskStore) UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus) error
type ServiceOption ¶
type ServiceOption func(*TaskService)
func WithLogger ¶
func WithLogger(l Logger) ServiceOption
func WithStore ¶
func WithStore(s TaskStore) ServiceOption
type TaskArgs ¶
type TaskArgs interface {
// Kind is a string that uniquely identifies the type of task. This must be
// provided on your task arguments struct.
Kind() string
}
type TaskClient ¶
type TaskClient struct {
// contains filtered or unexported fields
}
func NewTaskClient ¶
func NewTaskClient(transport Transport, opts ...ClientOption) *TaskClient
func (*TaskClient) StartTask ¶
func (c *TaskClient) StartTask(ctx context.Context, args TaskArgs, opts *InsertOpts) (string, error)
func (*TaskClient) Use ¶ added in v0.3.6
func (t *TaskClient) Use(middlewares ...Middleware)
type TaskError ¶
type TaskError struct {
Message string `json:"message"`
Details map[string]interface{} `json:"details,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
TaskError represents an error that occurred during task execution
type TaskEventGen ¶ added in v0.5.0
func (TaskEventGen[T]) Kind ¶ added in v0.5.0
func (e TaskEventGen[T]) Kind() string
type TaskExecution ¶
type TaskExecution struct {
// Core fields
ID string `json:"id"`
TaskKind string `json:"task_kind"`
Status TaskStatus `json:"status"`
Args interface{} `json:"args"`
// Attempt tracking
AttemptID string `json:"attempt_id"`
//Attempt int `json:"attempt"`
Retried int `json:"retried"`
MaxRetries int `json:"max_retries"`
// External references
QstashMessageID string `json:"qstash_message_id,omitempty"`
ScheduleID string `json:"schedule_id,omitempty"`
// Timing information
CreatedAt time.Time `json:"created_at"`
AttemptedAt time.Time `json:"attempted_at,omitempty"`
ScheduledAt time.Time `json:"scheduled_at"`
FinalizedAt time.Time `json:"finalized_at,omitempty"`
// Error tracking
Errors []TaskError `json:"errors,omitempty"`
Queue string `json:"queue"`
}
TaskExecution represents a single execution attempt of a task
type TaskFilter ¶
type TaskFilter struct {
Status *TaskStatus
Queue string
FromDate time.Time
ToDate time.Time
Limit int
}
TaskFilter provides options for filtering task lists
type TaskHandler ¶
type TaskHandler[T TaskArgs] interface { // Timeout is the maximum amount of time the task is allowed to run before // its context is cancelled. A timeout of zero (the default) means the task // will inherit the Service-level timeout. A timeout of -1 means the task's // context will never time out. Timeout(task *Container[T]) time.Duration // ProcessTask performs the task and returns an error if the task failed. The context // will be configured with a timeout according to the task handler settings and may // be cancelled for other reasons. // // If no error is returned, the task is assumed to have succeeded and will be // marked completed. // // It is important for any task handler to respect context cancellation to enable // the service to respond to shutdown requests; there is no way to cancel a // running task that does not respect context cancellation, other than // terminating the process. ProcessTask(ctx context.Context, task *Container[T]) error }
TaskHandler is an interface that can perform a task with args of type T. A typical implementation will be a JSON-serializable `TaskArgs` struct that implements `Kind()`, along with a TaskHandler that embeds TaskHandlerDefaults and implements `ProcessTask()`. TaskHandlers may optionally override other methods to provide task-specific configuration for all tasks of that type:
type SleepArgs struct {
Duration time.Duration `json:"duration"`
}
func (SleepArgs) Kind() string { return "sleep" }
type SleepTaskHandler struct {
TaskHandlerDefaults[SleepArgs]
}
func (w *SleepTaskHandler) ProcessTask(ctx context.Context, task *Container[SleepArgs]) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(task.Args.Duration):
return nil
}
}
In addition to fulfilling the TaskHandler interface, task handlers must be registered with the service using the AddTaskHandler function.
func ProcessTaskFunc ¶
ProcessTaskFunc wraps a function to implement the TaskHandler interface. A task args struct implementing TaskArgs will still be required to specify a Kind.
For example:
taskservice.AddTaskHandler(service, taskservice.ProcessTaskFunc(func(ctx context.Context, task *taskservice.Container[ProcessTaskFuncArgs]) error {
fmt.Printf("Message: %s", task.Args.Message)
return nil
}))
type TaskHandlerDefaults ¶
type TaskHandlerDefaults[T TaskArgs] struct{}
TaskHandlerDefaults is an empty struct that can be embedded in your task handler struct to make it fulfill the TaskHandler interface with default values.
func (TaskHandlerDefaults[T]) NextRetry ¶
func (w TaskHandlerDefaults[T]) NextRetry(*Container[T]) time.Time
NextRetry returns an empty time.Time{} to avoid setting any task or TaskHandler-specific overrides on the next retry time. This means that the Service-level retry policy schedule will be used instead.
type TaskService ¶
type TaskService struct {
// contains filtered or unexported fields
}
func NewTaskService ¶
func NewTaskService(transport Transport, opts ...ServiceOption) *TaskService
NewTaskService initializes a new registry of available task handlers.
Use the top-level AddTaskHandler function combined with a TaskService registry to register each available task handler.
func (*TaskService) HandleEvent ¶
func (w *TaskService) HandleEvent(ctx context.Context, ce cloudevents.Event) error
HandleEvent processes a CloudEvent with all registered middleware
func (*TaskService) PublishEvent ¶ added in v0.5.0
func (c *TaskService) PublishEvent(ctx context.Context, event Event, opts *InsertOpts) (string, error)
func (*TaskService) StartTask ¶ added in v0.5.0
func (c *TaskService) StartTask(ctx context.Context, args TaskArgs, opts *InsertOpts) (string, error)
func (*TaskService) Use ¶
func (t *TaskService) Use(middlewares ...Middleware)
func (*TaskService) UseSend ¶ added in v0.5.0
func (t *TaskService) UseSend(middlewares ...Middleware)
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current state of a task execution
const ( TaskStatusPending TaskStatus = "PENDING" TaskStatusRunning TaskStatus = "RUNNING" TaskStatusSuccess TaskStatus = "SUCCESS" TaskStatusFailed TaskStatus = "FAILED" )
type TaskStore ¶
type TaskStore interface {
// Core operations
TaskExists(ctx context.Context, taskID string) (bool, error)
CreateTaskExecution(ctx context.Context, task *TaskExecution) error
GetTaskExecution(ctx context.Context, taskID string) (*TaskExecution, error)
DeleteTaskExecution(ctx context.Context, taskID string) error
UpdateTaskStatus(ctx context.Context, taskID string, status TaskStatus) error
UpdateTaskSnoozedTask(ctx context.Context, taskID string, scheduledAt time.Time) error
AddTaskError(ctx context.Context, taskID string, err TaskError) error
// Query operations
ListTaskExecutions(ctx context.Context, filter TaskFilter) ([]*TaskExecution, error)
GetMostRecentTaskExecutions(ctx context.Context, limit int) ([]*TaskExecution, error)
// Optional: Cleanup operation if needed
CleanupOldTaskExecutions(ctx context.Context, olderThan time.Duration) error
}
TaskStore defines the interface for task storage operations
type Transport ¶
type Transport interface {
Send(ctx context.Context, event cloudevents.Event, opts *InsertOpts) error
}
type UpstashClientOpts ¶ added in v0.5.0
type UpstashClientOpts func(c *UpstashTransport)
func WithDlq ¶ added in v0.5.0
func WithDlq(dlq string) UpstashClientOpts
func WithUpstashLogger ¶ added in v0.5.0
func WithUpstashLogger(logger Logger) UpstashClientOpts
type UpstashTaskError ¶ added in v0.1.0
type UpstashTaskError struct {
Code ErrorCode
Operation string
Message string
Cause error
Event *v2.Event
Metadata map[string]interface{}
}
UpstashTaskError provides detailed information about task operation errors
func NewUpstashTaskError ¶ added in v0.1.0
func NewUpstashTaskError(code ErrorCode, operation string, message string, cause error) *UpstashTaskError
NewUpstashTaskError creates a new UpstashTaskError
func (*UpstashTaskError) Error ¶ added in v0.1.0
func (e *UpstashTaskError) Error() string
Error implements the error interface
func (*UpstashTaskError) Unwrap ¶ added in v0.1.0
func (e *UpstashTaskError) Unwrap() error
Unwrap provides access to the underlying error
func (*UpstashTaskError) WithEvent ¶ added in v0.1.0
func (e *UpstashTaskError) WithEvent(event v2.Event) *UpstashTaskError
WithEvent adds event data to the error
func (*UpstashTaskError) WithMetadata ¶ added in v0.1.0
func (e *UpstashTaskError) WithMetadata(key string, value interface{}) *UpstashTaskError
WithMetadata adds additional context to the error
type UpstashTransport ¶
type UpstashTransport struct {
// contains filtered or unexported fields
}
UpstashTransport handles communication with Upstash QStash
func NewUpstashTransport ¶
func NewUpstashTransport(qstashToken, targetUrl string, opts ...UpstashClientOpts) (*UpstashTransport, error)
NewUpstashTransport creates a new UpstashTransport instance
func (*UpstashTransport) Send ¶
func (c *UpstashTransport) Send(ctx context.Context, ce v2.Event, opts *InsertOpts) error
Send dispatches a CloudEvent to Upstash