lightqueue

package module
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2025 License: MIT Imports: 15 Imported by: 0

README

LightQueue

A lightweight, high-performance message queue library for Go with SQLite persistence, priority handling, and built-in reliability features.

Features

  • 🚀 High Performance: In-memory priority heap with batch SQLite persistence
  • 🔒 Reliable: Message acknowledgment, retry logic, and dead letter queue
  • ⚡ Lightweight: No external dependencies beyond SQLite
  • 🎯 Priority Support: Process high-priority messages first
  • ⏰ TTL & Cleanup: Automatic message expiration and cleanup
  • 📊 Monitoring: Built-in statistics and health monitoring
  • 🔄 Fault Tolerant: Graceful shutdown and message recovery

Installation

go get github.com/SachinHg/lightqueue

Quick Start

package main

import (
    "fmt"
    "log"
    "time"
    
    "github.com/SachinHg/lightqueue"
)

func main() {
    // Create queue configuration
    config := &lightqueue.QueueConfig{
        Name:           "my_queue",
        DataDir:        "./queue_data",
        BufferSize:     100,
        FlushInterval:  1 * time.Second,
        MaxRetries:     3,
        MessageTTL:     1 * time.Hour,
        EnableMetrics:  true,
    }

    // Create and start queue
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        log.Fatal(err)
    }
    defer queue.Stop()

    // Publish a message with priority 1
    if err := queue.Publish("notifications", "Hello, World!", 1); err != nil {
        log.Fatal(err)
    }

    // Consume and process message
    message, err := queue.Consume("notifications", "worker-1")
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Received: %v\n", message.Data)

    // Acknowledge successful processing.
    // Fixed: the original discarded this error while checking every other one.
    if err := queue.Acknowledge(message.ID); err != nil {
        log.Fatal(err)
    }
}

Real-World Use Cases

1. 🎮 Game Event Processing

Perfect for handling game events, achievements, and player notifications with priority levels.

// GameEvent describes a single in-game occurrence (e.g. achievement,
// level-up, item pickup) published to the game-event queue.
type GameEvent struct {
    PlayerID string `json:"player_id"`
    Event    string `json:"event"`
    Data     map[string]interface{} `json:"data"`
}

// setupGameEventQueue creates the queue used for game events.
// It panics if the queue cannot be created, since nothing downstream
// can work without it.
func setupGameEventQueue() *lightqueue.Queue {
    config := &lightqueue.QueueConfig{
        Name:           "game_events",
        DataDir:        "./game_data",
        BufferSize:     500,
        FlushInterval:  2 * time.Second,
        MaxRetries:     2,
        MessageTTL:     30 * time.Minute,
    }

    // Fixed: the original example discarded this error (`queue, _ :=`),
    // hiding setup failures and returning a nil queue.
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        panic(err)
    }
    return queue
}

// publishGameEvent publishes event on the "game_events" topic with the
// given priority.
func publishGameEvent(queue *lightqueue.Queue, event GameEvent, priority int) {
    // Priority: 5=Critical (player death), 3=Important (level up), 1=Normal (item pickup)
    // NOTE(review): Publish's error is discarded here, so callers cannot
    // observe enqueue failures — consider returning it.
    queue.Publish("game_events", event, priority)
}

// processGameEvents consumes game events forever, acknowledging successes
// and rejecting failures so the queue can retry or dead-letter them.
func processGameEvents(queue *lightqueue.Queue) {
    for {
        message, err := queue.Consume("game_events", "game-processor")
        if err != nil {
            // No message available (or transient error): back off briefly.
            time.Sleep(100 * time.Millisecond)
            continue
        }

        // Fixed: the original used a bare type assertion, which panics on
        // an unexpected payload type. Reject instead so the message is
        // retried or dead-lettered.
        event, ok := message.Data.(GameEvent)
        if !ok {
            queue.Reject(message.ID, "unexpected payload type")
            continue
        }
        if processEvent(event) {
            queue.Acknowledge(message.ID)
        } else {
            queue.Reject(message.ID, "processing failed")
        }
    }
}
2. 📧 Email Service with Priority

