You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kratos/contrib/opensergo/opensergo.go

139 lines
3.4 KiB

package opensergo
import (
"encoding/json"
"io/ioutil"
"net"
"net/url"
"os"
"strconv"
"time"
"github.com/go-kratos/kratos/v2"
v1 "github.com/opensergo/opensergo-go/proto/service_contract/v1"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
)
// Option is a functional option: New calls each one with a pointer to its
// options value so the option can adjust the settings before dialing.
type Option func(*options)
// WithEndpoint returns an Option that stores endpoint as the address New
// dials. Options run after the environment-based settings have been read,
// so the value given here takes precedence over them.
func WithEndpoint(endpoint string) Option {
	apply := func(cfg *options) {
		cfg.Endpoint = endpoint
	}
	return apply
}
// options holds the settings consumed by New.
type options struct {
	// Endpoint is the dial target; it is read from the "endpoint" JSON key.
	Endpoint string `json:"endpoint"`
}

// ParseJSON decodes data as JSON into cfg. Fields whose keys are absent from
// data keep their current values. It returns the decoding error, if any.
func (cfg *options) ParseJSON(data []byte) error {
	if err := json.Unmarshal(data, cfg); err != nil {
		return err
	}
	return nil
}
// OpenSergo reports application metadata through a metadata service client.
// Values are created with New.
type OpenSergo struct {
// mdClient is the client ReportMetadata uses to issue the ReportMetadata RPC.
mdClient v1.MetadataServiceClient
}
// New creates an OpenSergo reporter connected to the configured endpoint.
//
// The endpoint is resolved from the following sources, applied in order so
// that a later source replaces an earlier one when it supplies a value:
//  1. the OPENSERGO_ENDPOINT environment variable,
//  2. inline JSON held in OPENSERGO_BOOTSTRAP,
//  3. a JSON file whose path is held in OPENSERGO_BOOTSTRAP_CONFIG,
//  4. the opts passed by the caller.
//
// It returns an error when a bootstrap source cannot be read or decoded, or
// when grpc.DialContext fails. The connection uses insecure transport
// credentials.
func New(opts ...Option) (*OpenSergo, error) {
	var cfg options
	cfg.Endpoint = os.Getenv("OPENSERGO_ENDPOINT")

	// Bootstrap variables follow the OpenSergo specification:
	// https://github.com/opensergo/opensergo-specification/blob/main/specification/en/README.md
	if inline := os.Getenv("OPENSERGO_BOOTSTRAP"); inline != "" {
		if err := cfg.ParseJSON([]byte(inline)); err != nil {
			return nil, err
		}
	}
	if path := os.Getenv("OPENSERGO_BOOTSTRAP_CONFIG"); path != "" {
		content, err := ioutil.ReadFile(path)
		if err != nil {
			return nil, err
		}
		if err = cfg.ParseJSON(content); err != nil {
			return nil, err
		}
	}

	// Caller-supplied options run last and therefore win.
	for i := range opts {
		opts[i](&cfg)
	}

	// NOTE(review): the dial context is limited to one second, but no
	// blocking dial option is passed — confirm whether the timeout is
	// actually expected to bound connection establishment.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.DialContext(ctx, cfg.Endpoint, creds)
	if err != nil {
		return nil, err
	}

	reporter := &OpenSergo{mdClient: v1.NewMetadataServiceClient(conn)}
	return reporter, nil
}
// ReportMetadata sends the application's service contract and listening
// addresses to the metadata service.
//
// The service contract comes from listServiceDescriptors. Each entry of
// app.Endpoint() is parsed as a URL whose host part must be "host:port": the
// scheme is recorded as a protocol and the host/port pair as a listening
// address. All endpoints are aggregated into a single ServiceMetadata that is
// reported under app.Name().
//
// It returns an error when an endpoint cannot be parsed, lacks a port, has a
// port that does not fit in the 32-bit PortValue field, or when the RPC
// itself fails. ctx is passed through to the RPC.
func (s *OpenSergo) ReportMetadata(ctx context.Context, app kratos.AppInfo) error {
	services, err := s.listServiceDescriptors()
	if err != nil {
		return err
	}
	serviceMetadata := &v1.ServiceMetadata{
		ServiceContract: &v1.ServiceContract{
			Services: services,
		},
	}
	for _, endpoint := range app.Endpoint() {
		u, err := url.Parse(endpoint) //nolint
		if err != nil {
			return err
		}
		host, port, err := net.SplitHostPort(u.Host)
		if err != nil {
			return err
		}
		// Parse directly into the 32-bit range of PortValue so an oversized
		// port is reported as an error rather than silently wrapped by a
		// uint32 conversion.
		portValue, err := strconv.ParseUint(port, 10, 32)
		if err != nil {
			return err
		}
		serviceMetadata.Protocols = append(serviceMetadata.Protocols, u.Scheme)
		serviceMetadata.ListeningAddresses = append(serviceMetadata.ListeningAddresses, &v1.SocketAddress{
			Address:   host,
			PortValue: uint32(portValue),
		})
	}
	_, err = s.mdClient.ReportMetadata(ctx, &v1.ReportMetadataRequest{
		AppName:         app.Name(),
		ServiceMetadata: []*v1.ServiceMetadata{serviceMetadata},
	})
	return err
}
// listServiceDescriptors converts every service found in the files of
// protoregistry.GlobalFiles into a v1.ServiceDescriptor.
//
// Each descriptor carries the service's short name and, per method, the
// method name plus the full names of its input and output message types.
// Nothing in this function assigns the error result, so it is always nil.
func (s *OpenSergo) listServiceDescriptors() (services []*v1.ServiceDescriptor, err error) {
	collect := func(file protoreflect.FileDescriptor) bool {
		svcs := file.Services()
		for i, n := 0, svcs.Len(); i < n; i++ {
			svc := svcs.Get(i)
			rpcs := svc.Methods()

			var methods []*v1.MethodDescriptor
			for j, m := 0, rpcs.Len(); j < m; j++ {
				rpc := rpcs.Get(j)
				methods = append(methods, &v1.MethodDescriptor{
					Name:        string(rpc.Name()),
					InputTypes:  []string{string(rpc.Input().FullName())},
					OutputTypes: []string{string(rpc.Output().FullName())},
				})
			}

			services = append(services, &v1.ServiceDescriptor{
				Name:    string(svc.Name()),
				Methods: methods,
			})
		}
		// Always report true so the walk is never cut short.
		return true
	}

	protoregistry.GlobalFiles.RangeFiles(collect)
	return services, err
}