Compare commits

...

2 Commits

Author SHA1 Message Date
longXboy b7379f0c6e add xds unmarshal 3 years ago
longXboy 9b34658a5c add xds client 3 years ago
  1. 3
      go.mod
  2. 12
      go.sum
  3. 214
      xds/client/v3/client.go
  4. 214
      xds/client/v3/controller.go
  5. 149
      xds/resource/type.go
  6. 63
      xds/resource/version.go

@ -3,6 +3,7 @@ module github.com/go-kratos/kratos/v2
go 1.16
require (
github.com/envoyproxy/go-control-plane v0.10.1
github.com/fsnotify/fsnotify v1.4.9
github.com/go-kratos/aegis v0.1.1
github.com/go-playground/form/v4 v4.2.0
@ -16,7 +17,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67
google.golang.org/grpc v1.42.0
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

@ -13,6 +13,8 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -22,6 +24,9 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.1 h1:cgDRLG7bs59Zd+apAWuzLQL95obVYAymNJek76W3mgw=
github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@ -108,6 +113,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -124,7 +130,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -163,8 +171,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM=
google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

@ -0,0 +1,214 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package v3 provides xDS v3 transport protocol specific functionality.
package v3
import (
"context"
"fmt"
"sync"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/xds/resource"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
)
const (
reasonSendError = "send_error"
)
// BuildOptions contains options to be passed to client builders.
type BuildOptions struct {
// NodeProto contains the Node proto to be used in xDS requests. The actual
// type depends on the transport protocol version used.
NodeProto proto.Message
// // Backoff returns the amount of time to backoff before retrying broken
// // streams.
// Backoff func(int) time.Duration
// Logger provides enhanced logging capabilities.
Logger *log.Helper
}
var (
resourceTypeToURL = map[resource.ResourceType]string{
resource.ListenerResource: resource.V3ListenerURL,
resource.RouteConfigResource: resource.V3RouteConfigURL,
resource.ClusterResource: resource.V3ClusterURL,
resource.EndpointsResource: resource.V3EndpointsURL,
}
)
// NewClient new xds client
func NewClient(cc *grpc.ClientConn, opts BuildOptions) (*Client, error) {
nodeProto, ok := opts.NodeProto.(*v3corepb.Node)
if !ok {
return nil, fmt.Errorf("xds: unsupported Node proto type: %T, want %T", opts.NodeProto, v3corepb.Node{})
}
c := &Client{
nodeProto: nodeProto,
logger: opts.Logger,
watchMp: make(map[resource.ResourceType]map[string]bool),
versionMp: make(map[resource.ResourceType]string),
nonceMp: make(map[resource.ResourceType]string),
cc: cc,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
return c, nil
}
type adsStream v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// client performs the actual xDS RPCs using the xDS v3 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
// are multiplexed.
type Client struct {
nodeProto *v3corepb.Node
logger *log.Helper
cc *grpc.ClientConn // Connection to the management server.
watchMp map[resource.ResourceType]map[string]bool
versionMp map[resource.ResourceType]string
nonceMp map[resource.ResourceType]string
ackCh chan *ackAction
watchCh chan *watchAction
ctx context.Context
cancel context.CancelFunc
lk sync.RWMutex
stream adsStream
}
func (c *Client) newStream(ctx context.Context, cc *grpc.ClientConn) (adsStream, error) {
return v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx, grpc.WaitForReady(true))
}
func (c *Client) SendAck(stream adsStream, rType resource.ResourceType, version, nonce, errMsg string) error {
c.lk.Lock()
c.nonceMp[rType] = nonce
s, ok := c.watchMp[rType]
if !ok || len(s) == 0 {
c.lk.Unlock()
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackAction is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
return errors.NotFound(resource.UnknownResource.String(), rType.String())
}
target := mapToSlice(s)
if version == "" {
// This is a nack, get the previous acked version.
version = c.versionMp[rType]
// version will still be an empty string if rType isn't
// found in versionMap, this can happen if there wasn't any ack
// before.
} else {
c.versionMp[rType] = version
}
c.lk.Unlock()
return c.sendRequest(stream, target, rType, version, nonce, errMsg)
}
func (c *Client) WatchResource(s adsStream, rType resource.ResourceType, resource string, remove bool) error {
c.lk.Lock()
var current map[string]bool
current, ok := c.watchMp[rType]
if !ok {
current = make(map[string]bool)
c.watchMp[rType] = current
}
if remove {
delete(current, resource)
if len(current) == 0 {
delete(c.watchMp, rType)
}
} else {
current[resource] = true
}
target := mapToSlice(current)
// We don't reset version or nonce when a new watch is started. The version
// and nonce from previous response are carried by the request unless the
// stream is recreated.
ver := c.versionMp[rType]
nonce := c.nonceMp[rType]
c.lk.Unlock()
return c.sendRequest(s, target, rType, ver, nonce, "")
}
// sendRequest sends out a DiscoveryRequest for the given resourceNames, of type
// rType, on the provided stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be empty.
// - If this is an ack, version will be the version from the response.
// - If this is a nack, version will be the previous acked version (from
// versionMap). If there was no ack before, it will be empty.
func (c *Client) sendRequest(stream adsStream, resourceNames []string, rType resource.ResourceType, version, nonce, errMsg string) error {
req := &v3discoverypb.DiscoveryRequest{
Node: c.nodeProto,
TypeUrl: resourceTypeToURL[rType],
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
}
if errMsg != "" {
req.ErrorDetail = &statuspb.Status{
Code: int32(codes.InvalidArgument), Message: errMsg,
}
}
if err := stream.Send(req); err != nil {
return errors.ServiceUnavailable(reasonSendError, fmt.Sprintf("xds: stream.Send(%+v) failed: %v", req, err))
}
c.logger.Debugf("ADS request sent: %v", (req))
return nil
}
// RecvResponse blocks on the receipt of one response message on the provided
// stream.
func (c *Client) RecvResponse(stream adsStream) (proto.Message, error) {
resp, err := stream.Recv()
if err != nil {
return nil, fmt.Errorf("xds: stream.Recv() failed: %v", err)
}
c.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
c.logger.Debugf("ADS response received: %+v", (resp))
return resp, nil
}
func mapToSlice(m map[string]bool) []string {
ret := make([]string, 0, len(m))
for i := range m {
ret = append(ret, i)
}
return ret
}
func (c *Client) Close() error {
c.cancel()
return c.cc.Close()
}

@ -0,0 +1,214 @@
package v3
import (
"time"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/xds/resource"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
func (c *Client) run() {
go c.sendLoop()
for {
select {
case <-c.ctx.Done():
return
default:
}
stream, err := c.getStream()
if err != nil {
time.Sleep(time.Second * 3)
continue
}
c.recv(stream)
}
}
func (c *Client) recv(stream adsStream) error {
for {
r, err := c.RecvResponse(stream)
if err != nil {
c.resetStream(c.stream)
return err
}
rType := resource.UnknownResource
resp, ok := r.(*v3discoverypb.DiscoveryResponse)
if !ok {
// send nack
c.ackCh <- &ackAction{
rType: rType,
version: "",
nonce: resp.Nonce,
errMsg: err.Error(),
stream: stream,
}
continue
}
// Note that the xDS transport protocol is versioned independently of
// the resource types, and it is supported to transfer older versions
// of resource types using new versions of the transport protocol, or
// vice-versa. Hence we need to handle v3 type_urls as well here.
url := resp.GetTypeUrl()
switch {
case resource.IsListenerResource(url):
rType = resource.ListenerResource
for _, res := range resp.GetResources() {
lis := &v3listenerpb.Listener{}
err = proto.Unmarshal(res.GetValue(), lis)
if err != nil {
break
}
}
case resource.IsRouteConfigResource(url):
rType = resource.RouteConfigResource
for _, res := range resp.GetResources() {
rc := &v3routepb.RouteConfiguration{}
err = proto.Unmarshal(res.GetValue(), rc)
if err != nil {
break
}
}
case resource.IsClusterResource(url):
rType = resource.ClusterResource
for _, res := range resp.GetResources() {
cluster := &v3clusterpb.Cluster{}
err = proto.Unmarshal(res.GetValue(), cluster)
if err != nil {
break
}
}
case resource.IsEndpointsResource(url):
rType = resource.EndpointsResource
for _, res := range resp.GetResources() {
cla := &v3endpointpb.ClusterLoadAssignment{}
err = proto.Unmarshal(res.GetValue(), cla)
if err != nil {
break
}
}
default:
// Unknown resource type
continue
}
if err != nil {
// send nack
// send nack
c.ackCh <- &ackAction{
rType: rType,
version: "",
nonce: resp.Nonce,
errMsg: err.Error(),
stream: stream,
}
} else {
c.ackCh <- &ackAction{
rType: rType,
version: resp.GetVersionInfo(),
nonce: resp.GetNonce(),
stream: stream,
}
}
}
}
func (c *Client) sendLoop() {
for {
select {
case <-c.ctx.Done():
return
case action := <-c.watchCh:
stream, err := c.getStream()
if err != nil {
c.logger.Errorf("xds client get stream failed!err:=%v", err)
continue
}
err = c.WatchResource(stream, action.rType, action.resource, action.remove)
if err != nil {
c.logger.Errorf("xds client watch resource failed!err:=%v", err)
if errors.IsServiceUnavailable(err) {
c.resetStream(stream)
}
}
case action := <-c.ackCh:
stream, err := c.getStream()
if err != nil {
c.logger.Errorf("xds client get stream failed!err:=%v", err)
continue
}
if action.stream != stream {
continue
}
err = c.SendAck(stream, action.rType, action.version, action.nonce, action.errMsg)
if err != nil {
c.logger.Errorf("xds client send ack failed!err:=%v", err)
if errors.IsServiceUnavailable(err) {
c.resetStream(stream)
}
}
}
}
}
func (c *Client) resetStream(origin adsStream) {
c.lk.Lock()
if origin == c.stream {
c.stream = nil
}
c.lk.Unlock()
}
func (c *Client) getStream() (adsStream, error) {
c.lk.RLock()
if c.stream == nil {
c.lk.RUnlock()
c.lk.Lock()
defer c.lk.Unlock()
if c.stream == nil {
var err error
for i := 0; i < 3; i++ {
c.stream, err = c.newStream(c.ctx, c.cc)
if err == nil {
return c.stream, nil
}
if i < 2 {
time.Sleep(time.Millisecond * 250 * time.Duration(i+1))
}
}
return nil, err
}
return c.stream, nil
}
defer c.lk.RUnlock()
return c.stream, nil
}
type watchAction struct {
rType resource.ResourceType
remove bool // Whether this is to remove watch for the resource.
resource string
}
type ackAction struct {
rType resource.ResourceType
version string // NACK if version is an empty string.
nonce string
errMsg string // Empty unless it's a NACK.
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
stream grpc.ClientStream
}

@ -0,0 +1,149 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package resource
import (
"time"
"google.golang.org/protobuf/types/known/anypb"
)
// UpdateValidatorFunc performs validations on update structs using
// context/logic available at the xdsClient layer. Since these validation are
// performed on internal update structs, they can be shared between different
// API clients.
type UpdateValidatorFunc func(interface{}) error
// UpdateMetadata contains the metadata for each update, including timestamp,
// raw message, and so on.
type UpdateMetadata struct {
// Status is the status of this resource, e.g. ACKed, NACKed, or
// Not_exist(removed).
Status ServiceStatus
// Version is the version of the xds response. Note that this is the version
// of the resource in use (previous ACKed). If a response is NACKed, the
// NACKed version is in ErrState.
Version string
// Timestamp is when the response is received.
Timestamp time.Time
// ErrState is set when the update is NACKed.
ErrState *UpdateErrorMetadata
}
// IsListenerResource returns true if the provider URL corresponds to an xDS
// Listener resource.
func IsListenerResource(url string) bool {
return url == V2ListenerURL || url == V3ListenerURL
}
// IsHTTPConnManagerResource returns true if the provider URL corresponds to an xDS
// HTTPConnManager resource.
func IsHTTPConnManagerResource(url string) bool {
return url == V2HTTPConnManagerURL || url == V3HTTPConnManagerURL
}
// IsRouteConfigResource returns true if the provider URL corresponds to an xDS
// RouteConfig resource.
func IsRouteConfigResource(url string) bool {
return url == V2RouteConfigURL || url == V3RouteConfigURL
}
// IsClusterResource returns true if the provider URL corresponds to an xDS
// Cluster resource.
func IsClusterResource(url string) bool {
return url == V2ClusterURL || url == V3ClusterURL
}
// IsEndpointsResource returns true if the provider URL corresponds to an xDS
// Endpoints resource.
func IsEndpointsResource(url string) bool {
return url == V2EndpointsURL || url == V3EndpointsURL
}
// ServiceStatus is the status of the update.
type ServiceStatus int
const (
// ServiceStatusUnknown is the default state, before a watch is started for
// the resource.
ServiceStatusUnknown ServiceStatus = iota
// ServiceStatusRequested is when the watch is started, but before and
// response is received.
ServiceStatusRequested
// ServiceStatusNotExist is when the resource doesn't exist in
// state-of-the-world responses (e.g. LDS and CDS), which means the resource
// is removed by the management server.
ServiceStatusNotExist // Resource is removed in the server, in LDS/CDS.
// ServiceStatusACKed is when the resource is ACKed.
ServiceStatusACKed
// ServiceStatusNACKed is when the resource is NACKed.
ServiceStatusNACKed
)
// UpdateErrorMetadata is part of UpdateMetadata. It contains the error state
// when a response is NACKed.
type UpdateErrorMetadata struct {
// Version is the version of the NACKed response.
Version string
// Err contains why the response was NACKed.
Err error
// Timestamp is when the NACKed response was received.
Timestamp time.Time
}
// UpdateWithMD contains the raw message of the update and the metadata,
// including version, raw message, timestamp.
//
// This is to be used for config dump and CSDS, not directly by users (like
// resolvers/balancers).
type UpdateWithMD struct {
MD UpdateMetadata
Raw *anypb.Any
}
// ResourceType identifies resources in a transport protocol agnostic way. These
// will be used in transport version agnostic code, while the versioned API
// clients will map these to appropriate version URLs.
type ResourceType int
// Version agnostic resource type constants.
const (
UnknownResource ResourceType = iota
ListenerResource
HTTPConnManagerResource
RouteConfigResource
ClusterResource
EndpointsResource
)
func (r ResourceType) String() string {
switch r {
case ListenerResource:
return "ListenerResource"
case HTTPConnManagerResource:
return "HTTPConnManagerResource"
case RouteConfigResource:
return "RouteConfigResource"
case ClusterResource:
return "ClusterResource"
case EndpointsResource:
return "EndpointsResource"
default:
return "UnknownResource"
}
}

@ -0,0 +1,63 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package version defines constants to distinguish between supported xDS API
// versions.
package resource
// TransportAPI refers to the API version for xDS transport protocol. This
// describes the xDS gRPC endpoint and version of DiscoveryRequest/Response used
// on the wire.
type TransportAPI int
const (
// TransportV2 refers to the v2 xDS transport protocol.
TransportV2 TransportAPI = iota
// TransportV3 refers to the v3 xDS transport protocol.
TransportV3
)
// Resource URLs. We need to be able to accept either version of the resource
// regardless of the version of the transport protocol in use.
const (
googleapiPrefix = "type.googleapis.com/"
V2ListenerType = "envoy.api.v2.Listener"
V2RouteConfigType = "envoy.api.v2.RouteConfiguration"
V2ClusterType = "envoy.api.v2.Cluster"
V2EndpointsType = "envoy.api.v2.ClusterLoadAssignment"
V2ListenerURL = googleapiPrefix + V2ListenerType
V2RouteConfigURL = googleapiPrefix + V2RouteConfigType
V2ClusterURL = googleapiPrefix + V2ClusterType
V2EndpointsURL = googleapiPrefix + V2EndpointsType
V2HTTPConnManagerURL = googleapiPrefix + "envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager"
V3ListenerType = "envoy.config.listener.v3.Listener"
V3RouteConfigType = "envoy.config.route.v3.RouteConfiguration"
V3ClusterType = "envoy.config.cluster.v3.Cluster"
V3EndpointsType = "envoy.config.endpoint.v3.ClusterLoadAssignment"
V3ListenerURL = googleapiPrefix + V3ListenerType
V3RouteConfigURL = googleapiPrefix + V3RouteConfigType
V3ClusterURL = googleapiPrefix + V3ClusterType
V3EndpointsURL = googleapiPrefix + V3EndpointsType
V3HTTPConnManagerURL = googleapiPrefix + "envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"
V3UpstreamTLSContextURL = googleapiPrefix + "envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext"
V3DownstreamTLSContextURL = googleapiPrefix + "envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext"
)
Loading…
Cancel
Save