Handle email notifications with different priority levels and retry logic.

// EmailJob describes a single email to be rendered from Template and
// delivered to To.
type EmailJob struct {
    To       string `json:"to"`
    Subject  string `json:"subject"`
    Body     string `json:"body"`
    Template string `json:"template"`
}

// setupEmailQueue creates the email delivery queue.
// It panics if the queue cannot be created.
func setupEmailQueue() *lightqueue.Queue {
    config := &lightqueue.QueueConfig{
        Name:           "email_service",
        DataDir:        "./email_data",
        BufferSize:     200,
        FlushInterval:  5 * time.Second,
        MaxRetries:     5, // Important for email delivery
        MessageTTL:     24 * time.Hour,
    }

    // Fixed: the original example discarded this error (`queue, _ :=`),
    // hiding setup failures and returning a nil queue.
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        panic(err)
    }
    return queue
}

// sendEmails publishes three emails at different priorities, then processes
// the "emails" topic in the current goroutine.
func sendEmails() {
    queue := setupEmailQueue()
    defer queue.Stop()

    // High priority: Password reset
    queue.Publish("emails", EmailJob{
        To:       "[email protected]",
        Subject:  "Password Reset",
        Template: "password_reset",
    }, 10)

    // Medium priority: Order confirmation
    queue.Publish("emails", EmailJob{
        To:       "[email protected]",
        Subject:  "Order Confirmation",
        Template: "order_confirm",
    }, 5)

    // Low priority: Newsletter
    queue.Publish("emails", EmailJob{
        To:       "[email protected]",
        Subject:  "Weekly Newsletter",
        Template: "newsletter",
    }, 1)

    // Fixed: the original launched this loop in a goroutine and then
    // returned immediately, so the deferred queue.Stop() shut the queue
    // down before any email was processed. Run the loop synchronously.
    for {
        message, err := queue.Consume("emails", "email-worker")
        if err != nil {
            time.Sleep(1 * time.Second)
            continue
        }

        // Fixed: a bare type assertion here would panic on an
        // unexpected payload type.
        job, ok := message.Data.(EmailJob)
        if !ok {
            queue.Reject(message.ID, "unexpected payload type")
            continue
        }
        if sendEmail(job) {
            queue.Acknowledge(message.ID)
        } else {
            queue.Reject(message.ID, "SMTP error")
        }
    }
}
3. 🖼️ Image Processing Pipeline

Process user uploads with different priorities and handle large workloads.

// ImageTask describes an uploaded image plus the processing operations and
// output sizes requested for it.
type ImageTask struct {
    ImageID    string   `json:"image_id"`
    UserID     string   `json:"user_id"`
    Operations []string `json:"operations"` // resize, compress, watermark
    Sizes      []string `json:"sizes"`      // thumbnail, medium, large
}

// setupImageProcessing creates the image queue, seeds it with two uploads of
// different priorities, and starts a pool of worker goroutines.
func setupImageProcessing() {
    queue, err := lightqueue.NewQueue(&lightqueue.QueueConfig{
        Name:           "image_processor",
        DataDir:        "./image_data",
        BufferSize:     1000,
        FlushInterval:  3 * time.Second,
        MaxRetries:     3,
        MessageTTL:     2 * time.Hour,
    })
    // Fixed: the original discarded this error and would dereference a
    // nil queue below on failure.
    if err != nil {
        panic(err)
    }

    // Premium user uploads (high priority)
    queue.Publish("images", ImageTask{
        ImageID: "img_123",
        UserID:  "premium_user",
        Operations: []string{"resize", "compress", "watermark"},
        Sizes: []string{"thumbnail", "medium", "large"},
    }, 8)

    // Regular user uploads (normal priority)
    queue.Publish("images", ImageTask{
        ImageID: "img_456",
        UserID:  "regular_user",
        Operations: []string{"resize", "compress"},
        Sizes: []string{"thumbnail", "medium"},
    }, 3)

    // Start multiple workers
    for i := 0; i < 4; i++ {
        go imageWorker(queue, fmt.Sprintf("worker-%d", i))
    }
}

