You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sonic/util/count_cache.go

75 lines
1.5 KiB
Go

package util
import (
"sync"
"time"
)
type CounterCache[K comparable] struct {
rwMu sync.RWMutex
countCache map[K]int64
batchIncr func(countCache map[K]int64)
singleIncr func(key K, count int64)
refreshDuration time.Duration
}
func NewCounterCache[K comparable](refreshDuration time.Duration, batchIncr func(map[K]int64), singleIncr func(K, int64)) *CounterCache[K] {
c := &CounterCache[K]{
countCache: make(map[K]int64),
batchIncr: batchIncr,
singleIncr: singleIncr,
refreshDuration: refreshDuration,
}
go c.startFlushTicker()
return c
}
func (c *CounterCache[K]) IncrBy(key K, value int64) int64 {
val := c.incrCacheBy(key, value)
return val
}
func (c *CounterCache[K]) incrCacheBy(key K, value int64) int64 {
c.rwMu.Lock()
defer c.rwMu.Unlock()
count := c.countCache[key]
count += value
c.countCache[key] = count
return count
}
func (c *CounterCache[K]) Get(key K) int64 {
cacheVal := c.get(key)
return cacheVal
}
func (c *CounterCache[K]) get(key K) int64 {
c.rwMu.RLock()
defer c.rwMu.RUnlock()
return c.countCache[key]
}
func (c *CounterCache[K]) startFlushTicker() {
ticker := time.NewTicker(c.refreshDuration)
defer ticker.Stop()
for range ticker.C {
c.flush()
}
}
func (c *CounterCache[K]) flush() {
var oldCountCache map[K]int64
c.rwMu.Lock()
oldCountCache = c.countCache
c.countCache = make(map[K]int64)
c.rwMu.Unlock()
if c.batchIncr != nil {
c.batchIncr(oldCountCache)
return
}
for key, value := range oldCountCache {
c.singleIncr(key, value)
}
}