Documentation ¶
Overview ¶
Package dbsync provides database replication from a back-office (BO) to one or more front-office (FO) instances. The BO produces filtered SQLite snapshots and pushes them over QUIC. Each FO receives the snapshot, validates its integrity, and swaps the local read-only database atomically.
The package integrates with hazyhaar/pkg/connectivity for dynamic target management and hazyhaar/pkg/watch for change detection.
Index ¶
- Constants
- func DBSyncFactory(pub *Publisher) connectivity.TransportFactory
- func ListenSnapshots(ctx context.Context, addr string, tlsCfg *tls.Config, ...) error
- func PushSnapshot(ctx context.Context, endpoint string, tlsCfg *tls.Config, meta SnapshotMeta, ...) error
- func RedirectHandler(boURL string) http.HandlerFunc
- func SubscriberFactory(sub *Subscriber) connectivity.TransportFactory
- func SyncClientTLSConfig(insecureSkipVerify bool) *tls.Config
- func SyncClientTLSConfigWithCA(caCertFile string) (*tls.Config, error)
- func SyncTLSConfig(certFile, keyFile string) (*tls.Config, error)
- func SyncTLSConfigMutual(certFile, keyFile, caCertFile string) (*tls.Config, error)
- func ValidateFilterSpec(spec FilterSpec) error
- type AuthProxy (deprecated)
- func (p *AuthProxy) ForgotPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
- func (p *AuthProxy) LoginHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
- func (p *AuthProxy) RegisterHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
- func (p *AuthProxy) ResetPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
- type BOHealthChecker
- type FilterSpec
- type Option
- type PartialTable
- type Publisher
- type RoutesTargetProvider
- type SnapshotMeta
- type StaticTargetProvider
- type Subscriber
- func (s *Subscriber) Close() error
- func (s *Subscriber) DB() *sql.DB
- func (s *Subscriber) LastSwapAt() int64
- func (s *Subscriber) OnSwap(fn func())
- func (s *Subscriber) Ping(ctx context.Context) error
- func (s *Subscriber) StaleSince() time.Duration
- func (s *Subscriber) Start(ctx context.Context) error
- func (s *Subscriber) Status() map[string]any
- func (s *Subscriber) Version() int64
- type Target
- type TargetProvider
- type WriteProxy
Constants ¶
const (
	// ALPNProtocol identifies dbsync streams over QUIC, distinct from MCP.
	ALPNProtocol = "horos-dbsync-v1"

	// MagicBytes are sent at the start of every snapshot stream for framing.
	MagicBytes = "SYN1"

	// MaxSnapshotSize is a safety limit (512 MB).
	MaxSnapshotSize = 512 * 1024 * 1024
)
ALPN and wire-format constants.
Variables ¶
This section is empty.
Functions ¶
func DBSyncFactory ¶
func DBSyncFactory(pub *Publisher) connectivity.TransportFactory
DBSyncFactory returns a connectivity.TransportFactory for the "dbsync" strategy. When the router calls a dbsync route, the transport returns the status of the sync (last version, hash, etc.) rather than performing a data operation.
This allows the LLM to check sync health via:
resp, _ := router.Call(ctx, "dbsync:fo-1", nil)
// resp contains {"last_version": N, "status": "ok"}
The factory requires a reference to the Publisher so it can report status.
func ListenSnapshots ¶
func ListenSnapshots(ctx context.Context, addr string, tlsCfg *tls.Config, handler func(SnapshotMeta, io.Reader) error) error
ListenSnapshots accepts incoming QUIC connections carrying snapshots. For each connection it reads the wire format and calls handler with the parsed metadata and a reader for the raw database bytes. Blocks until ctx is cancelled.
func PushSnapshot ¶
func PushSnapshot(ctx context.Context, endpoint string, tlsCfg *tls.Config, meta SnapshotMeta, snapshotPath string) error
PushSnapshot sends a snapshot file to a remote subscriber over QUIC.
Wire format:
- Magic bytes "SYN1" (4 bytes)
- Meta length (4 bytes big-endian uint32)
- Meta JSON (variable)
- Snapshot bytes (gzip-compressed if meta.Compressed, raw otherwise)
func RedirectHandler ¶
func RedirectHandler(boURL string) http.HandlerFunc
RedirectHandler returns an http.HandlerFunc that redirects the client to the BO URL instead of proxying. This is a simpler alternative when the BO is directly accessible from the user's browser.
func SubscriberFactory ¶
func SubscriberFactory(sub *Subscriber) connectivity.TransportFactory
SubscriberFactory returns a connectivity.TransportFactory for FO-side status reporting. Similar to DBSyncFactory but reports subscriber state.
func SyncClientTLSConfig ¶
func SyncClientTLSConfig(insecureSkipVerify bool) *tls.Config
SyncClientTLSConfig returns a TLS config for connecting to a dbsync endpoint.
WARNING: when insecureSkipVerify is true, the client accepts any server certificate. This is only appropriate for local development and testing. In production, use SyncClientTLSConfigWithCA to pin the CA certificate.
func SyncClientTLSConfigWithCA ¶
func SyncClientTLSConfigWithCA(caCertFile string) (*tls.Config, error)
SyncClientTLSConfigWithCA returns a TLS config that trusts the given CA certificate file. Use this in production when the dbsync server uses a self-signed or internal CA certificate.
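As a rough sketch of what trusting a single CA involves, the following builds a client config from PEM bytes using crypto/tls and crypto/x509. It is an assumption about the general approach, not the package's code: clientTLSFromCA is a hypothetical helper, it takes PEM bytes instead of a file path to stay self-contained, and the TLS 1.3 minimum is chosen here because QUIC requires it.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// alpn matches the ALPNProtocol constant documented for this package.
const alpn = "horos-dbsync-v1"

// clientTLSFromCA builds a client config that trusts only the given PEM-encoded CA.
func clientTLSFromCA(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid certificate found in CA PEM")
	}
	return &tls.Config{
		RootCAs:    pool,
		NextProtos: []string{alpn},
		MinVersion: tls.VersionTLS13,
	}, nil
}

func main() {
	// Garbage input is rejected instead of silently producing an empty trust pool.
	_, err := clientTLSFromCA([]byte("not a certificate"))
	fmt.Println("error for invalid PEM:", err != nil)
}
```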
func SyncTLSConfig ¶
func SyncTLSConfig(certFile, keyFile string) (*tls.Config, error)
SyncTLSConfig builds a TLS config with the dbsync ALPN protocol. The cert is added to RootCAs so that self-signed certs are trusted when this config is used as a client (publisher dialing subscriber).
func SyncTLSConfigMutual ¶
func SyncTLSConfigMutual(certFile, keyFile, caCertFile string) (*tls.Config, error)
SyncTLSConfigMutual builds a TLS config for the subscriber (listener) that requires the publisher to present a valid client certificate signed by the given CA. This prevents rogue clients from pushing snapshots.
Parameters:
- certFile/keyFile: the subscriber's own certificate and key
- caCertFile: the CA certificate that signed the publisher's certificate
func ValidateFilterSpec ¶
func ValidateFilterSpec(spec FilterSpec) error
ValidateFilterSpec checks that WHERE clauses in the spec do not contain dangerous SQL patterns (multi-statement, DDL, etc.). This is a defense-in-depth measure: FilterSpec is expected to be set by the deployer, not by end users.
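The exact patterns ValidateFilterSpec rejects are not listed here. As an illustration only, a denylist check of this kind might look like the sketch below; the forbidden token list and the checkWhere helper are assumptions made for the example, and a naive substring match like this one also rejects harmless string literals that happen to contain a listed token.

```go
package main

import (
	"fmt"
	"strings"
)

// forbidden is a hypothetical denylist; the package's real rules are not documented here.
var forbidden = []string{";", "--", "/*", "DROP ", "ALTER ", "CREATE ", "ATTACH ", "PRAGMA "}

// checkWhere rejects a WHERE clause containing any denylisted token (case-insensitive).
func checkWhere(clause string) error {
	upper := strings.ToUpper(clause)
	for _, tok := range forbidden {
		if strings.Contains(upper, tok) {
			return fmt.Errorf("where clause contains forbidden token %q", strings.TrimSpace(tok))
		}
	}
	return nil
}

func main() {
	fmt.Println(checkWhere("visibility = 'public'"))
	fmt.Println(checkWhere("1 = 1; DROP TABLE users"))
}
```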
Types ¶
type AuthProxy (deprecated)
type AuthProxy struct {
	// HealthCheck is an optional callback that returns whether the BO is
	// reachable. When set and returning false, auth handlers fail fast
	// instead of waiting for the HTTP timeout.
	HealthCheck func() bool
	// contains filtered or unexported fields
}
Deprecated: Use github.com/hazyhaar/pkg/authproxy instead. This type is kept for backward compatibility with existing callers.
AuthProxy calls the BO internal auth API and translates the JSON response into cookies + redirects for the FO domain. The user never sees the BO URL.
func NewAuthProxy ¶
NewAuthProxy creates an auth proxy that calls BO internal API endpoints.
func (*AuthProxy) ForgotPasswordHandler ¶
func (p *AuthProxy) ForgotPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
ForgotPasswordHandler returns an http.HandlerFunc for POST /forgot-password on the FO. It reads the form, calls BO /api/internal/auth/forgot-password, and redirects.
func (*AuthProxy) LoginHandler ¶
func (p *AuthProxy) LoginHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
LoginHandler returns an http.HandlerFunc for POST /login on the FO. It reads the form, calls BO /api/internal/auth/login, sets the cookie, and redirects.
func (*AuthProxy) RegisterHandler ¶
func (p *AuthProxy) RegisterHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
RegisterHandler returns an http.HandlerFunc for POST /register on the FO. It reads the form, calls BO /api/internal/auth/register, and redirects.
func (*AuthProxy) ResetPasswordHandler ¶
func (p *AuthProxy) ResetPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc
ResetPasswordHandler returns an http.HandlerFunc for POST /reset-password on the FO. It reads the form, calls BO /api/internal/auth/reset-password, and redirects.
type BOHealthChecker ¶
type BOHealthChecker struct {
// contains filtered or unexported fields
}
BOHealthChecker periodically pings the back-office to track reachability. The cached result is used by auth proxies and health endpoints to fail fast instead of waiting for a 10s HTTP timeout every time.
func NewBOHealthChecker ¶
func NewBOHealthChecker(boURL string, interval time.Duration) *BOHealthChecker
NewBOHealthChecker creates a health checker that pings boURL+"/health" every interval. Call Start to begin the check loop.
func (*BOHealthChecker) Healthy ¶
func (h *BOHealthChecker) Healthy() bool
Healthy returns the cached reachability status. Returns false if no check has been performed yet.
func (*BOHealthChecker) Start ¶
func (h *BOHealthChecker) Start(ctx context.Context)
Start begins the periodic health check loop. Blocks until ctx is cancelled.
func (*BOHealthChecker) Status ¶
func (h *BOHealthChecker) Status() map[string]any
Status returns a JSON-serializable summary.
type FilterSpec ¶
type FilterSpec struct {
	// FullTables are copied in full, without modification.
	FullTables []string
	// FilteredTables maps table name → WHERE clause. Only rows matching
	// the clause are included.
	FilteredTables map[string]string
	// PartialTables maps table name → column selection + optional WHERE.
	// Columns not listed are dropped (set to their zero value).
	PartialTables map[string]PartialTable
}
FilterSpec defines which tables and columns are included in the public snapshot pushed to FO instances. Anything not listed is excluded.
Example ¶
package main

import (
	"fmt"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	spec := dbsync.FilterSpec{
		// Tables copied without modification.
		FullTables: []string{"badges", "reputation_config", "rate_limits", "maintenance"},
		// Tables with row-level filtering.
		FilteredTables: map[string]string{
			"engagements": "visibility = 'public'",
			"templates":   "is_blacklisted = 0",
		},
		// Tables with column-level filtering and optional WHERE.
		PartialTables: map[string]dbsync.PartialTable{
			"users": {
				Columns: []string{"user_id", "username", "display_name", "avatar_url"},
				Where:   "is_active = 1",
			},
		},
	}
	err := dbsync.ValidateFilterSpec(spec)
	if err != nil {
		fmt.Println("invalid filter:", err)
	}
}
type Option ¶
type Option func(*options)
Option configures Publisher or Subscriber.
func WithCompression ¶
func WithCompression() Option
WithCompression enables gzip compression of snapshot payloads before push. Typically reduces transfer size by 60-70% for SQLite databases.
func WithDriverName ¶
WithDriverName sets the SQL driver name used when opening databases. Defaults to "sqlite". Use "sqlite-trace" to enable SQL tracing.
func WithMaxAge ¶
WithMaxAge sets the maximum acceptable age for incoming snapshots. The subscriber rejects snapshots whose timestamp is older than this duration, protecting against rollback/replay attacks.
func WithWatchDebounce ¶
WithWatchDebounce sets the debounce window after a change is detected.
func WithWatchInterval ¶
WithWatchInterval sets the database polling interval.
type PartialTable ¶
PartialTable describes a table where only selected columns are published and an optional WHERE clause restricts rows.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher watches a source database for changes, produces filtered snapshots, and pushes them to all FO subscribers returned by the TargetProvider.
func NewPublisher ¶
func NewPublisher(db *sql.DB, targets TargetProvider, filter FilterSpec, savePath string, tlsCfg *tls.Config, opts ...Option) *Publisher
NewPublisher creates a Publisher that resolves push targets via the given TargetProvider. For services that push to a fixed set of FOs, use NewStaticTargetProvider. For dynamic routing via a connectivity routes DB, use NewRoutesTargetProvider.
Parameters:
- db: the source (BO) database to snapshot
- targets: provides the list of FO endpoints to push to
- filter: defines which tables/columns are included
- savePath: path for the local hot-backup ("save chaude") snapshot file
- tlsCfg: TLS config for QUIC push (use SyncClientTLSConfig for dev)
Example (Static) ¶
package main

import (
	"context"
	"database/sql"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	var db *sql.DB // your source database
	tlsCfg := dbsync.SyncClientTLSConfig(true) // dev mode
	pub := dbsync.NewPublisher(db, dbsync.NewStaticTargetProvider(
		dbsync.Target{Name: "fo-1", Strategy: "dbsync", Endpoint: "10.0.0.2:9443"},
	), dbsync.FilterSpec{
		FullTables: []string{"posts", "categories"},
	}, "db/public.db", tlsCfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() { _ = pub.Start(ctx) }()
	// Publisher now watches db and pushes snapshots to fo-1.
	_ = pub // use pub
}
func NewPublisherWithRoutesDB ¶
func NewPublisherWithRoutesDB(db, routesDB *sql.DB, filter FilterSpec, savePath string, tlsCfg *tls.Config, opts ...Option) *Publisher
NewPublisherWithRoutesDB creates a Publisher that reads push targets from a connectivity routes table. This is the original constructor signature kept for backward compatibility with repvow and other existing callers.
Example ¶
package main

import (
	"context"
	"database/sql"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	var db, routesDB *sql.DB // source + connectivity routes databases
	tlsCfg := dbsync.SyncClientTLSConfig(true)
	pub := dbsync.NewPublisherWithRoutesDB(db, routesDB, dbsync.FilterSpec{
		FullTables: []string{"posts"},
		FilteredTables: map[string]string{
			"comments": "is_hidden = 0",
		},
	}, "db/public.db", tlsCfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() { _ = pub.Start(ctx) }()
	_ = pub // use pub
}
func (*Publisher) LastMeta ¶
func (p *Publisher) LastMeta() *SnapshotMeta
LastMeta returns the metadata of the most recently produced snapshot.
type RoutesTargetProvider ¶
type RoutesTargetProvider struct {
// contains filtered or unexported fields
}
RoutesTargetProvider reads dbsync targets from a connectivity routes table.
func NewRoutesTargetProvider ¶
func NewRoutesTargetProvider(db *sql.DB) *RoutesTargetProvider
NewRoutesTargetProvider creates a TargetProvider backed by the connectivity routes table in db. It queries rows where strategy IN ('dbsync', 'noop') AND service_name LIKE 'dbsync:%'.
type SnapshotMeta ¶
type SnapshotMeta struct {
	Version    int64  `json:"version"`
	Hash       string `json:"hash"`       // SHA-256 hex of uncompressed data
	Size       int64  `json:"size"`       // uncompressed file size in bytes
	Timestamp  int64  `json:"timestamp"`  // unix epoch seconds
	Compressed bool   `json:"compressed"` // true if payload is gzip-compressed
}
SnapshotMeta is sent as the first message on the QUIC stream before the raw database bytes follow.
func ProduceSnapshot ¶
func ProduceSnapshot(srcDB *sql.DB, dstPath string, spec FilterSpec) (*SnapshotMeta, error)
ProduceSnapshot creates a filtered copy of srcDB at dstPath.
Steps:
1. Validate WHERE clauses in the FilterSpec
2. VACUUM INTO a temporary file (consistent snapshot of entire DB)
3. Open the copy with the plain "sqlite" driver
4. Drop tables not in the FilterSpec whitelist
5. Apply WHERE clauses (FilteredTables)
6. Truncate non-selected columns (PartialTables)
7. VACUUM to compact
8. SHA-256 hash the result
type StaticTargetProvider ¶
type StaticTargetProvider struct {
// contains filtered or unexported fields
}
StaticTargetProvider returns a fixed list of targets.
func NewStaticTargetProvider ¶
func NewStaticTargetProvider(targets ...Target) *StaticTargetProvider
NewStaticTargetProvider creates a TargetProvider from a fixed list of targets.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber listens for incoming snapshot pushes over QUIC, verifies integrity, and performs an atomic swap of the local read-only database.
func NewSubscriber ¶
func NewSubscriber(dbPath, listenAddr string, tlsCfg *tls.Config, opts ...Option) *Subscriber
NewSubscriber creates a Subscriber that listens on listenAddr for snapshot pushes and maintains a read-only database at dbPath.
Example ¶
package main

import (
	"context"
	"log/slog"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	tlsCfg := dbsync.SyncClientTLSConfig(true) // would be SyncTLSConfig in prod
	sub := dbsync.NewSubscriber("db/public.db", ":9443", tlsCfg)
	sub.OnSwap(func() {
		slog.Info("database swapped", "version", sub.Version())
		// Recreate service instances with sub.DB()
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() { _ = sub.Start(ctx) }()
	_ = sub // use sub
}
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes the current database connection.
func (*Subscriber) DB ¶
func (s *Subscriber) DB() *sql.DB
DB returns the current read-only database connection. May return nil if no snapshot has been received yet.
func (*Subscriber) LastSwapAt ¶
func (s *Subscriber) LastSwapAt() int64
LastSwapAt returns the unix timestamp of the last successful database swap, or 0 if no swap has occurred yet.
func (*Subscriber) OnSwap ¶
func (s *Subscriber) OnSwap(fn func())
OnSwap registers a callback invoked after a successful database swap. Callbacks are called synchronously in registration order.
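To make "synchronously in registration order" concrete, here is a minimal callback registry with that behavior. The swapHooks type and its methods are hypothetical and are not the Subscriber's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// swapHooks stores callbacks and runs them one after another in the order added.
type swapHooks struct {
	mu  sync.Mutex
	fns []func()
}

// onSwap appends a callback to the registry.
func (h *swapHooks) onSwap(fn func()) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.fns = append(h.fns, fn)
}

// fire runs every registered callback on the caller's goroutine.
func (h *swapHooks) fire() {
	// Copy under the lock so a callback may register further hooks without deadlocking.
	h.mu.Lock()
	fns := append([]func(){}, h.fns...)
	h.mu.Unlock()
	for _, fn := range fns {
		fn()
	}
}

func main() {
	var h swapHooks
	h.onSwap(func() { fmt.Println("rebuild caches") })
	h.onSwap(func() { fmt.Println("log new version") })
	h.fire()
}
```

With synchronous callbacks, a slow callback delays every callback registered after it, so long-running work is better handed off to a goroutine.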
func (*Subscriber) Ping ¶
func (s *Subscriber) Ping(ctx context.Context) error
Ping reports whether the subscriber is healthy. It returns nil if a snapshot has been received and the database is accessible, or an error describing what is missing (no snapshot, DB unreachable).
func (*Subscriber) StaleSince ¶
func (s *Subscriber) StaleSince() time.Duration
StaleSince returns how long since the last successful swap. Returns -1 if no swap has occurred yet.
func (*Subscriber) Start ¶
func (s *Subscriber) Start(ctx context.Context) error
Start listens for incoming snapshots over QUIC. Blocks until ctx is cancelled.
func (*Subscriber) Status ¶
func (s *Subscriber) Status() map[string]any
Status returns a JSON-serializable status summary.
func (*Subscriber) Version ¶
func (s *Subscriber) Version() int64
Version returns the version of the last received snapshot.
type Target ¶
type Target struct {
	Name     string // identifier, e.g. "fo-1"
	Strategy string // "dbsync" or "noop"
	Endpoint string // "ip:port" for QUIC dial
}
Target describes a single FO endpoint that receives snapshots.
type TargetProvider ¶
TargetProvider returns the list of sync targets that a Publisher should push snapshots to. Implementations range from a static list to a database-backed route table.
type WriteProxy ¶
type WriteProxy struct {
// contains filtered or unexported fields
}
WriteProxy forwards HTTP requests from FO to the BO endpoint. In FO mode, write operations (POST, PUT, DELETE) are proxied to the BO which owns the read-write database.
func NewWriteProxy ¶
func NewWriteProxy(boEndpoint string, tlsCfg *tls.Config) (*WriteProxy, error)
NewWriteProxy creates a reverse proxy that forwards requests to boEndpoint. boEndpoint must be a valid HTTP(S) URL, e.g. "https://bo.internal:8443". Returns an error if the endpoint URL cannot be parsed.
func (*WriteProxy) Handler ¶
func (p *WriteProxy) Handler() http.Handler
Handler returns an http.Handler that proxies all requests to the BO.