// imageWorker consumes image tasks until the process exits, acknowledging
// successes and rejecting failures for retry or dead-lettering.
func imageWorker(queue *lightqueue.Queue, workerID string) {
    for {
        message, err := queue.Consume("images", workerID)
        if err != nil {
            // No work available: back off briefly.
            time.Sleep(500 * time.Millisecond)
            continue
        }

        // Fixed: the original used a bare type assertion, which panics
        // on an unexpected payload type.
        task, ok := message.Data.(ImageTask)
        if !ok {
            queue.Reject(message.ID, "unexpected payload type")
            continue
        }
        if processImage(task) {
            queue.Acknowledge(message.ID)
        } else {
            queue.Reject(message.ID, "processing failed")
        }
    }
}
4. 📊 Analytics Event Collection

Collect and process analytics events with batching and priority handling.

// AnalyticsEvent is a single tracked user action with free-form properties.
type AnalyticsEvent struct {
    UserID    string                 `json:"user_id"`
    Event     string                 `json:"event"`
    Properties map[string]interface{} `json:"properties"`
    Timestamp time.Time              `json:"timestamp"`
}

// setupAnalyticsQueue creates the high-volume analytics queue.
// It panics if the queue cannot be created.
func setupAnalyticsQueue() *lightqueue.Queue {
    config := &lightqueue.QueueConfig{
        Name:           "analytics",
        DataDir:        "./analytics_data",
        BufferSize:     2000, // High volume
        FlushInterval:  10 * time.Second,
        MaxRetries:     2,
        MessageTTL:     6 * time.Hour,
    }

    // Fixed: the original example discarded this error (`queue, _ :=`),
    // hiding setup failures and returning a nil queue.
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        panic(err)
    }
    return queue
}

// trackEvent publishes an analytics event, boosting the priority of
// conversion events (purchases and signups) above the default.
func trackEvent(queue *lightqueue.Queue, event AnalyticsEvent) {
    priority := 1 // default for ordinary events
    switch event.Event {
    case "purchase", "signup":
        priority = 7 // conversions are processed ahead of the backlog
    }
    queue.Publish("analytics", event, priority)
}

// processAnalytics drains the "analytics" topic, accumulating events into a
// batch that is flushed to the analytics DB when full (batchLimit events) or
// every 30 seconds, whichever comes first.
func processAnalytics(queue *lightqueue.Queue) {
    const batchLimit = 50 // single source of truth for capacity and flush size
    batch := make([]AnalyticsEvent, 0, batchLimit)
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop() // idiomatic hygiene even though this loop never returns

    for {
        select {
        case <-ticker.C:
            // Time-based flush of a partial batch.
            if len(batch) > 0 {
                sendToAnalyticsDB(batch)
                batch = batch[:0]
            }

        default:
            message, err := queue.Consume("analytics", "analytics-processor")
            if err != nil {
                time.Sleep(100 * time.Millisecond)
                continue
            }

            // Fixed: the original used a bare type assertion, which panics
            // on an unexpected payload type.
            event, ok := message.Data.(AnalyticsEvent)
            if !ok {
                queue.Reject(message.ID, "unexpected payload type")
                continue
            }
            batch = append(batch, event)
            queue.Acknowledge(message.ID)

            // Size-based flush when the batch is full.
            if len(batch) >= batchLimit {
                sendToAnalyticsDB(batch)
                batch = batch[:0]
            }
        }
    }
}
5. 🤖 AI/ML Model Training Queue

Manage AI model training jobs with priority and resource allocation.

// TrainingJob describes one ML model training request, including the
// dataset location, hyperparameters, and whether a GPU worker is needed.
type TrainingJob struct {
    JobID       string            `json:"job_id"`
    ModelType   string            `json:"model_type"`
    DatasetPath string            `json:"dataset_path"`
    Parameters  map[string]interface{} `json:"parameters"`
    UserID      string            `json:"user_id"`
    GPURequired bool              `json:"gpu_required"`
}

