server

package
v1.9.3
Published: Jul 22, 2025 License: AGPL-3.0 Imports: 23 Imported by: 40

Documentation

Constants

This section is empty.

Variables

var (
	AlwaysMatch = func(value string) bool {
		return true
	}
)
var CommandSet = map[string]command{
	"END":    end,
	"PUSH":   push,
	"PUSHB":  pushBulk,
	"FETCH":  fetch,
	"ACK":    ack,
	"FAIL":   fail,
	"BEAT":   heartbeat,
	"INFO":   info,
	"FLUSH":  flush,
	"MUTATE": mutate,
	"BATCH":  batch,
	"TRACK":  track,
	"QUEUE":  queue,
}
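
CommandSet maps each protocol verb to its handler. The `command` type is unexported and its signature is not shown on this page, so the following is only a minimal sketch of the dispatch-table pattern. The `handler` type, the `commands` map, and `dispatch` are hypothetical stand-ins, not the package's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a hypothetical stand-in for the package's unexported command type.
type handler func(args string) string

// commands mirrors the shape of CommandSet: protocol verb -> handler.
var commands = map[string]handler{
	"PUSH": func(args string) string { return "pushed " + args },
	"INFO": func(args string) string { return "info" },
}

// dispatch splits a line into verb and arguments, then looks up the handler.
func dispatch(line string) (string, error) {
	verb, args, _ := strings.Cut(strings.TrimSpace(line), " ")
	h, ok := commands[verb]
	if !ok {
		return "", fmt.Errorf("unknown command %q", verb)
	}
	return h(args), nil
}

func main() {
	out, err := dispatch(`PUSH {"jid":"123"}`)
	fmt.Println(out, err)

	_, err = dispatch("NOPE")
	fmt.Println(err)
}
```

Keeping the verbs in a map lets a new command be added with a single entry, without touching the dispatch logic.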
var DefaultMaxPoolSize uint64 = 2000

This is the ultimate scalability limit in Faktory: we allow at most this many connections to Redis.

Functions

This section is empty.

Types

type ClientBeat added in v1.5.0

type ClientBeat struct {
	CurrentState string `json:"current_state"`
	Wid          string `json:"wid"`
	RssKb        int64  `json:"rss_kb"`
}
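
The struct tags above determine the JSON field names of a beat payload. As a rough illustration, the sketch below copies the struct locally so it compiles on its own and round-trips it through encoding/json. The `encodeBeat` and `decodeBeat` helpers and the sample worker ID are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClientBeat is copied from the declaration above so the example compiles alone.
type ClientBeat struct {
	CurrentState string `json:"current_state"`
	Wid          string `json:"wid"`
	RssKb        int64  `json:"rss_kb"`
}

// encodeBeat renders a beat as JSON using the struct tags.
func encodeBeat(b ClientBeat) (string, error) {
	data, err := json.Marshal(b)
	return string(data), err
}

// decodeBeat parses a JSON payload back into a ClientBeat.
func decodeBeat(payload string) (ClientBeat, error) {
	var b ClientBeat
	err := json.Unmarshal([]byte(payload), &b)
	return b, err
}

func main() {
	// "worker-1" is an invented worker ID.
	s, err := encodeBeat(ClientBeat{Wid: "worker-1", RssKb: 54176})
	fmt.Println(s, err)

	b, err := decodeBeat(`{"wid":"worker-1","current_state":"quiet"}`)
	fmt.Println(b.CurrentState, err)
}
```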

type ClientData added in v1.5.0

type ClientData struct {
	StartedAt time.Time

	Hostname     string   `json:"hostname"`
	Wid          string   `json:"wid"`
	PasswordHash string   `json:"pwdhash"`
	Username     string   `json:"username"`
	Labels       []string `json:"labels"`
	Pid          int      `json:"pid"`
	RssKb        int64    `json:"rss_kb"`

	Version uint8 `json:"v"`
	// contains filtered or unexported fields
}

This represents a single client process. It may have many network connections open to Faktory.

A client can be a producer AND/OR consumer of jobs. Typically a process will either only produce jobs (like a webapp pushing jobs) or produce/consume jobs (like a faktory worker process where a job can create other jobs while executing).

Each Faktory worker process should send a BEAT command every 15 seconds. Only consumers should send a BEAT. If Faktory does not receive a BEAT from a worker process within 60 seconds, the worker expires and is removed from the Busy page.

From Faktory's point of view, an expired worker can BEAT again and resume normal operations, e.g. after a network partition heals. If a process dies, it is removed after 1 minute and its jobs are recovered once the job reservation timeout has passed (typically 30 minutes).

A worker process has a simple three-state lifecycle:

running -> quiet -> terminate

- Running means the worker is alive and processing jobs.
- Quiet means the worker should stop FETCHing new jobs but continue working on existing jobs. It should not exit, even if no jobs are processing.
- Terminate means the worker should exit within N seconds, where N is recommended to be 30 seconds. In practice, faktory_worker_ruby waits up to 25 seconds and any threads that are still busy are forcefully killed and their associated jobs reported as FAILed so they will be retried shortly.

A worker process should never stop sending BEAT. Even after "quiet" or "terminate", the BEAT should continue, stopping only when the process exits. Workers should never move backward in state: you cannot "unquiet" a worker; it must be restarted.
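
The "never move backward" rule can be expressed as a small guard on state transitions. The sketch below redeclares WorkerState locally so it compiles alone. The `advance` helper is hypothetical, and it assumes that skipping Quiet (going straight from Running to Terminate) is permitted, which the text does not state explicitly.

```go
package main

import "fmt"

// WorkerState mirrors the type declared in this package.
type WorkerState int

const (
	Running WorkerState = iota
	Quiet
	Terminate
)

// advance returns the state after a requested transition. Requests that
// would move backward are rejected and leave the current state unchanged.
func advance(cur, next WorkerState) (WorkerState, bool) {
	if next < cur {
		return cur, false
	}
	return next, true
}

func main() {
	state := Running
	state, ok := advance(state, Quiet)
	fmt.Println(state, ok)

	// Attempting to "unquiet" the worker is refused.
	state, ok = advance(state, Running)
	fmt.Println(state, ok)
}
```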

Workers will typically also respond to standard Unix signals. faktory_worker_ruby uses TSTP ("Threads SToP") as the quiet signal and TERM as the terminate signal.

func (*ClientData) ConnectionCount added in v1.6.1

func (worker *ClientData) ConnectionCount() int

func (*ClientData) IsConsumer added in v1.5.0

func (worker *ClientData) IsConsumer() bool

func (*ClientData) IsQuiet added in v1.5.0

func (worker *ClientData) IsQuiet() bool

func (*ClientData) Signal added in v1.5.0

func (worker *ClientData) Signal(newstate WorkerState)

Send "quiet" or "terminate" to the given client worker process. Other signals are undefined.

type Connection

type Connection struct {
	context.Context
	// contains filtered or unexported fields
}

Represents a connection to a faktory client.

faktory reuses the same wire protocol as Redis: RESP. It's a nice trade-off between human-readable and efficient. Shout out to antirez for his nice design document on it. https://redis.io/topics/protocol
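
For readers unfamiliar with RESP, the four reply shapes that plausibly correspond to the Ok, Error, Number, and Result methods below can be sketched as follows. The framing follows the public RESP specification. The helper names are invented, and the exact bytes this package writes (for example, the error prefix) are not shown on this page.

```go
package main

import (
	"fmt"
	"strconv"
)

// simpleString encodes a RESP simple string, e.g. "+OK\r\n".
func simpleString(s string) []byte { return []byte("+" + s + "\r\n") }

// respError encodes a RESP error reply, e.g. "-ERR something\r\n".
func respError(msg string) []byte { return []byte("-" + msg + "\r\n") }

// integer encodes a RESP integer, e.g. ":42\r\n".
func integer(n int) []byte { return []byte(":" + strconv.Itoa(n) + "\r\n") }

// bulk encodes a RESP bulk string: "$<len>\r\n<payload>\r\n".
// A nil payload becomes the RESP null bulk string "$-1\r\n".
func bulk(b []byte) []byte {
	if b == nil {
		return []byte("$-1\r\n")
	}
	out := []byte("$" + strconv.Itoa(len(b)) + "\r\n")
	out = append(out, b...)
	return append(out, '\r', '\n')
}

func main() {
	fmt.Printf("%q\n", simpleString("OK"))
	fmt.Printf("%q\n", respError("ERR unknown command"))
	fmt.Printf("%q\n", integer(42))
	fmt.Printf("%q\n", bulk([]byte(`{"jid":"123"}`)))
}
```

Because bulk strings are length-prefixed, a JSON payload can contain newlines or any other bytes without escaping, while the simple types stay readable in a telnet session.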

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Error

func (c *Connection) Error(cmd string, err error) error

func (*Connection) Number

func (c *Connection) Number(val int) error

func (*Connection) Ok

func (c *Connection) Ok() error

func (*Connection) Result

func (c *Connection) Result(msg []byte) error

type RuntimeStats

type RuntimeStats struct {
	StartedAt   time.Time
	Connections uint64
	Commands    uint64
}

type Server

type Server struct {
	Options *ServerOptions
	Stats   *RuntimeStats

	TLSPublicCert string
	TLSPrivateKey string

	Subsystems []Subsystem
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts *ServerOptions) (*Server, error)

func (*Server) AddTask added in v1.5.0

func (s *Server) AddTask(everySec int64, task Taskable)

func (*Server) Boot added in v1.5.0

func (s *Server) Boot() error

func (*Server) CurrentState added in v1.5.0

func (s *Server) CurrentState() (*client.FaktoryState, error)

func (*Server) Heartbeats

func (s *Server) Heartbeats() map[string]*ClientData

func (*Server) Manager added in v1.5.0

func (s *Server) Manager() manager.Manager

func (*Server) Register added in v1.5.0

func (s *Server) Register(x Subsystem)

Register a global handler to be called when the Server instance has finished booting but before it starts listening.

func (*Server) Reload added in v1.5.0

func (s *Server) Reload()

func (*Server) Run added in v1.5.0

func (s *Server) Run() error

func (*Server) Stop

func (s *Server) Stop(onStop func())

func (*Server) Stopper added in v1.5.0

func (s *Server) Stopper() chan bool

func (*Server) Store

func (s *Server) Store() storage.Store

type ServerOptions

type ServerOptions struct {
	GlobalConfig     map[string]any
	Binding          string
	StorageDirectory string
	RedisSock        string
	ConfigDirectory  string
	Environment      string
	Password         string
	PoolSize         uint64
}

func (*ServerOptions) Config added in v1.5.0

func (so *ServerOptions) Config(subsys string, key string, defval any) any

func (*ServerOptions) String added in v1.5.0

func (so *ServerOptions) String(subsys string, key string, defval string) string
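
Config and String take a subsystem name, a key, and a default value. The sketch below shows one way such a lookup could behave, assuming GlobalConfig holds one nested `map[string]any` per subsystem. That layout, the `options` type, and the sample keys are assumptions for illustration; the package's actual implementation is not shown here.

```go
package main

import "fmt"

// options is a local stand-in for ServerOptions with only the field used here.
type options struct {
	GlobalConfig map[string]any
}

// config returns GlobalConfig[subsys][key], or defval when either level is missing.
func (o *options) config(subsys, key string, defval any) any {
	section, ok := o.GlobalConfig[subsys].(map[string]any)
	if !ok {
		return defval
	}
	val, ok := section[key]
	if !ok {
		return defval
	}
	return val
}

// str is the string-typed variant; non-string values fall back to defval.
func (o *options) str(subsys, key, defval string) string {
	if s, ok := o.config(subsys, key, defval).(string); ok {
		return s
	}
	return defval
}

func main() {
	// The "logging" section and its keys are invented sample data.
	o := &options{GlobalConfig: map[string]any{
		"logging": map[string]any{"level": "debug"},
	}}
	fmt.Println(o.str("logging", "level", "info"))
	fmt.Println(o.str("logging", "format", "text"))
	fmt.Println(o.config("missing", "size", 10))
}
```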

type Subsystem added in v1.5.0

type Subsystem interface {
	Name() string

	// Called when the server is configured but before it starts accepting client connections.
	Start(*Server) error

	// Called every time Faktory reloads the global config for the Server.
	// Each subsystem is responsible for diffing its own config and making
	// necessary changes.
	Reload(*Server) error
}
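
A subsystem only needs to satisfy the three methods above. The sketch below uses a local stand-in for Server and a copy of the interface so it compiles without importing the module. The `metrics` subsystem and its fields are hypothetical.

```go
package main

import "fmt"

// Server is an empty stand-in for the package's Server type.
type Server struct{}

// Subsystem is copied from the declaration above.
type Subsystem interface {
	Name() string
	Start(*Server) error
	Reload(*Server) error
}

// metrics is a hypothetical subsystem that records its lifecycle calls.
type metrics struct {
	started bool
	reloads int
}

func (m *metrics) Name() string { return "metrics" }

// Start runs once, before the server accepts client connections.
func (m *metrics) Start(s *Server) error {
	m.started = true
	return nil
}

// Reload runs on every config reload; a real subsystem would diff its
// own configuration here and apply only what changed.
func (m *metrics) Reload(s *Server) error {
	m.reloads++
	return nil
}

// Compile-time check that *metrics satisfies Subsystem.
var _ Subsystem = (*metrics)(nil)

func main() {
	m := &metrics{}
	srv := &Server{}
	fmt.Println(m.Name(), m.Start(srv), m.Reload(srv), m.started, m.reloads)
}
```

With the real package, an instance like this would presumably be handed to `Server.Register` before the server boots.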

type Taskable added in v1.5.0

type Taskable interface {
	Name() string
	Execute(context.Context) error
	Stats(context.Context) map[string]any
}
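
A task passed to AddTask must implement these three methods. The following is a minimal sketch with the interface copied locally. The `counter` task and the `runTimes` driver are invented for illustration and do not reproduce how the server schedules tasks.

```go
package main

import (
	"context"
	"fmt"
)

// Taskable is copied from the declaration above.
type Taskable interface {
	Name() string
	Execute(context.Context) error
	Stats(context.Context) map[string]any
}

// counter is a hypothetical task that counts how often it has run.
type counter struct {
	runs int64
}

func (c *counter) Name() string { return "counter" }

// Execute does one unit of work, honoring context cancellation.
func (c *counter) Execute(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	c.runs++
	return nil
}

// Stats reports the task's state for display or monitoring.
func (c *counter) Stats(ctx context.Context) map[string]any {
	return map[string]any{"runs": c.runs}
}

// runTimes executes the task n times, stopping at the first error,
// and returns the task's stats afterwards.
func runTimes(task Taskable, n int) (map[string]any, error) {
	ctx := context.Background()
	for i := 0; i < n; i++ {
		if err := task.Execute(ctx); err != nil {
			return task.Stats(ctx), err
		}
	}
	return task.Stats(ctx), nil
}

func main() {
	stats, err := runTimes(&counter{}, 3)
	fmt.Println(stats, err)
}
```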

type WorkerState added in v1.5.0

type WorkerState int
const (
	Running WorkerState = iota
	Quiet
	Terminate
)
