You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
155 lines
3.2 KiB
155 lines
3.2 KiB
1 year ago
|
package kubernetes
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
v1 "k8s.io/api/core/v1"
|
||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
|
"k8s.io/client-go/kubernetes"
|
||
|
"k8s.io/client-go/rest"
|
||
|
"k8s.io/client-go/tools/clientcmd"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
|
||
|
"github.com/go-kratos/kratos/v2/config"
|
||
|
)
|
||
|
|
||
|
// Option is kubernetes option.
|
||
|
type Option func(*options)
|
||
|
|
||
|
// options collects the settings of a kubernetes config source.
type options struct {
	// Namespace is the kubernetes namespace whose ConfigMaps are listed.
	Namespace string
	// LabelSelector filters the listed ConfigMaps by label, e.g. `app=test`.
	LabelSelector string
	// FieldSelector filters the listed ConfigMaps by field,
	// e.g. `metadata.name=test`.
	FieldSelector string
	// KubeConfig is the kubeconfig path handed to
	// clientcmd.BuildConfigFromFlags; set it to run outside the cluster.
	KubeConfig string
	// Master is the API server URL handed to clientcmd.BuildConfigFromFlags.
	Master string
	// TokenData is used as the client's bearer token when the client
	// configuration is built from KubeConfig/Master.
	TokenData string
}
|
||
|
|
||
|
// Namespace with kubernetes namespace.
|
||
|
func Namespace(ns string) Option {
|
||
|
return func(o *options) {
|
||
|
o.Namespace = ns
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// LabelSelector with kubernetes label selector.
|
||
|
func LabelSelector(label string) Option {
|
||
|
return func(o *options) {
|
||
|
o.LabelSelector = label
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// FieldSelector with kubernetes field selector.
|
||
|
func FieldSelector(field string) Option {
|
||
|
return func(o *options) {
|
||
|
o.FieldSelector = field
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// KubeConfig with kubernetes config.
|
||
|
func KubeConfig(config string) Option {
|
||
|
return func(o *options) {
|
||
|
o.KubeConfig = config
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Master with kubernetes master.
|
||
|
func Master(master string) Option {
|
||
|
return func(o *options) {
|
||
|
o.Master = master
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type kube struct {
|
||
|
opts options
|
||
|
client *kubernetes.Clientset
|
||
|
}
|
||
|
|
||
|
// NewSource new a kubernetes config source.
|
||
|
func NewSource(opts ...Option) config.Source {
|
||
|
op := options{
|
||
|
Namespace: "bcpm-configs",
|
||
|
LabelSelector: "app=bcpm-config-item",
|
||
|
TokenData: os.Getenv("SA_TOKEN_DATA"),
|
||
|
Master: os.Getenv("SA_MASTER_URL"),
|
||
|
}
|
||
|
for _, o := range opts {
|
||
|
o(&op)
|
||
|
}
|
||
|
return &kube{
|
||
|
opts: op,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (k *kube) init() (err error) {
|
||
|
var config *rest.Config
|
||
|
if k.opts.KubeConfig != "" || k.opts.Master != "" {
|
||
|
if config, err = clientcmd.BuildConfigFromFlags(k.opts.Master, k.opts.KubeConfig); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
config.BearerToken = k.opts.TokenData
|
||
|
config.Insecure = true
|
||
|
} else {
|
||
|
if config, err = rest.InClusterConfig(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
if k.client, err = kubernetes.NewForConfig(config); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (k *kube) load() (kvs []*config.KeyValue, err error) {
|
||
|
cmList, err := k.client.
|
||
|
CoreV1().
|
||
|
ConfigMaps(k.opts.Namespace).
|
||
|
List(context.Background(), metav1.ListOptions{
|
||
|
LabelSelector: k.opts.LabelSelector,
|
||
|
FieldSelector: k.opts.FieldSelector,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
for _, cm := range cmList.Items {
|
||
|
kvs = append(kvs, k.configMap(cm)...)
|
||
|
}
|
||
|
return kvs, nil
|
||
|
}
|
||
|
|
||
|
func (k *kube) configMap(cm v1.ConfigMap) (kvs []*config.KeyValue) {
|
||
|
for name, val := range cm.Data {
|
||
|
k := fmt.Sprintf("%s/%s/%s", k.opts.Namespace, cm.Name, name)
|
||
|
|
||
|
kvs = append(kvs, &config.KeyValue{
|
||
|
Key: k,
|
||
|
Value: []byte(val),
|
||
|
Format: strings.TrimPrefix(filepath.Ext(k), "."),
|
||
|
})
|
||
|
}
|
||
|
return kvs
|
||
|
}
|
||
|
|
||
|
func (k *kube) Load() ([]*config.KeyValue, error) {
|
||
|
if k.opts.Namespace == "" {
|
||
|
return nil, errors.New("options namespace not full")
|
||
|
}
|
||
|
if err := k.init(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return k.load()
|
||
|
}
|
||
|
|
||
|
func (k *kube) Watch() (config.Watcher, error) {
|
||
|
return newWatcher(k)
|
||
|
}
|