// setupTrainingQueue creates the queue for long-running ML training jobs.
// It panics if the queue cannot be created.
func setupTrainingQueue() *lightqueue.Queue {
    config := &lightqueue.QueueConfig{
        Name:           "ml_training",
        DataDir:        "./training_data",
        BufferSize:     100,
        FlushInterval:  30 * time.Second,
        MaxRetries:     2,
        MessageTTL:     48 * time.Hour, // Long-running jobs
    }

    // Fixed: the original example discarded this error (`queue, _ :=`),
    // hiding setup failures and returning a nil queue.
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        panic(err)
    }
    return queue
}

// submitTrainingJob enqueues job on the "training" topic with a priority
// derived from the user tier and model type.
func submitTrainingJob(queue *lightqueue.Queue, job TrainingJob) {
    priority := 3 // default for standard jobs

    // Experimental jobs win over the premium/critical boost, matching the
    // order in which the original sequential checks applied.
    switch {
    case job.ModelType == "experimental":
        priority = 1 // experiments run in the background
    case job.UserID == "premium_user" || job.ModelType == "critical_model":
        priority = 9 // premium users and urgent models jump the line
    }

    queue.Publish("training", job, priority)
}

// trainingWorker consumes training jobs forever as the "gpu-worker-1"
// consumer, acknowledging completed jobs and rejecting failed ones.
func trainingWorker(queue *lightqueue.Queue) {
    for {
        message, err := queue.Consume("training", "gpu-worker-1")
        if err != nil {
            // No job available: poll slowly, training jobs are long-lived.
            time.Sleep(5 * time.Second)
            continue
        }

        // Fixed: the original used a bare type assertion, which panics
        // on an unexpected payload type.
        job, ok := message.Data.(TrainingJob)
        if !ok {
            queue.Reject(message.ID, "unexpected payload type")
            continue
        }

        if trainModel(job) {
            queue.Acknowledge(message.ID)
        } else {
            queue.Reject(message.ID, "training failed")
        }
    }
}

Configuration

QueueConfig Options
// QueueConfig holds the tunable settings for a Queue.
type QueueConfig struct {
    Name           string        // Queue name (required)
    DataDir        string        // Directory for SQLite database
    BufferSize     int           // In-memory buffer size before flush
    FlushInterval  time.Duration // How often to flush to database
    MaxRetries     int           // Max retry attempts for failed messages
    RetryDelay     time.Duration // Delay between retries
    MessageTTL     time.Duration // Time-to-live for messages
    EnableMetrics  bool          // Enable statistics collection
}
Default Configuration
// DefaultQueueConfig returns a QueueConfig populated with sensible defaults
// for a general-purpose queue.
func DefaultQueueConfig() *QueueConfig {
    return &QueueConfig{
        Name:           "default_queue",
        DataDir:        "./queue_data",
        BufferSize:     100,
        FlushInterval:  5 * time.Second,
        MaxRetries:     3,
        RetryDelay:     1 * time.Minute,
        MessageTTL:     24 * time.Hour,
        EnableMetrics:  true,
    }
}

API Reference

Core Methods
Publishing Messages
// Publish a message with priority
err := queue.Publish(topic, data, priority)
Consuming Messages
// Consume next highest priority message
message, err := queue.Consume(topic, consumerName)
Message Acknowledgment
// Acknowledge successful processing
err := queue.Acknowledge(messageID)

// Reject message (will retry or go to dead letter)
err := queue.Reject(messageID, reason)
Queue Management
// Get queue statistics
stats := queue.GetStats()

// Force flush to database
err := queue.Flush()

// Cleanup expired messages
err := queue.CleanupExpiredMessages()

