Documentation ¶
Index ¶
- Variables
- type ClientBeat
- type ClientData
- type Connection
- type RuntimeStats
- type Server
- func (s *Server) AddTask(everySec int64, task Taskable)
- func (s *Server) Boot() error
- func (s *Server) CurrentState() (*client.FaktoryState, error)
- func (s *Server) Heartbeats() map[string]*ClientData
- func (s *Server) Manager() manager.Manager
- func (s *Server) Register(x Subsystem)
- func (s *Server) Reload()
- func (s *Server) Run() error
- func (s *Server) Stop(onStop func())
- func (s *Server) Stopper() chan bool
- func (s *Server) Store() storage.Store
- type ServerOptions
- type Subsystem
- type Taskable
- type WorkerState
Constants ¶
This section is empty.
Variables ¶
var (
	AlwaysMatch = func(value string) bool { return true }
)
var CommandSet = map[string]command{
"END": end,
"PUSH": push,
"PUSHB": pushBulk,
"FETCH": fetch,
"ACK": ack,
"FAIL": fail,
"BEAT": heartbeat,
"INFO": info,
"FLUSH": flush,
"MUTATE": mutate,
"BATCH": batch,
"TRACK": track,
"QUEUE": queue,
}
var DefaultMaxPoolSize uint64 = 2000
This is the ultimate scalability limit in Faktory: we allow only this many connections to Redis.
Functions ¶
This section is empty.
Types ¶
type ClientBeat ¶ added in v1.5.0
type ClientData ¶ added in v1.5.0
type ClientData struct {
StartedAt time.Time
Hostname string `json:"hostname"`
Wid string `json:"wid"`
PasswordHash string `json:"pwdhash"`
Username string `json:"username"`
Labels []string `json:"labels"`
Pid int `json:"pid"`
RssKb int64 `json:"rss_kb"`
Version uint8 `json:"v"`
// contains filtered or unexported fields
}
This represents a single client process. It may have many network connections open to Faktory.
A client can be a producer AND/OR consumer of jobs. Typically a process will either only produce jobs (like a webapp pushing jobs) or produce/consume jobs (like a faktory worker process where a job can create other jobs while executing).
Each Faktory worker process should send a BEAT command every 15 seconds. Only consumers should send a BEAT. If Faktory does not receive a BEAT from a worker process within 60 seconds, the worker's heartbeat expires and the worker is removed from the Busy page.
From Faktory's point of view, a worker that went silent (for example, because of a network partition) can BEAT again and resume normal operations. If a process dies, it will be removed after 1 minute and its jobs recovered after the job reservation timeout has passed (typically 30 minutes).
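The expiry rule above can be sketched as follows. This is a minimal illustration, not Faktory's implementation: the names `beatTimeout`, `stale`, `expired`, and `reap` are hypothetical, and the map of last-seen timestamps is an assumed stand-in for whatever bookkeeping the server actually keeps.

```go
package main

import (
	"fmt"
	"time"
)

// beatTimeout mirrors the 60-second window described in the text.
// The constant name is an assumption made for this sketch.
const beatTimeout = 60 * time.Second

// stale reports whether a worker whose last BEAT is this old has expired.
func stale(sinceLastBeat time.Duration) bool {
	return sinceLastBeat > beatTimeout
}

// expired applies stale to a pair of timestamps.
func expired(lastBeat, now time.Time) bool {
	return stale(now.Sub(lastBeat))
}

// reap deletes expired workers from lastBeats and returns their ids.
// Deleting from a map while ranging over it is safe in Go.
func reap(lastBeats map[string]time.Time, now time.Time) []string {
	var removed []string
	for wid, t := range lastBeats {
		if expired(t, now) {
			delete(lastBeats, wid)
			removed = append(removed, wid)
		}
	}
	return removed
}

func main() {
	now := time.Now()
	beats := map[string]time.Time{
		"worker-a": now.Add(-15 * time.Second), // on schedule
		"worker-b": now.Add(-90 * time.Second), // missed several beats
	}
	fmt.Println("removed:", reap(beats, now))
	fmt.Println("still tracked:", len(beats))
}
```

A worker removed this way is not banned. Under this sketch, a later BEAT would simply add it back to the map, which matches the partition-recovery behavior described above.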
A worker process has a simple three-state lifecycle:
running -> quiet -> terminate
- Running means the worker is alive and processing jobs.
- Quiet means the worker should stop FETCHing new jobs but continue working on existing jobs. It should not exit, even if no jobs are processing.
- Terminate means the worker should exit within N seconds, where N is recommended to be 30 seconds. In practice, faktory_worker_ruby waits up to 25 seconds and any threads that are still busy are forcefully killed and their associated jobs reported as FAILed so they will be retried shortly.
A worker process should never stop sending BEAT. Even after "quiet" or "terminate", the BEAT should continue, only stopping due to process exit(). Workers should never move backward in state: you cannot "unquiet" a worker; it must be restarted.
Workers will typically also respond to standard Unix signals. faktory_worker_ruby uses TSTP ("Threads SToP") as the quiet signal and TERM as the terminate signal.
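The forward-only lifecycle described above can be sketched as a small state machine. This is an illustrative sketch under stated assumptions: `WorkerState` is redeclared locally so the example is self-contained, and `advance`, `shouldFetch`, and `shouldBeat` are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// WorkerState is a local stand-in for the package's WorkerState type.
type WorkerState int

const (
	Running WorkerState = iota
	Quiet
	Terminate
)

// String returns a readable name for the state.
func (s WorkerState) String() string {
	switch s {
	case Running:
		return "running"
	case Quiet:
		return "quiet"
	case Terminate:
		return "terminate"
	}
	return "unknown"
}

// advance applies a requested state, ignoring any request that would
// move the worker backward (for example, quiet -> running).
func advance(current, requested WorkerState) WorkerState {
	if requested > current {
		return requested
	}
	return current
}

// shouldFetch reports whether the worker may FETCH new jobs.
func shouldFetch(s WorkerState) bool {
	return s == Running
}

// shouldBeat is always true: BEAT continues in every state and stops
// only when the process exits.
func shouldBeat(s WorkerState) bool {
	return true
}

func main() {
	state := Running
	for _, req := range []WorkerState{Quiet, Running, Terminate} {
		state = advance(state, req)
		fmt.Printf("requested=%s now=%s fetch=%v beat=%v\n",
			req, state, shouldFetch(state), shouldBeat(state))
	}
}
```

Ordering the constants by severity lets a single integer comparison enforce the "never move backward" rule.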
func (*ClientData) ConnectionCount ¶ added in v1.6.1
func (worker *ClientData) ConnectionCount() int
func (*ClientData) IsConsumer ¶ added in v1.5.0
func (worker *ClientData) IsConsumer() bool
func (*ClientData) IsQuiet ¶ added in v1.5.0
func (worker *ClientData) IsQuiet() bool
func (*ClientData) Signal ¶ added in v1.5.0
func (worker *ClientData) Signal(newstate WorkerState)
Send "quiet" or "terminate" to the given client worker process. Other signals are undefined.
type Connection ¶
Represents a connection to a faktory client.
Faktory reuses the same wire protocol as Redis: RESP. It's a nice trade-off between human readability and efficiency. Shout out to antirez for his nice design document on it. https://redis.io/topics/protocol
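To make the RESP framing concrete, here is a minimal sketch of three RESP reply types: a simple string, an integer, and a bulk string. The encoders follow the public RESP specification; the helper names are hypothetical, and it is an assumption (not confirmed by this page) that Ok, Number, and Result emit exactly these frames.

```go
package main

import (
	"fmt"
	"strconv"
)

// simpleString encodes a RESP simple string, e.g. "+OK\r\n".
func simpleString(s string) []byte {
	return []byte("+" + s + "\r\n")
}

// integer encodes a RESP integer, e.g. ":42\r\n".
func integer(n int) []byte {
	return []byte(":" + strconv.Itoa(n) + "\r\n")
}

// bulkString encodes a length-prefixed RESP bulk string,
// e.g. "$5\r\nhello\r\n". The payload may contain arbitrary bytes.
func bulkString(b []byte) []byte {
	out := []byte("$" + strconv.Itoa(len(b)) + "\r\n")
	out = append(out, b...)
	return append(out, '\r', '\n')
}

func main() {
	fmt.Printf("%q\n", simpleString("OK"))
	fmt.Printf("%q\n", integer(42))
	fmt.Printf("%q\n", bulkString([]byte(`{"jid":"abc"}`)))
}
```

The length prefix on bulk strings is what makes the protocol efficient to parse, while the leading type byte and CRLF terminators keep it readable in a terminal.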
func (*Connection) Close ¶
func (c *Connection) Close() error
func (*Connection) Number ¶
func (c *Connection) Number(val int) error
func (*Connection) Ok ¶
func (c *Connection) Ok() error
func (*Connection) Result ¶
func (c *Connection) Result(msg []byte) error
type RuntimeStats ¶
type Server ¶
type Server struct {
Options *ServerOptions
Stats *RuntimeStats
TLSPublicCert string
TLSPrivateKey string
Subsystems []Subsystem
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(opts *ServerOptions) (*Server, error)
func (*Server) CurrentState ¶ added in v1.5.0
func (s *Server) CurrentState() (*client.FaktoryState, error)
func (*Server) Heartbeats ¶
func (s *Server) Heartbeats() map[string]*ClientData
type ServerOptions ¶
type ServerOptions struct {
GlobalConfig map[string]any
Binding string
StorageDirectory string
RedisSock string
ConfigDirectory string
Environment string
Password string
PoolSize uint64
}
type Subsystem ¶ added in v1.5.0
type Subsystem interface {
Name() string
// Called when the server is configured but before it starts accepting client connections.
Start(*Server) error
// Called every time Faktory reloads the global config for the Server.
// Each subsystem is responsible for diffing its own config and making
// necessary changes.
Reload(*Server) error
}
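As an illustration of the Reload contract, the sketch below implements the interface for a made-up subsystem that diffs a single config value. Everything here is an assumption for demonstration: `Server`, `ServerOptions`, and `Subsystem` are redeclared as local stand-ins so the example compiles on its own, and the `limiter` type and its `max_jobs` config key are hypothetical.

```go
package main

import "fmt"

// Local stand-ins that mirror only the fields this sketch needs.
type ServerOptions struct {
	GlobalConfig map[string]any
}

type Server struct {
	Options *ServerOptions
}

type Subsystem interface {
	Name() string
	Start(*Server) error
	Reload(*Server) error
}

// limiter is a hypothetical subsystem driven by a "max_jobs" setting.
type limiter struct {
	maxJobs int
	changes int // number of reloads that actually changed the setting
}

func (l *limiter) Name() string { return "limiter" }

// readMax extracts the hypothetical "max_jobs" key from the global config.
func readMax(s *Server) (int, error) {
	v, ok := s.Options.GlobalConfig["max_jobs"].(int)
	if !ok {
		return 0, fmt.Errorf("limiter: max_jobs missing or not an int")
	}
	return v, nil
}

// Start captures the initial configuration.
func (l *limiter) Start(s *Server) error {
	v, err := readMax(s)
	if err != nil {
		return err
	}
	l.maxJobs = v
	return nil
}

// Reload diffs the new value against the current one and applies it
// only when it differs.
func (l *limiter) Reload(s *Server) error {
	v, err := readMax(s)
	if err != nil {
		return err
	}
	if v == l.maxJobs {
		return nil // nothing changed
	}
	l.maxJobs = v
	l.changes++
	return nil
}

func main() {
	srv := &Server{Options: &ServerOptions{GlobalConfig: map[string]any{"max_jobs": 10}}}
	var sub Subsystem = &limiter{}
	fmt.Println(sub.Name(), sub.Start(srv), sub.Reload(srv))
}
```

With the real package, such a value would be passed to the server's Register method before the server starts.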
type WorkerState ¶ added in v1.5.0
type WorkerState int
const (
	Running WorkerState = iota
	Quiet
	Terminate
)