package consul

import (
	"github.com/go-kratos/kratos/v2/config"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/api/watch"
)

// watcher reports changes under a Consul key prefix on behalf of a source.
// Instances are built by newWatcher.
type watcher struct {
	// source is the config source that Next reloads after a change signal.
	source *source

	// wp is the Consul watch plan; its handler is the watcher's handle
	// method, and Stop stops it.
	wp *watch.Plan

	// ch receives one value from handle for every accepted watch callback
	// and is drained by Next.
	ch chan interface{}

	// closeChan is closed by Stop so that a Next call blocked in its select
	// returns.
	closeChan chan struct{}
}

// handle is the handler installed on the watch plan. It ignores nil payloads
// and payloads that are not api.KVPairs; for any other callback it signals
// w.ch so that a pending Next call reloads the source. The idx argument is
// not used.
//
// The send is abandoned as soon as closeChan is closed. w.ch is unbuffered,
// so without this the goroutine invoking the handler would stay blocked
// forever if the watcher is stopped while no Next call is receiving.
func (w *watcher) handle(idx uint64, data interface{}) {
	if data == nil {
		return
	}

	if _, ok := data.(api.KVPairs); !ok {
		return
	}

	select {
	case w.ch <- struct{}{}:
	case <-w.closeChan:
		// Watcher stopped: nobody will consume the signal, so drop it.
	}
}

// newWatcher creates a watcher for s and starts a Consul "keyprefix" watch
// plan on s.options.path, using s.client, in a background goroutine.
//
// It returns an error only when the watch parameters cannot be parsed; in
// that case no goroutine is started.
func newWatcher(s *source) (*watcher, error) {
	params := map[string]interface{}{
		"type":   "keyprefix",
		"prefix": s.options.path,
	}

	plan, err := watch.Parse(params)
	if err != nil {
		return nil, err
	}

	w := &watcher{
		source:    s,
		ch:        make(chan interface{}),
		closeChan: make(chan struct{}),
		wp:        plan,
	}
	plan.Handler = w.handle

	// Running the plan blocks, so it gets its own goroutine; otherwise
	// newWatcher could not return to the caller.
	// NOTE(review): a non-nil error from the plan panics on this goroutine,
	// which takes the whole process down.
	go func() {
		if runErr := plan.RunWithClientAndHclog(s.client, nil); runErr != nil {
			panic(runErr)
		}
	}()

	return w, nil
}

// Next blocks until either a change signal arrives on w.ch or closeChan is
// closed.
//
// On a change signal it returns whatever w.source.Load returns. If closeChan
// is closed, or w.ch itself has been closed, it returns (nil, nil).
func (w *watcher) Next() ([]*config.KeyValue, error) {
	select {
	case <-w.closeChan:
		return nil, nil
	case _, open := <-w.ch:
		if open {
			return w.source.Load()
		}
		return nil, nil
	}
}

// Stop stops the watch plan and closes closeChan, which releases any Next
// call blocked in its select. It always returns nil.
//
// Repeated sequential calls are no-ops: closing closeChan a second time
// would panic, so an already-closed channel is detected first. Concurrent
// calls from several goroutines are not synchronised here.
func (w *watcher) Stop() error {
	select {
	case <-w.closeChan:
		// Already stopped.
		return nil
	default:
	}

	w.wp.Stop()
	close(w.closeChan)
	return nil
}