// Graceful shutdown
queue.Stop()
Message Structure
type Message struct {
    ID          string      `json:"id"`
    Topic       string      `json:"topic"`
    Data        interface{} `json:"data"`
    Priority    int         `json:"priority"`
    Timestamp   time.Time   `json:"timestamp"`
    Attempts    int         `json:"attempts"`
    MaxAttempts int         `json:"max_attempts"`
    Status      MessageStatus `json:"status"`
    Consumer    string      `json:"consumer"`
    ExpiresAt   *time.Time  `json:"expires_at"`
}

Monitoring and Statistics

stats := queue.GetStats()
fmt.Printf("Pending: %d\n", stats.PendingCount)
fmt.Printf("Processing: %d\n", stats.ProcessingCount)
fmt.Printf("Completed: %d\n", stats.CompletedCount)
fmt.Printf("Failed: %d\n", stats.FailedCount)
fmt.Printf("Retry Count: %d\n", stats.RetryCount)

Best Practices

1. Priority Assignment
  • 10: Critical system messages
  • 7-9: High priority user actions
  • 4-6: Normal business operations
  • 1-3: Background tasks and cleanup
2. Error Handling
message, err := queue.Consume("topic", "worker")
if err != nil {
    // Handle consumption errors
    log.Printf("Failed to consume: %v", err)
    time.Sleep(1 * time.Second)
    continue
}

// Process message
if err := processMessage(message); err != nil {
    // Reject on processing failure (will retry)
    queue.Reject(message.ID, err.Error())
} else {
    // Acknowledge on success
    queue.Acknowledge(message.ID)
}
3. Graceful Shutdown
func main() {
    // Fixed: the original discarded this error (`queue, _ :=`), so a
    // failed startup went unnoticed.
    queue, err := lightqueue.NewQueue(config)
    if err != nil {
        log.Fatal(err)
    }

    // Handle shutdown signals
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-c
        log.Println("Shutting down gracefully...")
        queue.Stop() // flush buffered messages before exiting
        os.Exit(0)
    }()

    // Your application logic
}
4. Multiple Workers
// Start multiple consumer workers
for i := 0; i < numWorkers; i++ {
    // The loop variable is passed as an argument so each goroutine gets its
    // own copy (required before Go 1.22; harmless after).
    go func(workerID int) {
        for {
            message, err := queue.Consume("work", fmt.Sprintf("worker-%d", workerID))
            if err != nil {
                // No work available: back off and retry.
                time.Sleep(time.Second)
                continue
            }
            
            // Process message
            // NOTE(review): processWork's result is ignored — failures are
            // acknowledged anyway; consider Reject on error.
            processWork(message)
            queue.Acknowledge(message.ID)
        }
    }(i)
}

Performance Considerations

  • Batch Size: Larger BufferSize improves write performance but uses more memory
  • Flush Interval: Shorter intervals provide better durability, longer intervals improve performance
  • Workers: Scale consumers based on processing time and throughput requirements
  • TTL: Set appropriate message TTL to prevent database bloat

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor handles message batching

func NewBatchProcessor

func NewBatchProcessor(batchSize int, batchTimeout time.Duration) *BatchProcessor

NewBatchProcessor creates a new batch processor

func (*BatchProcessor) Stop

func (bp *BatchProcessor) Stop()

Stop gracefully stops the batch processor

func (*BatchProcessor) Submit

func (bp *BatchProcessor) Submit(msg *Message) error

Submit adds a message to the batch processor

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker prevents cascade failures

func NewCircuitBreaker

func NewCircuitBreaker(failureThreshold int, resetTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(operation func() error) error

Execute runs a function with circuit breaker protection

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitState

GetState returns the current circuit breaker state

func (*CircuitBreaker) GetStats

func (cb *CircuitBreaker) GetStats() map[string]interface{}

GetStats returns circuit breaker statistics

type CircuitState

type CircuitState string

CircuitState represents circuit breaker state

const (
	CircuitClosed CircuitState = "closed" // Normal operation
	CircuitOpen   CircuitState = "open"   // Failing, reject requests
	CircuitHalf   CircuitState = "half"   // Testing recovery
)

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool manages database connections

func NewConnectionPool

func NewConnectionPool(maxConnections int, dbPath string) (*ConnectionPool, error)

NewConnectionPool creates a new database connection pool

func (*ConnectionPool) Close

func (cp *ConnectionPool) Close()

Close closes all connections in the pool

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection() (*sql.DB, error)

GetConnection gets a database connection from the pool

func (*ConnectionPool) GetStats

func (cp *ConnectionPool) GetStats() map[string]interface{}

GetStats returns connection pool statistics

func (*ConnectionPool) ReturnConnection

func (cp *ConnectionPool) ReturnConnection(conn *sql.DB)

ReturnConnection returns a connection to the pool

type Consumer

type Consumer struct {
	Name           string                   `json:"name"`
	Group          string                   `json:"group"`
	PendingEntries map[string]*PendingEntry `json:"pending_entries"`
	LastDelivered  string                   `json:"last_delivered"`
	// contains filtered or unexported fields
}

Consumer represents a message consumer

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor tracks queue health and performance

func NewHealthMonitor

func NewHealthMonitor(checkInterval time.Duration) *HealthMonitor

NewHealthMonitor creates a new health monitor

func (*HealthMonitor) AddError

func (hm *HealthMonitor) AddError(err string)

AddError adds an error to the health status

func (*HealthMonitor) AddWarning

func (hm *HealthMonitor) AddWarning(warning string)

AddWarning adds a warning to the health status

func (*HealthMonitor) GetHealth

func (hm *HealthMonitor) GetHealth() *HealthStatus

GetHealth returns the current health status

func (*HealthMonitor) Stop

func (hm *HealthMonitor) Stop()

Stop stops the health monitor

type HealthStatus

type HealthStatus struct {
	Status    string           `json:"status"`
	Message   string           `json:"message"`
	Timestamp time.Time        `json:"timestamp"`
	Metrics   map[string]int64 `json:"metrics"`
	Errors    []string         `json:"errors"`
	Warnings  []string         `json:"warnings"`
}

HealthStatus represents queue health

type MemoryManager

type MemoryManager struct {
	// contains filtered or unexported fields
}

MemoryManager controls memory usage and prevents OOM

func NewMemoryManager

func NewMemoryManager(maxMemoryMB int, threshold float64) *MemoryManager

NewMemoryManager creates a new memory manager

func (*MemoryManager) AllocateMemory

func (mm *MemoryManager) AllocateMemory(mb int) error

AllocateMemory allocates memory and checks limits

func (*MemoryManager) FreeMemory

func (mm *MemoryManager) FreeMemory(mb int)

FreeMemory frees allocated memory

func (*MemoryManager) GetStats

func (mm *MemoryManager) GetStats() map[string]interface{}

GetStats returns memory manager statistics

func (*MemoryManager) Stop

func (mm *MemoryManager) Stop()

Stop stops the memory manager

type Message

type Message struct {
	ID          string        `json:"id"`
	Topic       string        `json:"topic"`
	Data        interface{}   `json:"data"`
	Priority    int           `json:"priority"` // Higher = more important
	Timestamp   time.Time     `json:"timestamp"`
	Attempts    int           `json:"attempts"`
	MaxAttempts int           `json:"max_attempts"`
	Status      MessageStatus `json:"status"`
	Consumer    string        `json:"consumer"` // Which consumer has it
	ExpiresAt   *time.Time    `json:"expires_at"`
	RetryAt     *time.Time    `json:"retry_at"` // When to retry (for exponential backoff)
	// contains filtered or unexported fields
}

Message represents a queue message

type MessageHeap

type MessageHeap []*Message

MessageHeap implements heap.Interface for priority queue

func (MessageHeap) FindByID

func (h MessageHeap) FindByID(id string) *Message

FindByID finds a message by ID without removing it

func (MessageHeap) Len

func (h MessageHeap) Len() int

func (MessageHeap) Less

func (h MessageHeap) Less(i, j int) bool

func (MessageHeap) Peek

func (h MessageHeap) Peek() *Message

Peek returns the highest priority message without removing it

func (*MessageHeap) Pop

func (h *MessageHeap) Pop() interface{}

func (*MessageHeap) Push

func (h *MessageHeap) Push(x interface{})

func (*MessageHeap) RemoveByID

func (h *MessageHeap) RemoveByID(id string) bool

RemoveByID removes a message by ID

func (MessageHeap) Swap

func (h MessageHeap) Swap(i, j int)

func (*MessageHeap) UpdatePriority

func (h *MessageHeap) UpdatePriority(id string, newPriority int) bool

UpdatePriority updates the priority of a message and reorders the heap

type MessageStatus

type MessageStatus string

MessageStatus represents the current status of a message

const (
	StatusPending    MessageStatus = "pending"
	StatusProcessing MessageStatus = "processing"
	StatusCompleted  MessageStatus = "completed"
	StatusFailed     MessageStatus = "failed"
	StatusRetry      MessageStatus = "retry"
	StatusScheduled  MessageStatus = "scheduled" // Scheduled for retry with backoff
)

type PendingEntry

type PendingEntry struct {
	MessageID     string    `json:"message_id"`
	Consumer      string    `json:"consumer"`
	DeliveryTime  time.Time `json:"delivery_time"`
	DeliveryCount int       `json:"delivery_count"`
}

PendingEntry tracks pending messages for acknowledgment

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields
}

