package taskengine

Version: v0.2.0-beta.1 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2025 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	WorkerPolicyParallel workerPolicy = iota
	WorkerPolicySerial
	WorkerPolicySkipIfBusy
)
View Source
const ContextKey contextKey = "go-taskengine/contextKey"

Variables

View Source
var (
	ErrorPolicyMismatch        = errors.New("policy mismatch")
	ErrorJobNameMismatch       = errors.New("job name mismatch")
	ErrorTriggerMismatch       = errors.New("trigger mismatch")
	ErrorTaskAlreadyRegistered = errors.New("task is already registered")
)

Functions

func WithTimeout

func WithTimeout(timeout time.Duration) taskOption

Types

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) CurrentTick

func (c *Context) CurrentTick() time.Time

func (*Context) Deadline

func (c *Context) Deadline() (deadline time.Time, ok bool)

func (*Context) Done

func (c *Context) Done() <-chan struct{}

func (*Context) Err

func (c *Context) Err() error

func (*Context) LastTick

func (c *Context) LastTick() time.Time

func (*Context) Logger

func (c *Context) Logger() Logger

func (*Context) TaskName

func (c *Context) TaskName() string

func (*Context) Value

func (c *Context) Value(key any) any

type Dispatcher

type Dispatcher interface {
	Size() int
	Close()
	Enqueue(tick *Tick) error
	Dequeue() <-chan *Tick
	Capacity() int
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func New

func New(store store.Store, options ...EngineOption) (*Engine, error)

func (*Engine) RegisterTask

func (e *Engine) RegisterTask(
	task *Task,
	policy workerPolicy,
	trigger Trigger,
	catchUpEnabled bool,
	maxExecutionLag int,
) error

func (*Engine) RemoveTask

func (e *Engine) RemoveTask(name string) error

func (*Engine) Run

func (e *Engine) Run() error

func (*Engine) Shutdown

func (e *Engine) Shutdown() error

func (*Engine) ShutdownTask

func (e *Engine) ShutdownTask(name string) error

func (*Engine) Start

func (e *Engine) Start()

func (*Engine) StartTask

func (e *Engine) StartTask(name string) error

type EngineOption

type EngineOption func(*Engine)

func WithLoggerFactory

func WithLoggerFactory(factory LoggerFactory) EngineOption

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) EngineOption

type Job

type Job = func(ctx *Context) error

type Logger

type Logger interface {
	Info(msg string)
	Infof(format string, args ...any)

	Warn(msg string)
	Warnf(format string, args ...any)

	Error(msg string)
	Errorf(format string, args ...any)
}

func DefaultLoggerFactory

func DefaultLoggerFactory(module string) Logger

type LoggerFactory

type LoggerFactory func(module string) Logger

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func (*Scheduler) Pause

func (s *Scheduler) Pause()

func (*Scheduler) Resume

func (s *Scheduler) Resume()

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

func (*Scheduler) Status

func (s *Scheduler) Status() schedulerState

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(name string, job Job, options ...taskOption) (*Task, error)

func (*Task) Execute

func (t *Task) Execute(parentCtx context.Context, tick *Tick)

func (*Task) Name

func (t *Task) Name() string

type Tick

type Tick struct {
	// contains filtered or unexported fields
}

type Trigger

type Trigger interface {
	Next(lastRun time.Time) (time.Time, error)
	String() string
}

func NewCronTrigger

func NewCronTrigger(expr string, runOnStart bool) (Trigger, error)

func NewIntervalTrigger

func NewIntervalTrigger(interval time.Duration, runOnStart bool) (Trigger, error)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

func (*Worker) Status

func (w *Worker) Status() workerState

type WorkerSupervisor

type WorkerSupervisor struct {
	// contains filtered or unexported fields
}

func (*WorkerSupervisor) PauseScheduler

func (ws *WorkerSupervisor) PauseScheduler()

func (*WorkerSupervisor) ResumeScheduler

func (ws *WorkerSupervisor) ResumeScheduler()

func (*WorkerSupervisor) SchedulerStatus

func (ws *WorkerSupervisor) SchedulerStatus() schedulerState

func (*WorkerSupervisor) Shutdown

func (ws *WorkerSupervisor) Shutdown()

func (*WorkerSupervisor) Start

func (ws *WorkerSupervisor) Start(ctx context.Context)

func (*WorkerSupervisor) WorkerStatus

func (ws *WorkerSupervisor) WorkerStatus() workerState

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL