You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kratos/contrib/config/polaris/watcher.go

81 lines
1.7 KiB

package config
import (
"fmt"
"path/filepath"
"strings"
"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/log"
)
type Watcher struct {
configFile polaris.ConfigFile
fullPath string
}
type eventChan struct {
closed bool
event chan model.ConfigFileChangeEvent
}
var eventChanMap = make(map[string]eventChan)
func getFullPath(namespace string, fileGroup string, fileName string) string {
return fmt.Sprintf("%s/%s/%s", namespace, fileGroup, fileName)
}
func receive(event model.ConfigFileChangeEvent) {
meta := event.ConfigFileMetadata
ec := eventChanMap[getFullPath(meta.GetNamespace(), meta.GetFileGroup(), meta.GetFileName())]
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
if !ec.closed {
ec.event <- event
}
}
func newWatcher(configFile polaris.ConfigFile) *Watcher {
configFile.AddChangeListener(receive)
fullPath := getFullPath(configFile.GetNamespace(), configFile.GetFileGroup(), configFile.GetFileName())
if _, ok := eventChanMap[fullPath]; !ok {
eventChanMap[fullPath] = eventChan{
closed: false,
event: make(chan model.ConfigFileChangeEvent),
}
}
w := &Watcher{
configFile: configFile,
fullPath: fullPath,
}
return w
}
func (w *Watcher) Next() ([]*config.KeyValue, error) {
ec := eventChanMap[w.fullPath]
event := <-ec.event
return []*config.KeyValue{
{
Key: w.configFile.GetFileName(),
Value: []byte(event.NewValue),
Format: strings.TrimPrefix(filepath.Ext(w.configFile.GetFileName()), "."),
},
}, nil
}
func (w *Watcher) Stop() error {
ec := eventChanMap[w.fullPath]
if !ec.closed {
ec.closed = true
close(ec.event)
}
return nil
}