Queue is the main queue implementation

func NewQueue

func NewQueue(config *QueueConfig) (*Queue, error)

NewQueue creates a new queue

func (*Queue) Acknowledge

func (q *Queue) Acknowledge(messageID string) error

Acknowledge marks a message as completed

func (*Queue) CleanupExpiredMessages

func (q *Queue) CleanupExpiredMessages() error

CleanupExpiredMessages removes expired messages from database and moves them to dead letter

func (*Queue) Consume

func (q *Queue) Consume(topic string, consumerName string) (*Message, error)

Consume gets the next message (fast in-memory) with production scaling

func (*Queue) Flush

func (q *Queue) Flush() error

Flush forces an immediate flush of the write buffer to database

func (*Queue) GetHealth

func (q *Queue) GetHealth() *HealthStatus

GetHealth returns comprehensive health status

func (*Queue) GetProductionStats

func (q *Queue) GetProductionStats() map[string]interface{}

GetProductionStats returns comprehensive production metrics

func (*Queue) GetStats

func (q *Queue) GetStats() *QueueStats

GetStats returns queue statistics

func (*Queue) GracefulShutdown

func (q *Queue) GracefulShutdown(timeout time.Duration) error

GracefulShutdown performs a graceful shutdown of all components

func (*Queue) Publish

func (q *Queue) Publish(topic string, data interface{}, priority int) error

Publish adds a message to the queue (fast in-memory)

func (*Queue) Reject

func (q *Queue) Reject(messageID string, reason string) error

Reject marks a message as failed (will be retried or moved to dead letter)

func (*Queue) ScaleDown

func (q *Queue) ScaleDown(reduceWorkers int) error

ScaleDown decreases worker pool size for low load

func (*Queue) ScaleUp

func (q *Queue) ScaleUp(additionalWorkers int) error

ScaleUp increases worker pool size for high load

func (*Queue) Stop

func (q *Queue) Stop()

Stop gracefully shuts down the queue

type QueueConfig

