Documentation ¶
Index ¶
- Constants
- type Message
- type TimeQueue
- func (tq *TimeQueue) Drain() []Message
- func (tq *TimeQueue) Messages() <-chan Message
- func (tq *TimeQueue) Push(at time.Time, data interface{}) *Message
- func (tq *TimeQueue) PushAll(messages ...*Message)
- func (tq *TimeQueue) Remove(m *Message) bool
- func (tq *TimeQueue) Start() bool
- func (tq *TimeQueue) Stop() bool
Examples ¶
Constants ¶
const (
DefaultCapacity = 0
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is a container type that associates a time.Time with arbitrary data. A TimeQueue "releases" a Message as close to that time (At) as possible.
The zero value of Message is not valid. Use NewMessage to create Message instances.
func NewMessage ¶ added in v0.2.0
NewMessage returns a Message with at and data set on their corresponding fields.
You should use this function to create Messages instead of using a struct initializer.
func (*Message) Data ¶
func (m *Message) Data() interface{}
Data returns the data associated with m.
This will usually be used after receiving a Message from a TimeQueue in order to process the Message appropriately.
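Because Data returns interface{}, a consumer usually needs a type assertion or type switch before it can use the value. The sketch below shows one way to do that; the payload type is a hypothetical stand-in for a received Message, and only its Data method mirrors the documented signature.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for a received Message.
type payload struct{ data interface{} }

// Data mirrors the documented method signature.
func (p *payload) Data() interface{} { return p.data }

// describe inspects the dynamic type of data and reports what it found.
func describe(data interface{}) string {
	switch v := data.(type) {
	case int:
		return fmt.Sprintf("int %d", v)
	case string:
		return fmt.Sprintf("string %q", v)
	case nil:
		return "no data"
	default:
		return fmt.Sprintf("unhandled type %T", v)
	}
}

func main() {
	for _, p := range []*payload{{data: 7}, {data: "hello"}, {data: nil}, {data: 1.5}} {
		fmt.Println(describe(p.Data()))
	}
}
```

A type switch avoids the panic that an unchecked assertion such as m.Data().(int) would cause if a message carried a different type.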
func (Message) Len ¶ added in v0.2.0
func (mh Message) Len() int
Len is the heap.Interface implementation. It returns len(mh).
func (Message) Pop ¶ added in v0.2.0
func (mh Message) Pop() interface{}
Pop is the heap.Interface implementation.
type TimeQueue ¶
type TimeQueue struct {
// contains filtered or unexported fields
}
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aqua/timequeue"
)

func main() {
	now := time.Now()
	tq := timequeue.NewTimeQueue()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Stop the queue once the 2-second timeout expires.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		<-ctx.Done()
		tq.Stop()
	}()

	// Schedule ten messages one nanosecond apart, carrying the values 1 through 10.
	doneProducing := make(chan struct{})
	go func() {
		defer close(doneProducing)
		const count = 10
		toPush := make([]*timequeue.Message, count)
		for i := 0; i < count; i++ {
			m := timequeue.NewMessage(now.Add(time.Duration(i)), i+1)
			toPush[i] = m
		}
		tq.PushAll(toPush...)
	}()

	// Print each message as it is released, until the queue has been stopped.
	doneConsuming := make(chan struct{})
	go func() {
		defer close(doneConsuming)
		for {
			select {
			case <-stopped:
				return
			case m := <-tq.Messages():
				fmt.Println(m.Data().(int))
			}
		}
	}()

	<-doneProducing
	<-stopped
	<-doneConsuming
}
Output:
1
2
3
4
5
6
7
8
9
10
func NewTimeQueue ¶ added in v0.2.0
func NewTimeQueue() *TimeQueue