You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
sonic/event/event_bus.go

71 lines
1.8 KiB
Go

2 years ago
package event
import (
"context"
"reflect"
"sync"
"go.uber.org/zap"
"github.com/go-sonic/sonic/log"
)
type Listener func(ctx context.Context, event Event) error
type Bus interface {
Publish(ctx context.Context, event Event)
Subscribe(eventType string, listener Listener)
UnSubscribe(eventType string, listener Listener)
}
type syncLocalBus struct {
listeners sync.Map
logger *zap.Logger
}
func NewSyncEventBus(logger *zap.Logger) Bus {
return &syncLocalBus{
logger: logger,
}
}
func (e *syncLocalBus) Publish(ctx context.Context, event Event) {
defer func() {
if err := recover(); err != nil {
log.CtxError(ctx, "event panic", zap.String("event", event.EventType()), zap.Stack("stack"), zap.Any("err", err))
}
}()
if listeners, ok := e.listeners.Load(event.EventType()); ok {
for _, listener := range listeners.([]Listener) {
err := listener(ctx, event)
if err != nil {
e.logger.Error("error in event listener", zap.Any("event", event.EventType()), zap.Error(err))
}
}
}
}
func (e *syncLocalBus) Subscribe(eventType string, listener Listener) {
if listeners, ok := e.listeners.Load(eventType); ok {
listeners = append(listeners.([]Listener), listener)
e.listeners.Store(eventType, listeners)
} else {
listeners := make([]Listener, 0)
listeners = append(listeners, listener)
e.listeners.Store(eventType, listeners)
}
}
func (e *syncLocalBus) UnSubscribe(eventType string, listener Listener) {
if listeners, ok := e.listeners.Load(eventType); ok && len(listeners.([]Listener)) > 0 {
target := reflect.ValueOf(listener).Pointer()
var filtered []Listener
for _, i := range listeners.([]Listener) {
if reflect.ValueOf(i).Pointer() != target {
filtered = append(filtered, i)
}
}
e.listeners.Store(eventType, filtered)
}
}