type QueueConfig struct {
	Name          string        `json:"name"`
	DataDir       string        `json:"data_dir"`
	BufferSize    int           `json:"buffer_size"`
	FlushInterval time.Duration `json:"flush_interval"`
	MaxRetries    int           `json:"max_retries"`
	RetryDelay    time.Duration `json:"retry_delay"`
	MessageTTL    time.Duration `json:"message_ttl"`
	EnableMetrics bool          `json:"enable_metrics"`

	// 🚀 PRODUCTION SCALING CONFIGURATION
	WorkerPoolSize    int  `json:"worker_pool_size"`   // Number of concurrent workers
	MaxConnections    int  `json:"max_connections"`    // Max database connections
	BatchSize         int  `json:"batch_size"`         // Messages per batch
	MaxMemoryMB       int  `json:"max_memory_mb"`      // Memory limit in MB
	EnableCompression bool `json:"enable_compression"` // Compress message data
	AsyncOperations   bool `json:"async_operations"`   // Use async DB operations
	CircuitBreaker    bool `json:"circuit_breaker"`    // Enable circuit breaker
	HealthCheck       bool `json:"health_check"`       // Enable health monitoring

	// 🚨 DEAD LETTER QUEUE CONFIGURATION
	DLQMaxSize         int           `json:"dlq_max_size"`          // Max DLQ entries (0 = unlimited)
	DLQMaxAge          time.Duration `json:"dlq_max_age"`           // Max age for DLQ entries (0 = no expiry)
	DLQCleanupInterval time.Duration `json:"dlq_cleanup_interval"`  // How often to clean DLQ
	DLQEnableAutoPrune bool          `json:"dlq_enable_auto_prune"` // Enable automatic DLQ pruning

	// 🔄 RETRY CONFIGURATION
	RetryBaseDelay  time.Duration `json:"retry_base_delay"` // Base delay for exponential backoff
	RetryMaxDelay   time.Duration `json:"retry_max_delay"`  // Maximum retry delay
	RetryMultiplier float64       `json:"retry_multiplier"` // Exponential backoff multiplier
	RetryJitter     bool          `json:"retry_jitter"`     // Add jitter to prevent thundering herd

	// 🧹 MAINTENANCE CONFIGURATION
	VacuumInterval   time.Duration `json:"vacuum_interval"`    // How often to run VACUUM (0 = disabled)
	EnableAutoVacuum bool          `json:"enable_auto_vacuum"` // Enable automatic VACUUM

	// ⚡ DURABILITY CONFIGURATION
	DurabilityMode  string `json:"durability_mode"`   // "full", "normal", "off" - SQLite synchronous mode
	WriteBufferSize int    `json:"write_buffer_size"` // Size of write buffer before flush
	FlushOnPublish  bool   `json:"flush_on_publish"`  // Force flush on every publish (slower but safer)
	EnableFsync     bool   `json:"enable_fsync"`      // Enable fsync after writes (slower but safer)
}

QueueConfig holds queue configuration

func DefaultQueueConfig

func DefaultQueueConfig() *QueueConfig

DefaultQueueConfig returns default configuration

type QueueStats

type QueueStats struct {
	Name            string    `json:"name"`
	PendingCount    int       `json:"pending_count"`
	ProcessingCount int       `json:"processing_count"`
	CompletedCount  int       `json:"completed_count"`
	FailedCount     int       `json:"failed_count"`
	RetryCount      int       `json:"retry_count"`
	BufferSize      int       `json:"buffer_size"`
	ConsumerCount   int       `json:"consumer_count"`
	LastFlush       time.Time `json:"last_flush"`
	LastMessage     time.Time `json:"last_message"`
	TotalMessages   int       `json:"total_messages"`

	// 🚨 DEAD LETTER QUEUE STATISTICS
	DLQCount       int       `json:"dlq_count"`        // Current DLQ entries
	DLQTotalCount  int       `json:"dlq_total_count"`  // Total DLQ entries ever created
	DLQLastCleanup time.Time `json:"dlq_last_cleanup"` // Last DLQ cleanup time
	DLQPrunedCount int       `json:"dlq_pruned_count"` // Number of entries pruned
}

QueueStats represents queue statistics

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent message processing

func NewWorkerPool

func NewWorkerPool(workerCount int) *WorkerPool

NewWorkerPool creates a new worker pool for concurrent processing

func (*WorkerPool) GetStats

func (wp *WorkerPool) GetStats() map[string]interface{}

GetStats returns worker pool statistics

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop gracefully stops the worker pool

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(msg *Message) error

Submit adds a message to the work queue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL