Documentation ¶
Index ¶
- Constants
- func NewGrpcServer(extraServerOptions ...grpc.ServerOption) *grpc.Server
- func PerformDnsServiceDiscovery(ctx context.Context, config *DnsServiceDiscoveryConfig, ...)
- func SetupBadgerStorageMetrics()
- func Write(entry Entry, cluster *ClusterDescription) error
- type ClockServer
- type ClusterDescription
- type ClusterServer
- func (s *ClusterServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetResponse, error)
- func (s *ClusterServer) Info(ctx context.Context, req *api.ClusterInfoRequest) (*api.ClusterInfoResponse, error)
- func (s *ClusterServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetResponse, error)
- type ConnManager
- type ConnPool
- type CoordinatorConfig
- type DnsServiceDiscoveryConfig
- type Entry
- type FoundNode
- type NodeServer
- func (s *NodeServer) Get(ctx context.Context, req *api.GetRequest) (*api.NodeGetResponse, error)
- func (s *NodeServer) Health(ctx context.Context, _ *api.HealthRequest) (*api.HealthResponse, error)
- func (s *NodeServer) Info(_ *api.InfoRequest, stream api.Node_InfoServer) error
- func (s *NodeServer) Set(ctx context.Context, req *api.NodeSetRequest) (*api.SetResponse, error)
- type RoundRobinConnPool
- func (r *RoundRobinConnPool) Close() []error
- func (r *RoundRobinConnPool) Conn() *grpc.ClientConn
- func (p *RoundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, ...) error
- func (p *RoundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, ...) (grpc.ClientStream, error)
- func (r *RoundRobinConnPool) PoolSize() int
- type Storage
- type StorageNodeConfig
- type StorageNodeDescription
Constants ¶
View Source
const (
	MAX_MSG_SIZE = 64 << 20
	PING_RATE    = 5 * time.Second
	PING_TIMEOUT = 5 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func NewGrpcServer ¶
func NewGrpcServer(extraServerOptions ...grpc.ServerOption) *grpc.Server
func PerformDnsServiceDiscovery ¶
func PerformDnsServiceDiscovery(ctx context.Context, config *DnsServiceDiscoveryConfig, clusterDesc *ClusterDescription)
FIXME: Add a TTL-style system to remove expired records.
func SetupBadgerStorageMetrics ¶
func SetupBadgerStorageMetrics()
func Write ¶
func Write(entry Entry, cluster *ClusterDescription) error
Types ¶
type ClockServer ¶
type ClockServer struct {
api.UnimplementedClockServer
// contains filtered or unexported fields
}
ClockServer is based on the design described at http://rystsov.info/2018/10/01/tso.html
func NewClockServer ¶
func NewClockServer(epochPath string) (*ClockServer, error)
func (*ClockServer) Get ¶
func (s *ClockServer) Get(ctx context.Context, req *api.ClockGetRequest) (*api.ClockGetResponse, error)
func (*ClockServer) Set ¶
func (s *ClockServer) Set(ctx context.Context, req *api.ClockSetRequest) (*api.ClockSetResponse, error)
type ClusterDescription ¶
type ClusterDescription struct {
sync.Mutex
ReplicationLevel int
RendezvousHashingSeed uint32
StorageNodes map[string]*StorageNodeDescription
// FIXME: Move ConnManager off this metadata struct and onto more of a
// 'live' state struct.
ConnManager *ConnManager
}
func NewClusterDescription ¶
func NewClusterDescription(cfg *CoordinatorConfig, connManager *ConnManager) *ClusterDescription
func (*ClusterDescription) FindNodesForKey ¶
func (c *ClusterDescription) FindNodesForKey(key string) []FoundNode
type ClusterServer ¶
type ClusterServer struct {
api.UnimplementedClusterServer
// contains filtered or unexported fields
}
func NewClusterServer ¶
func NewClusterServer(clusterDesc *ClusterDescription) *ClusterServer
func (*ClusterServer) Get ¶
func (s *ClusterServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetResponse, error)
func (*ClusterServer) Info ¶
func (s *ClusterServer) Info(ctx context.Context, req *api.ClusterInfoRequest) (*api.ClusterInfoResponse, error)
func (*ClusterServer) Set ¶
func (s *ClusterServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetResponse, error)
type ConnManager ¶
type ConnManager struct {
sync.Mutex
PoolSize int
RemoveUnusedAfter time.Duration
Pools map[string]ConnPool
LastUsed map[string]time.Time
}
func NewConnManager ¶
func NewConnManager(poolSize int, removeUnusedAfter time.Duration) *ConnManager
func (*ConnManager) Get ¶
func (m *ConnManager) Get(address string) (conn grpc.ClientConnInterface, ok bool)
func (*ConnManager) Run ¶
func (m *ConnManager) Run(ctx context.Context)
type ConnPool ¶
type ConnPool interface {
grpc.ClientConnInterface
Conn() *grpc.ClientConn
PoolSize() int
Close() []error
}
type CoordinatorConfig ¶
type CoordinatorConfig struct {
RendezvousHashingSeed uint32 `yaml:"rendezvous_hashing_seed"`
ReplicationLevel int `yaml:"replication_level"`
BindAddress string `yaml:"bind_address"`
StorageNodeIds []string `yaml:"storage_node_ids"`
// Map from storage node IDs to "hostname:port"
StaticServiceDiscovery map[string]string `yaml:"static_service_discovery"`
DnsServiceDiscovery *DnsServiceDiscoveryConfig `yaml:"dns_service_discovery"`
SizeOfConnectionPools int `yaml:"size_of_connection_pools"`
RemoveUnusedConnectionPoolsAfter time.Duration `yaml:"remove_unused_connection_pools_after"`
}
func LoadCoordinatorConfig ¶
func LoadCoordinatorConfig(path string) (*CoordinatorConfig, error)
type FoundNode ¶
type FoundNode struct {
CombinedHash uint64
Node *StorageNodeDescription
}
type NodeServer ¶
type NodeServer struct {
api.UnimplementedNodeServer
// contains filtered or unexported fields
}
func NewNodeServer ¶
func NewNodeServer(nodeId string, storage *Storage) *NodeServer
func (*NodeServer) Get ¶
func (s *NodeServer) Get(ctx context.Context, req *api.GetRequest) (*api.NodeGetResponse, error)
func (*NodeServer) Health ¶
func (s *NodeServer) Health(ctx context.Context, _ *api.HealthRequest) (*api.HealthResponse, error)
func (*NodeServer) Info ¶
func (s *NodeServer) Info(_ *api.InfoRequest, stream api.Node_InfoServer) error
func (*NodeServer) Set ¶
func (s *NodeServer) Set(ctx context.Context, req *api.NodeSetRequest) (*api.SetResponse, error)
type RoundRobinConnPool ¶
type RoundRobinConnPool struct {
Index int32
Size int
Connections []*grpc.ClientConn
}
func NewRoundRobinConnPool ¶
func (*RoundRobinConnPool) Close ¶
func (r *RoundRobinConnPool) Close() []error
func (*RoundRobinConnPool) Conn ¶
func (r *RoundRobinConnPool) Conn() *grpc.ClientConn
func (*RoundRobinConnPool) Invoke ¶
func (p *RoundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error
func (*RoundRobinConnPool) NewStream ¶
func (p *RoundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error)
func (*RoundRobinConnPool) PoolSize ¶
func (r *RoundRobinConnPool) PoolSize() int
type Storage ¶
func NewStorage ¶
type StorageNodeConfig ¶
type StorageNodeConfig struct {
Id string `yaml:"id"`
BindAddress string `yaml:"bind_address"`
BindMetricsAddress string `yaml:"bind_metrics_address"`
ClockEpochFilePath string `yaml:"clock_epoch_file_path"`
BadgerDbFolder string `yaml:"badger_db_folder"`
}
func LoadStorageNodeConfig ¶
func LoadStorageNodeConfig(path string) (*StorageNodeConfig, error)
type StorageNodeDescription ¶
func NewStorageNodeDescription ¶
func NewStorageNodeDescription(id string, rendezvousHashingSeed uint32) *StorageNodeDescription
func (*StorageNodeDescription) Found ¶
func (s *StorageNodeDescription) Found() bool
Source Files ¶
Click to show internal directories.
Click to hide internal directories.