parent
8424c4af83
commit
dbf807b047
@ -0,0 +1,154 @@ |
||||
package kubernetes |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
v1 "k8s.io/api/core/v1" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/client-go/kubernetes" |
||||
"k8s.io/client-go/rest" |
||||
"k8s.io/client-go/tools/clientcmd" |
||||
"os" |
||||
"path/filepath" |
||||
"strings" |
||||
|
||||
"github.com/go-kratos/kratos/v2/config" |
||||
) |
||||
|
||||
// Option is kubernetes option.
|
||||
type Option func(*options) |
||||
|
||||
type options struct { |
||||
// kubernetes namespace
|
||||
Namespace string |
||||
// kubernetes labelSelector example `app=test`
|
||||
LabelSelector string |
||||
// kubernetes fieldSelector example `app=test`
|
||||
FieldSelector string |
||||
// set KubeConfig out-of-cluster Use outside cluster
|
||||
KubeConfig string |
||||
// set master url
|
||||
Master string |
||||
// set TokenData
|
||||
TokenData string |
||||
} |
||||
|
||||
// Namespace with kubernetes namespace.
|
||||
func Namespace(ns string) Option { |
||||
return func(o *options) { |
||||
o.Namespace = ns |
||||
} |
||||
} |
||||
|
||||
// LabelSelector with kubernetes label selector.
|
||||
func LabelSelector(label string) Option { |
||||
return func(o *options) { |
||||
o.LabelSelector = label |
||||
} |
||||
} |
||||
|
||||
// FieldSelector with kubernetes field selector.
|
||||
func FieldSelector(field string) Option { |
||||
return func(o *options) { |
||||
o.FieldSelector = field |
||||
} |
||||
} |
||||
|
||||
// KubeConfig with kubernetes config.
|
||||
func KubeConfig(config string) Option { |
||||
return func(o *options) { |
||||
o.KubeConfig = config |
||||
} |
||||
} |
||||
|
||||
// Master with kubernetes master.
|
||||
func Master(master string) Option { |
||||
return func(o *options) { |
||||
o.Master = master |
||||
} |
||||
} |
||||
|
||||
type kube struct { |
||||
opts options |
||||
client *kubernetes.Clientset |
||||
} |
||||
|
||||
// NewSource new a kubernetes config source.
|
||||
func NewSource(opts ...Option) config.Source { |
||||
op := options{ |
||||
Namespace: "bcpm-configs", |
||||
LabelSelector: "app=bcpm-config-item", |
||||
TokenData: os.Getenv("SA_TOKEN_DATA"), |
||||
Master: os.Getenv("SA_MASTER_URL"), |
||||
} |
||||
for _, o := range opts { |
||||
o(&op) |
||||
} |
||||
return &kube{ |
||||
opts: op, |
||||
} |
||||
} |
||||
|
||||
func (k *kube) init() (err error) { |
||||
var config *rest.Config |
||||
if k.opts.KubeConfig != "" || k.opts.Master != "" { |
||||
if config, err = clientcmd.BuildConfigFromFlags(k.opts.Master, k.opts.KubeConfig); err != nil { |
||||
return err |
||||
} |
||||
config.BearerToken = k.opts.TokenData |
||||
config.Insecure = true |
||||
} else { |
||||
if config, err = rest.InClusterConfig(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
if k.client, err = kubernetes.NewForConfig(config); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (k *kube) load() (kvs []*config.KeyValue, err error) { |
||||
cmList, err := k.client. |
||||
CoreV1(). |
||||
ConfigMaps(k.opts.Namespace). |
||||
List(context.Background(), metav1.ListOptions{ |
||||
LabelSelector: k.opts.LabelSelector, |
||||
FieldSelector: k.opts.FieldSelector, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
for _, cm := range cmList.Items { |
||||
kvs = append(kvs, k.configMap(cm)...) |
||||
} |
||||
return kvs, nil |
||||
} |
||||
|
||||
func (k *kube) configMap(cm v1.ConfigMap) (kvs []*config.KeyValue) { |
||||
for name, val := range cm.Data { |
||||
k := fmt.Sprintf("%s/%s/%s", k.opts.Namespace, cm.Name, name) |
||||
|
||||
kvs = append(kvs, &config.KeyValue{ |
||||
Key: k, |
||||
Value: []byte(val), |
||||
Format: strings.TrimPrefix(filepath.Ext(k), "."), |
||||
}) |
||||
} |
||||
return kvs |
||||
} |
||||
|
||||
func (k *kube) Load() ([]*config.KeyValue, error) { |
||||
if k.opts.Namespace == "" { |
||||
return nil, errors.New("options namespace not full") |
||||
} |
||||
if err := k.init(); err != nil { |
||||
return nil, err |
||||
} |
||||
return k.load() |
||||
} |
||||
|
||||
func (k *kube) Watch() (config.Watcher, error) { |
||||
return newWatcher(k) |
||||
} |
@ -0,0 +1,208 @@ |
||||
package kubernetes |
||||
|
||||
import ( |
||||
"context" |
||||
"log" |
||||
"path/filepath" |
||||
"reflect" |
||||
"strings" |
||||
"testing" |
||||
|
||||
v1 "k8s.io/api/core/v1" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/client-go/kubernetes" |
||||
"k8s.io/client-go/rest" |
||||
"k8s.io/client-go/tools/clientcmd" |
||||
"k8s.io/client-go/util/homedir" |
||||
|
||||
"github.com/go-kratos/kratos/v2/config" |
||||
) |
||||
|
||||
const ( |
||||
testKey = "test_config.json" |
||||
namespace = "default" |
||||
name = "test" |
||||
) |
||||
|
||||
var ( |
||||
keyPath = strings.Join([]string{namespace, name, testKey}, "/") |
||||
objectMeta = metav1.ObjectMeta{ |
||||
Name: name, |
||||
Namespace: namespace, |
||||
Labels: map[string]string{ |
||||
"app": "test", |
||||
}, |
||||
} |
||||
) |
||||
|
||||
func TestSource(t *testing.T) { |
||||
home := homedir.HomeDir() |
||||
s := NewSource( |
||||
Namespace("default"), |
||||
LabelSelector(""), |
||||
KubeConfig(filepath.Join(home, ".kube", "config")), |
||||
) |
||||
kvs, err := s.Load() |
||||
if err != nil { |
||||
t.Error(err) |
||||
} |
||||
for _, v := range kvs { |
||||
t.Log(v) |
||||
} |
||||
} |
||||
|
||||
func ExampleNewSource() { |
||||
conf := config.New( |
||||
config.WithSource( |
||||
NewSource( |
||||
Namespace("mesh"), |
||||
LabelSelector("app=test"), |
||||
KubeConfig(filepath.Join(homedir.HomeDir(), ".kube", "config")), |
||||
), |
||||
), |
||||
) |
||||
err := conf.Load() |
||||
if err != nil { |
||||
log.Panic(err) |
||||
} |
||||
} |
||||
|
||||
func TestConfig(t *testing.T) { |
||||
restConfig, err := rest.InClusterConfig() |
||||
home := homedir.HomeDir() |
||||
|
||||
options := []Option{ |
||||
Namespace(namespace), |
||||
LabelSelector("app=test"), |
||||
} |
||||
|
||||
if err != nil { |
||||
kubeconfig := filepath.Join(home, ".kube", "config") |
||||
restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
options = append(options, KubeConfig(kubeconfig)) |
||||
} |
||||
clientSet, err := kubernetes.NewForConfig(restConfig) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
clientSetConfigMaps := clientSet.CoreV1().ConfigMaps(namespace) |
||||
|
||||
source := NewSource(options...) |
||||
if _, err = clientSetConfigMaps.Create(context.Background(), &v1.ConfigMap{ |
||||
ObjectMeta: objectMeta, |
||||
Data: map[string]string{ |
||||
testKey: "test config", |
||||
}, |
||||
}, metav1.CreateOptions{}); err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
defer func() { |
||||
if err = clientSetConfigMaps.Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil { |
||||
t.Error(err) |
||||
} |
||||
}() |
||||
kvs, err := source.Load() |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
if len(kvs) != 1 || kvs[0].Key != keyPath || string(kvs[0].Value) != "test config" { |
||||
t.Fatal("config error") |
||||
} |
||||
|
||||
w, err := source.Watch() |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
defer func() { |
||||
_ = w.Stop() |
||||
}() |
||||
// create also produce an event, discard it
|
||||
if _, err = w.Next(); err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
if _, err = clientSetConfigMaps.Update(context.Background(), &v1.ConfigMap{ |
||||
ObjectMeta: objectMeta, |
||||
Data: map[string]string{ |
||||
testKey: "new config", |
||||
}, |
||||
}, metav1.UpdateOptions{}); err != nil { |
||||
t.Error(err) |
||||
} |
||||
|
||||
if kvs, err = w.Next(); err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
if len(kvs) != 1 || kvs[0].Key != keyPath || string(kvs[0].Value) != "new config" { |
||||
t.Fatal("config error") |
||||
} |
||||
} |
||||
|
||||
func TestExtToFormat(t *testing.T) { |
||||
restConfig, err := rest.InClusterConfig() |
||||
home := homedir.HomeDir() |
||||
|
||||
options := []Option{ |
||||
Namespace(namespace), |
||||
LabelSelector("app=test"), |
||||
} |
||||
|
||||
if err != nil { |
||||
kubeconfig := filepath.Join(home, ".kube", "config") |
||||
restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
options = append(options, KubeConfig(kubeconfig)) |
||||
} |
||||
clientSet, err := kubernetes.NewForConfig(restConfig) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
clientSetConfigMaps := clientSet.CoreV1().ConfigMaps(namespace) |
||||
|
||||
tc := `{"a":1}` |
||||
if _, err = clientSetConfigMaps.Create(context.Background(), &v1.ConfigMap{ |
||||
ObjectMeta: objectMeta, |
||||
Data: map[string]string{ |
||||
testKey: tc, |
||||
}, |
||||
}, metav1.CreateOptions{}); err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
defer func() { |
||||
if err = clientSetConfigMaps.Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil { |
||||
t.Error(err) |
||||
} |
||||
}() |
||||
|
||||
source := NewSource(options...) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
|
||||
kvs, err := source.Load() |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
if !reflect.DeepEqual(len(kvs), 1) { |
||||
t.Errorf("len(kvs) = %d", len(kvs)) |
||||
} |
||||
if !reflect.DeepEqual(keyPath, kvs[0].Key) { |
||||
t.Errorf("kvs[0].Key is %s", kvs[0].Key) |
||||
} |
||||
if !reflect.DeepEqual(tc, string(kvs[0].Value)) { |
||||
t.Errorf("kvs[0].Value is %s", kvs[0].Value) |
||||
} |
||||
if !reflect.DeepEqual("json", kvs[0].Format) { |
||||
t.Errorf("kvs[0].Format is %s", kvs[0].Format) |
||||
} |
||||
} |
@ -0,0 +1,61 @@ |
||||
package kubernetes |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
v1 "k8s.io/api/core/v1" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
|
||||
"github.com/go-kratos/kratos/v2/config" |
||||
) |
||||
|
||||
type watcher struct { |
||||
k *kube |
||||
watcher watch.Interface |
||||
} |
||||
|
||||
func newWatcher(k *kube) (config.Watcher, error) { |
||||
w, err := k.client.CoreV1().ConfigMaps(k.opts.Namespace).Watch(context.Background(), metav1.ListOptions{ |
||||
LabelSelector: k.opts.LabelSelector, |
||||
FieldSelector: k.opts.FieldSelector, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &watcher{ |
||||
k: k, |
||||
watcher: w, |
||||
}, nil |
||||
} |
||||
|
||||
func (w *watcher) Next() ([]*config.KeyValue, error) { |
||||
ResultChan: |
||||
ch := <-w.watcher.ResultChan() |
||||
if ch.Object == nil { |
||||
// 重新获取watcher
|
||||
k8sWatcher, err := w.k.client.CoreV1().ConfigMaps(w.k.opts.Namespace).Watch(context.Background(), metav1.ListOptions{ |
||||
LabelSelector: w.k.opts.LabelSelector, |
||||
FieldSelector: w.k.opts.FieldSelector, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
w.watcher = k8sWatcher |
||||
goto ResultChan |
||||
} |
||||
cm, ok := ch.Object.(*v1.ConfigMap) |
||||
if !ok { |
||||
return nil, fmt.Errorf("kubernetes Object not ConfigMap") |
||||
} |
||||
if ch.Type == "DELETED" { |
||||
return nil, fmt.Errorf("kubernetes configmap delete %s", cm.Name) |
||||
} |
||||
return w.k.configMap(*cm), nil |
||||
} |
||||
|
||||
func (w *watcher) Stop() error { |
||||
w.watcher.Stop() |
||||
return nil |
||||
} |
@ -0,0 +1,42 @@ |
||||
package kubernetes |
||||
|
||||
import ( |
||||
"context" |
||||
"path/filepath" |
||||
"testing" |
||||
"time" |
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/client-go/kubernetes" |
||||
"k8s.io/client-go/tools/clientcmd" |
||||
"k8s.io/client-go/util/homedir" |
||||
) |
||||
|
||||
func TestKube(t *testing.T) { |
||||
home := homedir.HomeDir() |
||||
config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(home, ".kube", "config")) |
||||
if err != nil { |
||||
t.Error(err) |
||||
} |
||||
client, err := kubernetes.NewForConfig(config) |
||||
if err != nil { |
||||
t.Error(err) |
||||
} |
||||
cmWatcher, err := client.CoreV1().ConfigMaps("mesh").Watch(context.Background(), metav1.ListOptions{ |
||||
LabelSelector: "app=test", |
||||
// FieldSelector: "",
|
||||
}) |
||||
if err != nil { |
||||
t.Error(err) |
||||
} |
||||
go func() { |
||||
time.Sleep(5 * time.Second) |
||||
cmWatcher.Stop() |
||||
}() |
||||
for c := range cmWatcher.ResultChan() { |
||||
if c.Object == nil { |
||||
return |
||||
} |
||||
t.Log(c.Type, c.Object) |
||||
} |
||||
} |
Loading…
Reference in new issue