Documentation
¶
Overview ¶
Example (Worker) ¶
package main
import "github.com/infinitytracking/beanstalkworker"
import "context"
import "os"
import "os/signal"
import "syscall"
import "log"
import "fmt"
import "time"
func main() {
//Setup context for cancelling beanstalk Worker.
ctx, cancel := context.WithCancel(context.Background())
//Start up signal handler that will cleanly shutdown beanstalk Worker.
go signalHandler(cancel)
//Define a new Worker process - how to connect to the beanstalkd server.
bsWorker := beanstalkworker.NewWorker("127.0.0.1:11300")
//Optional custom logger - see below.
bsWorker.SetLogger(&MyLogger{})
//Set concurrent Worker threads to 2.
bsWorker.SetNumWorkers(2)
//Job is deleted from the queue if unmarshal error appears. We can
//decide to bury or release (default behaviour) it as well.
bsWorker.SetUnmarshalErrorAction(beanstalkworker.ActionDeleteJob)
//Define a common value (example a shared database connection)
commonVar := "some common value"
//Add one or more subcriptions to specific tubes with a handler function.
bsWorker.Subscribe("job1", func(jobMgr beanstalkworker.JobManager, jobData Job1Data) {
//Create a fresh handler struct per job (this ensures fresh state for each job).
handler := &Job1Handler{
JobManager: jobMgr, //Embed the JobManager into the handler.
commonVar: commonVar, //Pass the commonVar into the handler.
}
handler.Run(jobData)
})
//Run the beanstalk Worker, this blocks until the context is cancelled.
//It will also handle reconnecting to beanstalkd server automatically.
bsWorker.Run(ctx)
}
// signalHandler catches OS signals for program to end.
func signalHandler(cancel context.CancelFunc) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
for {
<-sigc
log.Print("Got signal, cancelling context")
cancel()
}
}
//Custom Logging Example
// MyLogger provides custom logging.
type MyLogger struct {
}
// Info logs a custom info message regarding the job.
func (l *MyLogger) Info(v ...interface{}) {
log.Print("MyInfo: ", fmt.Sprint(v...))
}
// Infof logs a custom info message regarding the job.
func (l *MyLogger) Infof(format string, v ...interface{}) {
format = "MyInfof: " + format
log.Print(fmt.Sprintf(format, v...))
}
// Error logs a custom error message regarding the job.
func (l *MyLogger) Error(v ...interface{}) {
log.Print("MyError: ", fmt.Sprint(v...))
}
// Errorf logs a custom error message regarding the job.
func (l *MyLogger) Errorf(format string, v ...interface{}) {
format = "MyErrorf: " + format
log.Print(fmt.Sprintf(format, v...))
}
//Job Handler
// Job1Handler contains the business logic to handle the Job1 type jobs.
type Job1Handler struct {
beanstalkworker.JobManager
commonVar string
}
// Job1Data is a struct that represents the Job1 data that arrives from the queue.
type Job1Data struct {
SomeField string `json:"someField"`
SomeOtherField int `json:"someOtherField"`
}
// LogError example of overriding a function provided in beanstalkworker.JobManager
// and calling the underlying function in order to add context.
func (handler *Job1Handler) LogError(a ...interface{}) {
handler.JobManager.LogError("Job1 error: ", fmt.Sprint(a...))
}
// Run is executed by the beanstalk Worker when a Job1 type job is received.
func (handler *Job1Handler) Run(jobData Job1Data) {
handler.LogInfo("Starting job with commonVar value: ", handler.commonVar)
handler.LogInfo("Job Data received: ", jobData)
handler.LogInfo("Job Priority: ", handler.GetPriority())
handler.LogInfo("Job Releases: ", handler.GetReleases())
handler.LogInfo("Job Reserves: ", handler.GetReserves())
handler.LogInfo("Job Age: ", handler.GetAge())
handler.LogInfo("Job Delay: ", handler.GetDelay())
handler.LogInfo("Job Timeouts: ", handler.GetTimeouts())
handler.LogInfo("Job Tube: ", handler.GetTube())
// Retrieve the server's hostname where the job is running
conn := handler.GetConn()
stats, err := conn.Stats()
if err != nil {
handler.Release()
return
}
handler.LogInfo("Hostname: ", stats["hostname"])
//Simulate job processing time
time.Sleep(2 * time.Second)
if handler.GetTimeouts() == 0 {
handler.LogInfo("Simulating a timeout by not releasing/deleting job")
return
}
if handler.GetReserves() == 2 {
handler.LogInfo("Release without setting custom delay or priority")
handler.Release()
return
}
handler.SetReturnDelay(5 * time.Second) //Optional return delay (defaults to current delay)
handler.SetReturnPriority(5) //Optional return priority (defaults to current priority)
if handler.GetReleases() >= 3 {
handler.Delete()
handler.LogError("Deleting job as too many releases")
return
}
handler.LogInfo("Releasing job to be retried...")
handler.Release() //Pretend job process failed and needs retrying
}
Index ¶
- Constants
- type CustomLogger
- type Handler
- type JobManager
- type Logger
- type MockJob
- func (job *MockJob) Delete()
- func (job *MockJob) GetAge() time.Duration
- func (job *MockJob) GetConn() *beanstalk.Conn
- func (job *MockJob) GetDelay() time.Duration
- func (job *MockJob) GetPriority() uint32
- func (job *MockJob) GetReleases() uint32
- func (job *MockJob) GetReserves() uint32
- func (job *MockJob) GetTimeouts() uint32
- func (job *MockJob) GetTube() string
- func (job *MockJob) LogError(a ...interface{})
- func (job *MockJob) LogInfo(a ...interface{})
- func (job *MockJob) Release()
- func (job *MockJob) SetReturnDelay(delay time.Duration)
- func (job *MockJob) SetReturnPriority(prio uint32)
- func (job *MockJob) Touch()
- type MockLogger
- type MockWorker
- type RawJob
- func (job *RawJob) Bury()
- func (job *RawJob) Delete()
- func (job *RawJob) GetAge() time.Duration
- func (job *RawJob) GetConn() *beanstalk.Conn
- func (job *RawJob) GetDelay() time.Duration
- func (job *RawJob) GetPriority() uint32
- func (job *RawJob) GetReleases() uint32
- func (job *RawJob) GetReserves() uint32
- func (job *RawJob) GetTimeouts() uint32
- func (job *RawJob) GetTube() string
- func (job *RawJob) LogError(a ...interface{})
- func (job *RawJob) LogInfo(a ...interface{})
- func (job *RawJob) Release()
- func (job *RawJob) SetReturnDelay(delay time.Duration)
- func (job *RawJob) SetReturnPriority(prio uint32)
- func (job *RawJob) Touch()
- type Worker
- type WorkerClient
Examples ¶
Constants ¶
const ( ActionDeleteJob = "delete" ActionBuryJob = "bury" ActionReleaseJob = "release" )
Actions the user can choose in case of an unmarshal error.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CustomLogger ¶
type CustomLogger interface {
Info(v ...interface{})
Infof(format string, args ...interface{})
Error(v ...interface{})
Errorf(format string, args ...interface{})
}
CustomLogger provides support for the creation of custom logging.
type JobManager ¶
type JobManager interface {
Delete()
Touch()
Release()
LogError(a ...interface{})
LogInfo(a ...interface{})
GetAge() time.Duration
GetPriority() uint32
GetReleases() uint32
GetReserves() uint32
GetTimeouts() uint32
GetDelay() time.Duration
GetTube() string
GetConn() *beanstalk.Conn
SetReturnPriority(prio uint32)
SetReturnDelay(delay time.Duration)
}
JobManager interface represents a way to handle a job's lifecycle.
type Logger ¶
type Logger struct {
Info func(v ...interface{})
Infof func(format string, v ...interface{})
Error func(v ...interface{})
Errorf func(format string, v ...interface{})
}
Logger provides support for standard logging.
func NewDefaultLogger ¶
func NewDefaultLogger() *Logger
NewDefaultLogger creates a new Logger initialised to use the global log package.
type MockJob ¶
type MockJob struct {
// contains filtered or unexported fields
}
func NewMockJob ¶
func NewWillDeleteMockJob ¶
func NewWillReleaseMockJob ¶
func NewWillTouchMockJob ¶
func (*MockJob) GetPriority ¶
func (*MockJob) GetReleases ¶
func (*MockJob) GetReserves ¶
func (*MockJob) GetTimeouts ¶
func (*MockJob) SetReturnDelay ¶
func (*MockJob) SetReturnPriority ¶
type MockLogger ¶
type MockLogger struct {
}
MockLogger A custom logger that must implement Info() Infof(), Error() and Errorf() to implement CustomLogger
func (*MockLogger) Error ¶
func (l *MockLogger) Error(v ...interface{})
func (*MockLogger) Errorf ¶
func (l *MockLogger) Errorf(format string, v ...interface{})
func (*MockLogger) Info ¶
func (l *MockLogger) Info(v ...interface{})
func (*MockLogger) Infof ¶
func (l *MockLogger) Infof(format string, v ...interface{})
type MockWorker ¶
type MockWorker struct {
// contains filtered or unexported fields
}
func (*MockWorker) Run ¶
func (w *MockWorker) Run(ctx context.Context)
func (*MockWorker) SetLogger ¶
func (w *MockWorker) SetLogger(cl CustomLogger)
func (*MockWorker) SetNumWorkers ¶
func (w *MockWorker) SetNumWorkers(numWorkers int)
func (*MockWorker) SetUnmarshalErrorAction ¶
func (w *MockWorker) SetUnmarshalErrorAction(action string)
func (*MockWorker) Subscribe ¶
func (w *MockWorker) Subscribe(tube string, cb Handler)
type RawJob ¶
type RawJob struct {
// contains filtered or unexported fields
}
RawJob represents the raw job data that is returned by beanstalkd.
func NewEmptyJob ¶
func NewEmptyJob(cl CustomLogger) *RawJob
NewEmptyJob initialises a new empty RawJob with a custom logger. Useful for testing methods that log messages on the job.
func (*RawJob) GetPriority ¶
GetPriority gets the priority of the job.
func (*RawJob) GetReleases ¶
GetReleases gets the count of release of the job.
func (*RawJob) GetReserves ¶
GetReserves gets the count of reserves of the job.
func (*RawJob) GetTimeouts ¶
GetTimeouts gets the count of timeouts of the job.
func (*RawJob) LogError ¶
func (job *RawJob) LogError(a ...interface{})
LogError function logs an error message regarding the job.
func (*RawJob) LogInfo ¶
func (job *RawJob) LogInfo(a ...interface{})
LogInfo function logs an info message regarding the job.
func (*RawJob) Release ¶
func (job *RawJob) Release()
Release function releases the job from the queue.
func (*RawJob) SetReturnDelay ¶
SetReturnDelay sets the return delay to use if a job is released back to queue.
func (*RawJob) SetReturnPriority ¶
SetReturnPriority sets the return priority to use if a job is released or buried.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a single process that is connecting to beanstalkd and is consuming jobs from one or more tubes.
func NewWorker ¶
NewWorker creates a new Worker process, but does not actually connect to beanstalkd server yet.
func (*Worker) Run ¶
Run starts one or more Worker threads based on the numWorkers value. If numWorkers is set to zero or less then 1 Worker is started.
func (*Worker) SetLogger ¶
func (w *Worker) SetLogger(cl CustomLogger)
SetLogger switches logging to use a custom Logger.
func (*Worker) SetNumWorkers ¶
SetNumWorkers sets the number of concurrent workers threads that should be started. Each thread establishes a separate connection to the beanstalkd server.
func (*Worker) SetUnmarshalErrorAction ¶
SetUnmarshalErrorAction defines what to do if there is an unmarshal error.
type WorkerClient ¶
type WorkerClient interface {
Subscribe(tube string, cb Handler)
SetNumWorkers(numWorkers int)
SetLogger(cl CustomLogger)
SetUnmarshalErrorAction(action string)
Run(ctx context.Context)
}
func NewMockWillDeleteWorker ¶
func NewMockWillDeleteWorker(tube, jobStr string) WorkerClient
func NewMockWillReleaseWorker ¶
func NewMockWillReleaseWorker(tube, jobStr string) WorkerClient
func NewMockWillTouchWorker ¶
func NewMockWillTouchWorker(tube, jobStr string) WorkerClient
func NewMockWorker ¶
func NewMockWorker() WorkerClient