dbsync

package
v0.1.0

Warning: This package is not in the latest version of its module.
Published: Mar 1, 2026 License: MIT Imports: 27 Imported by: 0

README

dbsync — SQLite replication over QUIC

dbsync replicates a filtered SQLite database from a back-office (BO) to one or more front-office (FO) instances. The BO watches for changes, produces a filtered snapshot, and pushes it over QUIC. Each FO validates the snapshot and swaps its local read-only database atomically.

  BO (read-write)               FO (read-only)
  ┌────────────────┐            ┌────────────────┐
  │  Source DB      │            │  Public DB      │
  │  (full data)    │            │  (filtered)     │
  └───────┬────────┘            └───────▲────────┘
          │ watch + filter              │ atomic swap
          ▼                             │
  ┌────────────────┐   QUIC/TLS  ┌─────┴──────────┐
  │  Publisher      │───────────▶│  Subscriber     │
  │  (SYN1 + JSON   │            │  (verify hash,  │
  │   + raw bytes)  │            │   rename, open) │
  └────────────────┘            └────────────────┘

Quick start

Publisher (BO side)

// Static targets (simplest: push to a known FO).
pub := dbsync.NewPublisher(db, dbsync.NewStaticTargetProvider(
    dbsync.Target{Name: "fo-1", Strategy: "dbsync", Endpoint: "91.134.142.134:9443"},
), myFilter(), "db/public.db", tlsCfg)

go pub.Start(ctx)

// Alternative: dynamic targets via the connectivity routes table.
pub := dbsync.NewPublisherWithRoutesDB(db, routesDB, myFilter(), "db/public.db", tlsCfg)
go pub.Start(ctx)

Subscriber (FO side)

sub := dbsync.NewSubscriber("db/public.db", ":9443", tlsCfg)
sub.OnSwap(func() {
    // Recreate services with sub.DB()
})
go sub.Start(ctx)

FilterSpec

FilterSpec controls which data is included in the public snapshot.

spec := dbsync.FilterSpec{
    // Copied integrally.
    FullTables: []string{"badges", "reputation_config"},

    // Only rows matching the WHERE clause.
    FilteredTables: map[string]string{
        "engagements": "visibility = 'public'",
        "templates":   "is_blacklisted = 0",
    },

    // Only listed columns kept (others set to NULL/zero). Optional WHERE.
    PartialTables: map[string]dbsync.PartialTable{
        "users": {
            Columns: []string{"user_id", "username", "display_name", "avatar_url"},
            Where:   "is_active = 1",
        },
    },
}

Tables not listed anywhere are dropped from the snapshot.

Environment variables

Variable          Default         Description
SYNC_LISTEN       :9443           Subscriber QUIC listen address
SYNC_CERT         (none)          TLS certificate file
SYNC_KEY          (none)          TLS private key file
SYNC_SAVE_PATH    db/public.db    Publisher local snapshot path
ROUTES_DB_PATH    (none)          Connectivity routes DB (for RoutesTargetProvider)
BO_URL            (none)          Back-office URL (for FO auth proxy + redirects)

Wire format

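Each QUIC stream carries one snapshot. The layout below shows the uncompressed case; when meta.compressed is true, the payload is gzip-compressed instead (see PushSnapshot):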

┌──────────┬───────────────┬──────────────┬──────────────────┐
│ "SYN1"   │ meta_len (4B) │ meta JSON    │ raw DB bytes     │
│ (4 bytes)│ big-endian    │ (variable)   │ (meta.Size bytes)│
└──────────┴───────────────┴──────────────┴──────────────────┘

Meta JSON (SnapshotMeta):

{"version": 1708012345000, "hash": "sha256hex...", "size": 131072, "timestamp": 1708012345}
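
The framing above can be sketched with the standard library alone. This is an illustrative sketch, not the package's implementation: it works on in-memory byte slices rather than a QUIC stream, it assumes an uncompressed payload, and the names encodeFrame, decodeFrame, and the maxMetaLen cap are hypothetical.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
)

const (
	magic       = "SYN1"            // documented MagicBytes
	maxSnapshot = 512 * 1024 * 1024 // documented MaxSnapshotSize
	maxMetaLen  = 1 << 20           // hypothetical cap on the meta JSON, not from the package
)

// snapshotMeta mirrors the JSON fields shown in the README example.
type snapshotMeta struct {
	Version   int64  `json:"version"`
	Hash      string `json:"hash"`
	Size      int64  `json:"size"`
	Timestamp int64  `json:"timestamp"`
}

// encodeFrame lays out magic, big-endian meta length, meta JSON, then payload.
func encodeFrame(meta snapshotMeta, payload []byte) ([]byte, error) {
	metaJSON, err := json.Marshal(meta)
	if err != nil {
		return nil, err
	}
	frame := make([]byte, 0, 8+len(metaJSON)+len(payload))
	frame = append(frame, magic...)
	frame = binary.BigEndian.AppendUint32(frame, uint32(len(metaJSON)))
	frame = append(frame, metaJSON...)
	return append(frame, payload...), nil
}

// decodeFrame checks the header, parses the meta, and returns the payload
// only if its length equals meta.Size.
func decodeFrame(frame []byte) (snapshotMeta, []byte, error) {
	var meta snapshotMeta
	if len(frame) < 8 || string(frame[:4]) != magic {
		return meta, nil, errors.New("missing or bad magic bytes")
	}
	metaLen := binary.BigEndian.Uint32(frame[4:8])
	rest := frame[8:]
	if metaLen > maxMetaLen || uint64(len(rest)) < uint64(metaLen) {
		return meta, nil, errors.New("invalid meta length")
	}
	if err := json.Unmarshal(rest[:metaLen], &meta); err != nil {
		return meta, nil, fmt.Errorf("decode meta: %w", err)
	}
	payload := rest[metaLen:]
	if meta.Size < 0 || meta.Size > maxSnapshot || int64(len(payload)) != meta.Size {
		return meta, nil, errors.New("payload length does not match meta.size")
	}
	return meta, payload, nil
}

func main() {
	payload := []byte("pretend these are SQLite pages")
	meta := snapshotMeta{Version: 1708012345000, Hash: "sha256hex...", Size: int64(len(payload)), Timestamp: 1708012345}
	frame, err := encodeFrame(meta, payload)
	if err != nil {
		panic(err)
	}
	got, body, err := decodeFrame(frame)
	fmt.Println(got.Version, len(body), err)
}
```

A streaming receiver would read the 8-byte header first and then copy the payload to disk instead of buffering it; the bounds checks on both lengths matter either way because they come from the network.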

TLS configuration

Production (self-signed CA):

// Server (subscriber):
tlsCfg, _ := dbsync.SyncTLSConfig("cert.pem", "key.pem")

// Client (publisher) — pin CA cert:
tlsCfg, _ := dbsync.SyncClientTLSConfigWithCA("ca.pem")

Development:

tlsCfg := dbsync.SyncClientTLSConfig(true) // insecureSkipVerify

Documentation

Overview

Package dbsync provides database replication from a back-office (BO) to one or more front-office (FO) instances. The BO produces filtered SQLite snapshots and pushes them over QUIC. Each FO receives the snapshot, validates its integrity, and swaps the local read-only database atomically.

The package integrates with hazyhaar_pkg/connectivity for dynamic target management and hazyhaar_pkg/watch for change detection.

Constants

const (
	// ALPNProtocol identifies dbsync streams over QUIC, distinct from MCP.
	ALPNProtocol = "horos-dbsync-v1"

	// MagicBytes are sent at the start of every snapshot stream for framing.
	MagicBytes = "SYN1"

	// MaxSnapshotSize is a safety limit (512 MB).
	MaxSnapshotSize = 512 * 1024 * 1024
)

ALPN and wire-format constants.

Functions

func DBSyncFactory

func DBSyncFactory(pub *Publisher) connectivity.TransportFactory

DBSyncFactory returns a connectivity.TransportFactory for the "dbsync" strategy. When the router calls a dbsync route, it returns the status of the sync (last version, hash, etc.) rather than performing a data operation.

This allows the LLM to check sync health via:

resp, _ := router.Call(ctx, "dbsync:fo-1", nil)
// resp contains {"last_version": N, "status": "ok"}

The factory requires a reference to the Publisher so it can report status.

func ListenSnapshots

func ListenSnapshots(ctx context.Context, addr string, tlsCfg *tls.Config, handler func(SnapshotMeta, io.Reader) error) error

ListenSnapshots accepts incoming QUIC connections carrying snapshots. For each connection it reads the wire format and calls handler with the parsed metadata and a reader for the raw database bytes. Blocks until ctx is cancelled.

func PushSnapshot

func PushSnapshot(ctx context.Context, endpoint string, tlsCfg *tls.Config, meta SnapshotMeta, snapshotPath string) error

PushSnapshot sends a snapshot file to a remote subscriber over QUIC.

Wire format:

  1. Magic bytes "SYN1" (4 bytes)
  2. Meta length (4 bytes big-endian uint32)
  3. Meta JSON (variable)
  4. Snapshot bytes (gzip-compressed if meta.Compressed, raw otherwise)

func RedirectHandler

func RedirectHandler(boURL string) http.HandlerFunc

RedirectHandler returns an http.HandlerFunc that redirects the client to the BO URL instead of proxying. This is a simpler alternative when the BO is directly accessible from the user's browser.

func SubscriberFactory

func SubscriberFactory(sub *Subscriber) connectivity.TransportFactory

SubscriberFactory returns a connectivity.TransportFactory for FO-side status reporting. Similar to DBSyncFactory but reports subscriber state.

func SyncClientTLSConfig

func SyncClientTLSConfig(insecureSkipVerify bool) *tls.Config

SyncClientTLSConfig returns a TLS config for connecting to a dbsync endpoint.

WARNING: when insecureSkipVerify is true, the client accepts any server certificate. This is only appropriate for local development and testing. In production, use SyncClientTLSConfigWithCA to pin the CA certificate.

func SyncClientTLSConfigWithCA

func SyncClientTLSConfigWithCA(caCertFile string) (*tls.Config, error)

SyncClientTLSConfigWithCA returns a TLS config that trusts the given CA certificate file. Use this in production when the dbsync server uses a self-signed or internal CA certificate.
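
As a rough illustration of CA pinning, the sketch below builds a client config that trusts only one CA. It is not the package's code: the helper names are hypothetical, it takes PEM bytes instead of a file path (os.ReadFile would bridge the two), and setting NextProtos to the ALPN constant and requiring TLS 1.3 are assumptions.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// clientTLSFromCAPEM returns a config whose only trusted root is the given CA.
func clientTLSFromCAPEM(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificate found in CA PEM")
	}
	return &tls.Config{
		RootCAs:    pool,
		NextProtos: []string{"horos-dbsync-v1"}, // documented ALPNProtocol; use here is assumed
		MinVersion: tls.VersionTLS13,            // QUIC requires TLS 1.3
	}, nil
}

// selfSignedCAPEM generates a throwaway CA certificate so the sketch can run
// without any files on disk.
func selfSignedCAPEM() ([]byte, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "example dbsync CA"},
		NotBefore:             time.Now().Add(-time.Hour),
		NotAfter:              time.Now().Add(24 * time.Hour),
		IsCA:                  true,
		KeyUsage:              x509.KeyUsageCertSign,
		BasicConstraintsValid: true,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), nil
}

func main() {
	caPEM, err := selfSignedCAPEM()
	if err != nil {
		panic(err)
	}
	cfg, err := clientTLSFromCAPEM(caPEM)
	if err != nil {
		panic(err)
	}
	fmt.Println("ALPN:", cfg.NextProtos)
}
```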

func SyncTLSConfig

func SyncTLSConfig(certFile, keyFile string) (*tls.Config, error)

SyncTLSConfig builds a TLS config with the dbsync ALPN protocol. The cert is added to RootCAs so that self-signed certs are trusted when this config is used as a client (publisher dialing subscriber).

func SyncTLSConfigMutual

func SyncTLSConfigMutual(certFile, keyFile, caCertFile string) (*tls.Config, error)

SyncTLSConfigMutual builds a TLS config for the subscriber (listener) that requires the publisher to present a valid client certificate signed by the given CA. This prevents rogue clients from pushing snapshots.

certFile/keyFile: the subscriber's own certificate. caCertFile: the CA certificate that signed the publisher's certificate.

func ValidateFilterSpec

func ValidateFilterSpec(spec FilterSpec) error

ValidateFilterSpec checks that WHERE clauses in the spec do not contain dangerous SQL patterns (multi-statement, DDL, etc.). This is a defense-in-depth measure — FilterSpec is expected to be set by the deployer, not end users.
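
The exact patterns ValidateFilterSpec rejects are not listed here. The following sketch shows one plausible shape for such a check; the function name checkWhere, the token list, and the keyword list are all assumptions, and a keyword inside a string literal would be flagged as a false positive.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// forbiddenKeyword matches statement-level keywords as whole words only, so a
// column such as is_dropped does not trigger it. The list is an assumption.
var forbiddenKeyword = regexp.MustCompile(`(?i)\b(drop|alter|create|attach|detach|pragma|insert|update|delete|vacuum)\b`)

// checkWhere rejects clauses that look like they smuggle in extra statements,
// comments, or DDL/DML. An empty clause is allowed (PartialTable.Where is optional).
func checkWhere(clause string) error {
	for _, tok := range []string{";", "--", "/*"} {
		if strings.Contains(clause, tok) {
			return fmt.Errorf("forbidden token %q in WHERE clause", tok)
		}
	}
	if kw := forbiddenKeyword.FindString(clause); kw != "" {
		return fmt.Errorf("forbidden keyword %q in WHERE clause", kw)
	}
	return nil
}

func main() {
	for _, clause := range []string{
		"visibility = 'public'",
		"is_dropped = 0",
		"1 = 1; DROP TABLE users",
	} {
		fmt.Printf("%-28q -> %v\n", clause, checkWhere(clause))
	}
}
```

A denylist like this is only a backstop, which matches the defense-in-depth framing above: the real protection is that the spec comes from the deployer.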

Types

type AuthProxy (deprecated)

type AuthProxy struct {

	// HealthCheck is an optional callback that returns whether the BO is
	// reachable. When set and returning false, auth handlers fail fast
	// instead of waiting for the HTTP timeout.
	HealthCheck func() bool
	// contains filtered or unexported fields
}

Deprecated: Use github.com/hazyhaar/pkg/authproxy instead. This type is kept for backward compatibility with existing callers.

AuthProxy calls the BO internal auth API and translates the JSON response into cookies + redirects for the FO domain. The user never sees the BO URL.

func NewAuthProxy

func NewAuthProxy(boURL, cookieDomain string, secure bool) *AuthProxy

NewAuthProxy creates an auth proxy that calls BO internal API endpoints.

func (*AuthProxy) ForgotPasswordHandler

func (p *AuthProxy) ForgotPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc

ForgotPasswordHandler returns an http.HandlerFunc for POST /forgot-password on the FO. It reads the form, calls BO /api/internal/auth/forgot-password, and redirects.

func (*AuthProxy) LoginHandler

func (p *AuthProxy) LoginHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc

LoginHandler returns an http.HandlerFunc for POST /login on the FO. It reads the form, calls BO /api/internal/auth/login, sets the cookie, and redirects.

func (*AuthProxy) RegisterHandler

func (p *AuthProxy) RegisterHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc

RegisterHandler returns an http.HandlerFunc for POST /register on the FO. It reads the form, calls BO /api/internal/auth/register, and redirects.

func (*AuthProxy) ResetPasswordHandler

func (p *AuthProxy) ResetPasswordHandler(setFlash func(http.ResponseWriter, string, string)) http.HandlerFunc

ResetPasswordHandler returns an http.HandlerFunc for POST /reset-password on the FO. It reads the form, calls BO /api/internal/auth/reset-password, and redirects.

type BOHealthChecker

type BOHealthChecker struct {
	// contains filtered or unexported fields
}

BOHealthChecker periodically pings the back-office to track reachability. The cached result is used by auth proxies and health endpoints to fail fast instead of waiting for a 10s HTTP timeout every time.

func NewBOHealthChecker

func NewBOHealthChecker(boURL string, interval time.Duration) *BOHealthChecker

NewBOHealthChecker creates a health checker that pings boURL+"/health" every interval. Call Start to begin the check loop.

func (*BOHealthChecker) Healthy

func (h *BOHealthChecker) Healthy() bool

Healthy returns the cached reachability status. Returns false if no check has been performed yet.

func (*BOHealthChecker) Start

func (h *BOHealthChecker) Start(ctx context.Context)

Start begins the periodic health check loop. Blocks until ctx is cancelled.
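
The cached-status pattern described for BOHealthChecker could look roughly like the sketch below. The type and function names are hypothetical, and treating only HTTP 200 as healthy, as well as the 2-second probe timeout, are assumptions rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync/atomic"
	"time"
)

// healthCache stores the result of the last probe so callers never block on I/O.
type healthCache struct {
	healthy atomic.Bool // zero value is false: unhealthy until a check succeeds
	probe   func() bool
}

func newHealthCache(probe func() bool) *healthCache { return &healthCache{probe: probe} }

// Healthy returns the cached result of the most recent probe.
func (h *healthCache) Healthy() bool { return h.healthy.Load() }

// checkOnce runs the probe and caches its result.
func (h *healthCache) checkOnce() { h.healthy.Store(h.probe()) }

// run probes immediately, then once per interval, until ctx is cancelled.
func (h *healthCache) run(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		h.checkOnce()
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// httpProbe builds a probe that GETs boURL+"/health" with a short timeout.
func httpProbe(boURL string) func() bool {
	client := &http.Client{Timeout: 2 * time.Second}
	return func() bool {
		resp, err := client.Get(boURL + "/health")
		if err != nil {
			return false
		}
		resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}
}

func main() {
	// A stub probe keeps the example offline; httpProbe(boURL) would be the real one.
	h := newHealthCache(func() bool { return true })
	fmt.Println("before first check:", h.Healthy())

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	h.run(ctx, 10*time.Millisecond)
	fmt.Println("after loop:", h.Healthy())
}
```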

func (*BOHealthChecker) Status

func (h *BOHealthChecker) Status() map[string]any

Status returns a JSON-serializable summary.

type FilterSpec

type FilterSpec struct {
	// FullTables are copied integrally without modification.
	FullTables []string

	// FilteredTables maps table name → WHERE clause. Only rows matching
	// the clause are included.
	FilteredTables map[string]string

	// PartialTables maps table name → column selection + optional WHERE.
	// Columns not listed are dropped (set to their zero value).
	PartialTables map[string]PartialTable
}

FilterSpec defines which tables and columns are included in the public snapshot pushed to FO instances. Anything not listed is excluded.

Example
package main

import (
	"fmt"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	spec := dbsync.FilterSpec{
		// Tables copied without modification.
		FullTables: []string{"badges", "reputation_config", "rate_limits", "maintenance"},

		// Tables with row-level filtering.
		FilteredTables: map[string]string{
			"engagements": "visibility = 'public'",
			"templates":   "is_blacklisted = 0",
		},

		// Tables with column-level filtering and optional WHERE.
		PartialTables: map[string]dbsync.PartialTable{
			"users": {
				Columns: []string{"user_id", "username", "display_name", "avatar_url"},
				Where:   "is_active = 1",
			},
		},
	}

	err := dbsync.ValidateFilterSpec(spec)
	if err != nil {
		fmt.Println("invalid filter:", err)
	}
}

type Option

type Option func(*options)

Option configures Publisher or Subscriber.

func WithCompression

func WithCompression() Option

WithCompression enables gzip compression of snapshot payloads before push. Typically reduces transfer size by 60-70% for SQLite databases.
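
To make the compression step concrete, here is a minimal sketch of a gzip round trip where the hash is taken over the uncompressed bytes, as the SnapshotMeta field comments state. The helper names are hypothetical, and capping the decompressed size at the receiver is an assumption added to guard against oversized payloads.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
)

// gzipBytes compresses raw with the default gzip level.
func gzipBytes(raw []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzipBytes decompresses packed but refuses to produce more than limit bytes.
func gunzipBytes(packed []byte, limit int64) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(packed))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(io.LimitReader(zr, limit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(raw)) > limit {
		return nil, errors.New("decompressed snapshot exceeds limit")
	}
	return raw, nil
}

// hashHex returns the SHA-256 of raw as lowercase hex.
func hashHex(raw []byte) string {
	sum := sha256.Sum256(raw)
	return hex.EncodeToString(sum[:])
}

func main() {
	raw := bytes.Repeat([]byte("SQLite page "), 1000)
	packed, err := gzipBytes(raw)
	if err != nil {
		panic(err)
	}
	back, err := gunzipBytes(packed, 512*1024*1024) // documented MaxSnapshotSize
	if err != nil {
		panic(err)
	}
	fmt.Println("raw:", len(raw), "packed:", len(packed), "hash matches:", hashHex(back) == hashHex(raw))
}
```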

func WithDriverName

func WithDriverName(name string) Option

WithDriverName sets the SQL driver name used when opening databases. Defaults to "sqlite". Use "sqlite-trace" to enable SQL tracing.

func WithMaxAge

func WithMaxAge(d time.Duration) Option

WithMaxAge sets the maximum acceptable age for incoming snapshots. The subscriber rejects snapshots whose timestamp is older than this duration, protecting against rollback/replay attacks.
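
The age check itself is small. A minimal sketch, assuming the comparison is made against the meta timestamp (unix seconds) and that a snapshot exactly at the limit is still accepted; isFresh is a hypothetical name:

```go
package main

import (
	"fmt"
	"time"
)

// isFresh reports whether a snapshot stamped at timestampUnix is at most
// maxAge old at nowUnix. Both times are unix epoch seconds.
func isFresh(timestampUnix, nowUnix int64, maxAge time.Duration) bool {
	age := time.Duration(nowUnix-timestampUnix) * time.Second
	return age <= maxAge
}

func main() {
	const stamped = 1708012345 // timestamp from the README meta example
	fmt.Println(isFresh(stamped, stamped+30, time.Minute))   // 30 s old, 1 min limit
	fmt.Println(isFresh(stamped, stamped+3600, time.Minute)) // 1 h old, 1 min limit
}
```

Note that this sketch lets timestamps from the future through, so it relies on reasonably synchronized clocks between BO and FO.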

func WithWatchDebounce

func WithWatchDebounce(d time.Duration) Option

WithWatchDebounce sets the debounce window after a change is detected.

func WithWatchInterval

func WithWatchInterval(d time.Duration) Option

WithWatchInterval sets the database polling interval.

type PartialTable

type PartialTable struct {
	Columns []string
	Where   string
}

PartialTable describes a table where only selected columns are published and an optional WHERE clause restricts rows.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher watches a source database for changes, produces filtered snapshots, and pushes them to all FO subscribers returned by the TargetProvider.

func NewPublisher

func NewPublisher(db *sql.DB, targets TargetProvider, filter FilterSpec, savePath string, tlsCfg *tls.Config, opts ...Option) *Publisher

NewPublisher creates a Publisher that resolves push targets via the given TargetProvider. For services that push to a fixed set of FOs, use NewStaticTargetProvider. For dynamic routing via a connectivity routes DB, use NewRoutesTargetProvider.

Parameters:

  • db: the source (BO) database to snapshot
  • targets: provides the list of FO endpoints to push to
  • filter: defines which tables/columns are included
  • savePath: path for the local hot-save ("save chaude") snapshot file
  • tlsCfg: TLS config for QUIC push (use SyncClientTLSConfig for dev)

Example (Static)
package main

import (
	"context"
	"database/sql"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	var db *sql.DB // your source database

	tlsCfg := dbsync.SyncClientTLSConfig(true) // dev mode

	pub := dbsync.NewPublisher(db, dbsync.NewStaticTargetProvider(
		dbsync.Target{Name: "fo-1", Strategy: "dbsync", Endpoint: "10.0.0.2:9443"},
	), dbsync.FilterSpec{
		FullTables: []string{"posts", "categories"},
	}, "db/public.db", tlsCfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() { _ = pub.Start(ctx) }()
	// Publisher now watches db and pushes snapshots to fo-1.

	_ = pub // use pub
}

func NewPublisherWithRoutesDB

func NewPublisherWithRoutesDB(db, routesDB *sql.DB, filter FilterSpec, savePath string, tlsCfg *tls.Config, opts ...Option) *Publisher

NewPublisherWithRoutesDB creates a Publisher that reads push targets from a connectivity routes table. This is the original constructor signature kept for backward compatibility with repvow and other existing callers.

Example
package main

import (
	"context"
	"database/sql"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	var db, routesDB *sql.DB // source + connectivity routes databases

	tlsCfg := dbsync.SyncClientTLSConfig(true)

	pub := dbsync.NewPublisherWithRoutesDB(db, routesDB, dbsync.FilterSpec{
		FullTables: []string{"posts"},
		FilteredTables: map[string]string{
			"comments": "is_hidden = 0",
		},
	}, "db/public.db", tlsCfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() { _ = pub.Start(ctx) }()

	_ = pub // use pub
}

func (*Publisher) LastMeta

func (p *Publisher) LastMeta() *SnapshotMeta

LastMeta returns the metadata of the most recently produced snapshot.

func (*Publisher) Ping

func (p *Publisher) Ping(ctx context.Context) error

Ping verifies that the source database is accessible.

func (*Publisher) Start

func (p *Publisher) Start(ctx context.Context) error

Start watches the source database for changes and pushes snapshots to FOs. Blocks until ctx is cancelled.

func (*Publisher) Status

func (p *Publisher) Status() map[string]any

Status returns a JSON-serializable status summary.

type RoutesTargetProvider

type RoutesTargetProvider struct {
	// contains filtered or unexported fields
}

RoutesTargetProvider reads dbsync targets from a connectivity routes table.

func NewRoutesTargetProvider

func NewRoutesTargetProvider(db *sql.DB) *RoutesTargetProvider

NewRoutesTargetProvider creates a TargetProvider backed by the connectivity routes table in db. It queries rows where strategy IN ('dbsync', 'noop') AND service_name LIKE 'dbsync:%'.

func (*RoutesTargetProvider) Targets

func (p *RoutesTargetProvider) Targets(ctx context.Context) ([]Target, error)

Targets queries the routes table for dbsync endpoints.

type SnapshotMeta

type SnapshotMeta struct {
	Version    int64  `json:"version"`
	Hash       string `json:"hash"`       // SHA-256 hex of uncompressed data
	Size       int64  `json:"size"`       // uncompressed file size in bytes
	Timestamp  int64  `json:"timestamp"`  // unix epoch seconds
	Compressed bool   `json:"compressed"` // true if payload is gzip-compressed
}

SnapshotMeta is sent as the first message on the QUIC stream before the raw database bytes follow.

func ProduceSnapshot

func ProduceSnapshot(srcDB *sql.DB, dstPath string, spec FilterSpec) (*SnapshotMeta, error)

ProduceSnapshot creates a filtered copy of srcDB at dstPath.

Steps:

  1. Validate WHERE clauses in the FilterSpec
  2. VACUUM INTO a temporary file (consistent snapshot of entire DB)
  3. Open the copy with the plain "sqlite" driver
  4. Drop tables not in the FilterSpec whitelist
  5. Apply WHERE clauses (FilteredTables)
  6. Truncate non-selected columns (PartialTables)
  7. VACUUM to compact
  8. SHA-256 hash the result
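
Steps 4 to 6 can be pictured as a list of SQL statements derived from the spec. The sketch below only generates the statements and does not touch a database. It is an illustration under assumptions: the types mirror FilterSpec and PartialTable locally, the schema map and all function names are hypothetical, and non-selected columns are set to NULL, which would fail on NOT NULL columns where the real package presumably writes a zero value.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type partialTable struct {
	Columns []string
	Where   string
}

type filterSpec struct {
	FullTables     []string
	FilteredTables map[string]string
	PartialTables  map[string]partialTable
}

// quoteIdent double-quotes an SQL identifier.
func quoteIdent(s string) string { return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` }

// deleteNonMatching removes rows where the clause is false or NULL.
func deleteNonMatching(table, where string) string {
	return "DELETE FROM " + table + " WHERE NOT COALESCE((" + where + "), 0)"
}

// filterStatements maps table name -> columns (schema) and a spec to the
// statements that would turn a full copy into the filtered snapshot.
func filterStatements(schema map[string][]string, spec filterSpec) []string {
	full := map[string]bool{}
	for _, t := range spec.FullTables {
		full[t] = true
	}
	tables := make([]string, 0, len(schema))
	for t := range schema {
		tables = append(tables, t)
	}
	sort.Strings(tables) // deterministic output

	var stmts []string
	for _, t := range tables {
		q := quoteIdent(t)
		where, filtered := spec.FilteredTables[t]
		pt, partial := spec.PartialTables[t]
		switch {
		case full[t]:
			// copied as-is
		case filtered:
			stmts = append(stmts, deleteNonMatching(q, where))
		case partial:
			if pt.Where != "" {
				stmts = append(stmts, deleteNonMatching(q, pt.Where))
			}
			keep := map[string]bool{}
			for _, c := range pt.Columns {
				keep[c] = true
			}
			var sets []string
			for _, c := range schema[t] {
				if !keep[c] {
					sets = append(sets, quoteIdent(c)+" = NULL")
				}
			}
			if len(sets) > 0 {
				stmts = append(stmts, "UPDATE "+q+" SET "+strings.Join(sets, ", "))
			}
		default:
			stmts = append(stmts, "DROP TABLE "+q) // not whitelisted anywhere
		}
	}
	return stmts
}

// demoStatements runs the generator on a small made-up schema.
func demoStatements() []string {
	schema := map[string][]string{
		"badges":   {"badge_id", "label"},
		"sessions": {"token", "user_id"},
		"users":    {"user_id", "username", "email", "is_active"},
	}
	spec := filterSpec{
		FullTables: []string{"badges"},
		PartialTables: map[string]partialTable{
			"users": {Columns: []string{"user_id", "username"}, Where: "is_active = 1"},
		},
	}
	return filterStatements(schema, spec)
}

func main() {
	for _, s := range demoStatements() {
		fmt.Println(s)
	}
}
```

Rows are deleted before columns are blanked so that a WHERE clause can still reference a column that is not published, such as is_active here.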

type StaticTargetProvider

type StaticTargetProvider struct {
	// contains filtered or unexported fields
}

StaticTargetProvider returns a fixed list of targets.

func NewStaticTargetProvider

func NewStaticTargetProvider(targets ...Target) *StaticTargetProvider

NewStaticTargetProvider creates a TargetProvider from a fixed list of targets.

func (*StaticTargetProvider) Targets

func (p *StaticTargetProvider) Targets(_ context.Context) ([]Target, error)

Targets returns the static list.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber listens for incoming snapshot pushes over QUIC, verifies integrity, and performs an atomic swap of the local read-only database.
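
The verify-then-swap step can be sketched as: stream the payload into a temporary file in the same directory, hash it on the way, and rename it over the live path only if the hash matches. This is an assumption-laden illustration (receiveAndSwap and runDemo are hypothetical names, and reopening the database after the rename is left out), not the Subscriber's actual code.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// receiveAndSwap writes src to a temp file next to dbPath, checks its SHA-256
// against wantHash, and renames it into place only on a match.
func receiveAndSwap(dbPath string, src io.Reader, wantHash string) error {
	tmp, err := os.CreateTemp(filepath.Dir(dbPath), ".incoming-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // fails harmlessly once the rename has happened

	h := sha256.New()
	if _, err := io.Copy(io.MultiWriter(tmp, h), src); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to disk before the rename
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if got := hex.EncodeToString(h.Sum(nil)); got != wantHash {
		return fmt.Errorf("hash mismatch: got %s, want %s", got, wantHash)
	}
	// Same directory means same filesystem, so the rename replaces dbPath atomically.
	return os.Rename(tmp.Name(), dbPath)
}

// runDemo applies one valid and one tampered snapshot and reports the file
// content afterwards and whether the tampered one was rejected.
func runDemo() (string, bool, error) {
	dir, err := os.MkdirTemp("", "dbsync-sketch")
	if err != nil {
		return "", false, err
	}
	defer os.RemoveAll(dir)
	dbPath := filepath.Join(dir, "public.db")

	sum := sha256.Sum256([]byte("v1"))
	want := hex.EncodeToString(sum[:])
	if err := receiveAndSwap(dbPath, strings.NewReader("v1"), want); err != nil {
		return "", false, err
	}
	rejected := receiveAndSwap(dbPath, strings.NewReader("tampered"), want) != nil

	data, err := os.ReadFile(dbPath)
	return string(data), rejected, err
}

func main() {
	content, rejected, err := runDemo()
	fmt.Println("content:", content, "tampered rejected:", rejected, "err:", err)
}
```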

func NewSubscriber

func NewSubscriber(dbPath, listenAddr string, tlsCfg *tls.Config, opts ...Option) *Subscriber

NewSubscriber creates a Subscriber that listens on listenAddr for snapshot pushes and maintains a read-only database at dbPath.

Example
package main

import (
	"context"
	"log/slog"

	"github.com/hazyhaar/pkg/dbsync"
)

func main() {
	tlsCfg := dbsync.SyncClientTLSConfig(true) // would be SyncTLSConfig in prod

	sub := dbsync.NewSubscriber("db/public.db", ":9443", tlsCfg)
	sub.OnSwap(func() {
		slog.Info("database swapped", "version", sub.Version())
		// Recreate service instances with sub.DB()
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() { _ = sub.Start(ctx) }()

	_ = sub // use sub
}

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the current database connection.

func (*Subscriber) DB

func (s *Subscriber) DB() *sql.DB

DB returns the current read-only database connection. May return nil if no snapshot has been received yet.

func (*Subscriber) LastSwapAt

func (s *Subscriber) LastSwapAt() int64

LastSwapAt returns the unix timestamp of the last successful database swap, or 0 if no swap has occurred yet.

func (*Subscriber) OnSwap

func (s *Subscriber) OnSwap(fn func())

OnSwap registers a callback invoked after a successful database swap. Callbacks are called synchronously in registration order.

func (*Subscriber) Ping

func (s *Subscriber) Ping(ctx context.Context) error

Ping reports whether the subscriber is healthy. It returns nil if a snapshot has been received and the database is accessible, or an error describing what is missing (no snapshot, DB unreachable).

func (*Subscriber) StaleSince

func (s *Subscriber) StaleSince() time.Duration

StaleSince returns how long since the last successful swap. Returns -1 if no swap has occurred yet.

func (*Subscriber) Start

func (s *Subscriber) Start(ctx context.Context) error

Start listens for incoming snapshots over QUIC. Blocks until ctx is cancelled.

func (*Subscriber) Status

func (s *Subscriber) Status() map[string]any

Status returns a JSON-serializable status summary.

func (*Subscriber) Version

func (s *Subscriber) Version() int64

Version returns the version of the last received snapshot.

type Target

type Target struct {
	Name     string // identifier, e.g. "fo-1"
	Strategy string // "dbsync" or "noop"
	Endpoint string // "ip:port" for QUIC dial
}

Target describes a single FO endpoint that receives snapshots.

type TargetProvider

type TargetProvider interface {
	Targets(ctx context.Context) ([]Target, error)
}

TargetProvider returns the list of sync targets that a Publisher should push snapshots to. Implementations range from a static list to a database-backed route table.
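
Because TargetProvider is a one-method interface, a custom source of targets is easy to plug in. The sketch below redeclares Target and TargetProvider locally so it runs without the package; listProvider, parseTargets, and the "name=host:port" list format are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"strings"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type Target struct {
	Name     string
	Strategy string
	Endpoint string
}

type TargetProvider interface {
	Targets(ctx context.Context) ([]Target, error)
}

// parseTargets turns "fo-1=10.0.0.2:9443, fo-2=10.0.0.3:9443" into targets.
func parseTargets(list string) ([]Target, error) {
	var out []Target
	for _, item := range strings.Split(list, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		name, endpoint, ok := strings.Cut(item, "=")
		if !ok || name == "" {
			return nil, fmt.Errorf("target %q: want name=host:port", item)
		}
		if _, _, err := net.SplitHostPort(endpoint); err != nil {
			return nil, fmt.Errorf("target %q: %w", item, err)
		}
		out = append(out, Target{Name: name, Strategy: "dbsync", Endpoint: endpoint})
	}
	return out, nil
}

// listProvider re-parses its list on every call, so a caller could swap the
// string (for example from configuration) between pushes.
type listProvider struct{ list string }

func (p listProvider) Targets(_ context.Context) ([]Target, error) {
	return parseTargets(p.list)
}

// Compile-time check that listProvider satisfies the interface.
var _ TargetProvider = listProvider{}

func main() {
	var provider TargetProvider = listProvider{list: "fo-1=10.0.0.2:9443, fo-2=10.0.0.3:9443"}
	targets, err := provider.Targets(context.Background())
	fmt.Println(targets, err)
}
```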

type WriteProxy

type WriteProxy struct {
	// contains filtered or unexported fields
}

WriteProxy forwards HTTP requests from FO to the BO endpoint. In FO mode, write operations (POST, PUT, DELETE) are proxied to the BO which owns the read-write database.

func NewWriteProxy

func NewWriteProxy(boEndpoint string, tlsCfg *tls.Config) (*WriteProxy, error)

NewWriteProxy creates a reverse proxy that forwards requests to boEndpoint. boEndpoint must be a valid HTTP(S) URL, e.g. "https://bo.internal:8443". Returns an error if the endpoint URL cannot be parsed.
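
A reverse proxy with this shape can be approximated with net/http/httputil. The sketch below is not the package's implementation: it omits the tlsCfg parameter, the helper names are hypothetical, and routing only POST, PUT, and DELETE to the BO follows the type description above rather than any confirmed behavior.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httputil"
	"net/url"
)

// isWrite reports whether the method is one of the write operations named in
// the WriteProxy description.
func isWrite(method string) bool {
	switch method {
	case http.MethodPost, http.MethodPut, http.MethodDelete:
		return true
	}
	return false
}

// newWriteProxy validates boEndpoint and returns a handler that forwards
// every request it receives to that endpoint.
func newWriteProxy(boEndpoint string) (http.Handler, error) {
	u, err := url.Parse(boEndpoint)
	if err != nil {
		return nil, err
	}
	if (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
		return nil, fmt.Errorf("BO endpoint %q is not an http(s) URL", boEndpoint)
	}
	return httputil.NewSingleHostReverseProxy(u), nil
}

// routeWrites sends write requests to proxy and everything else to local.
func routeWrites(proxy, local http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isWrite(r.Method) {
			proxy.ServeHTTP(w, r)
			return
		}
		local.ServeHTTP(w, r)
	})
}

func main() {
	proxy, err := newWriteProxy("https://bo.internal:8443")
	if err != nil {
		panic(err)
	}
	// On an FO, this handler would be mounted on the HTTP server.
	_ = routeWrites(proxy, http.NotFoundHandler())

	_, err = newWriteProxy("bo.internal:8443") // missing scheme
	fmt.Println("POST is write:", isWrite(http.MethodPost), "| bad endpoint:", err)
}
```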

func (*WriteProxy) Handler

func (p *WriteProxy) Handler() http.Handler

Handler returns an http.Handler that proxies all requests to the BO.
