diff --git a/doc/wiki-cn/warden-quickstart.md b/doc/wiki-cn/warden-quickstart.md index 970c3bde3..8b44fe49c 100644 --- a/doc/wiki-cn/warden-quickstart.md +++ b/doc/wiki-cn/warden-quickstart.md @@ -100,13 +100,13 @@ import ( "google.golang.org/grpc" ) -// AppID unique app id for service discovery -const AppID = "your app id" +// target server addrs. +const target = "direct://default/127.0.0.1:9000,127.0.0.1:9091" // NOTE: example // NewClient new member grpc client func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) { client := warden.NewClient(cfg, opts...) - conn, err := client.Dial(context.Background(), "discovery://default/"+AppID) + conn, err := client.Dial(context.Background(), target) if err != nil { return nil, err } @@ -117,7 +117,7 @@ func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, e } ``` -其中,`"discovery://default/"+AppID`为gRPC target,提供给resolver用于discovery服务发现的,如果在使用其他服务发现组件,可以根据自己的实现情况传入。 +其中,`target`为gRPC用于服务发现的目标,使用标准url资源格式提供给resolver用于服务发现。`warden`默认使用`direct`直连方式,直接与`server`端进行连接。如果在使用其他服务发现组件请看[warden服务发现](warden-resolver.md)。 有了初始化`Client`的代码,我们的`Dao`对象即可进行初始化和使用,以下以直接import服务端api包为例: diff --git a/doc/wiki-cn/warden-resolver.md b/doc/wiki-cn/warden-resolver.md index e69de29bb..2d543c5ab 100644 --- a/doc/wiki-cn/warden-resolver.md +++ b/doc/wiki-cn/warden-resolver.md @@ -0,0 +1,196 @@ +# 前提 + +服务注册与发现最简单的就是`direct`固定服务端地址的直连方式。也就是服务端正常监听端口启动不进行额外操作,客户端使用如下`target`: + +```url +direct://default/127.0.0.1:9000,127.0.0.1:9091 +``` + +> 关于`target`就是标准的`URL`资源定位符[查看WIKI](https://zh.wikipedia.org/wiki/%E7%BB%9F%E4%B8%80%E8%B5%84%E6%BA%90%E5%AE%9A%E4%BD%8D%E7%AC%A6) + +其中`direct`为协议类型,此处表示直接使用该`URL`内提供的地址`127.0.0.1:9000,127.0.0.1:9091`进行连接,而`default`在此处无意义仅当做占位符。 + +# gRPC Resolver + +gRPC暴露了服务发现的接口`resolver.Builder`和`resolver.ClientConn`和`resolver.Resolver`,[官方代码位置](https://github.com/grpc/grpc-go/blob/master/resolver/resolver.go): + +```go +// Builder creates a resolver that will be used to watch name resolution updates. +type Builder interface { + // Build creates a new resolver for the given target. + // + // gRPC dial calls Build synchronously, and fails if the returned error is + // not nil. + Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) + // Scheme returns the scheme supported by this resolver. + // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md. + Scheme() string +} + +// ClientConn contains the callbacks for resolver to notify any updates +// to the gRPC ClientConn. +// +// This interface is to be implemented by gRPC. Users should not need a +// brand new implementation of this interface. For the situations like +// testing, the new implementation should embed this interface. This allows +// gRPC to add new methods to this interface. +type ClientConn interface { + // UpdateState updates the state of the ClientConn appropriately. + UpdateState(State) + // NewAddress is called by resolver to notify ClientConn a new list + // of resolved addresses. + // The address list should be the complete list of resolved addresses. + // + // Deprecated: Use UpdateState instead. + NewAddress(addresses []Address) + // NewServiceConfig is called by resolver to notify ClientConn a new + // service config. The service config should be provided as a json string. + // + // Deprecated: Use UpdateState instead. + NewServiceConfig(serviceConfig string) +} + +// Resolver watches for the updates on the specified target. +// Updates include address updates and service config updates. +type Resolver interface { + // ResolveNow will be called by gRPC to try to resolve the target name + // again. It's just a hint, resolver can ignore this if it's not necessary. + // + // It could be called multiple times concurrently. + ResolveNow(ResolveNowOption) + // Close closes the resolver. + Close() +} +``` + +下面依次分析这三个接口的作用: + +* `Builder`用于gRPC内部创建`Resolver`接口的实现,但注意声明的`Build`方法将接口`ClientConn`作为参数传入了 +* `ClientConn`接口有两个废弃方法不用管,看`UpdateState`方法需要传入`State`结构,看代码可以发现其中包含了`Addresses []Address // Resolved addresses for the target`,可以看出是需要将服务发现得到的`Address`对象列表告诉`ClientConn`的对象 +* `Resolver`提供了`ResolveNow`用于被gRPC尝试重新进行服务发现 + +看完这三个接口就可以明白gRPC的服务发现实现逻辑,通过`Builder`进行`Reslover`的创建,在`Build`的过程中将服务发现的地址信息丢给`ClientConn`用于内部连接创建等逻辑。主要逻辑可以按下面顺序来看源码理解: + +* 当`client`在`Dial`时会根据`target`解析的`scheme`获取对应的`Builder`,[官方代码位置](https://github.com/grpc/grpc-go/blob/master/clientconn.go#L242) +* 当`Dial`成功会创建出结构体`ClientConn`的对象[官方代码位置](https://github.com/grpc/grpc-go/blob/master/clientconn.go#L447)(注意不是上面的`ClientConn`接口),可以看到结构体`ClientConn`内的成员`resolverWrapper`又实现了接口`ClientConn`的方法[官方代码位置](https://github.com/grpc/grpc-go/blob/master/resolver_conn_wrapper.go) +* 当`resolverWrapper`被初始化时就会调用`Build`方法[官方代码位置](https://github.com/grpc/grpc-go/blob/master/resolver_conn_wrapper.go#L89),其中参数为接口`ClientConn`传入的是`ccResolverWrapper` +* 当用户基于`Builder`的实现进行`UpdateState`调用时,则会触发结构体`ClientConn`的`updateResolverState`方法[官方代码位置](https://github.com/grpc/grpc-go/blob/master/resolver_conn_wrapper.go#L109),`updateResolverState`则会对传入的`Address`进行初始化等逻辑[官方代码位置](https://github.com/grpc/grpc-go/blob/master/clientconn.go#L553) + +如此整个服务发现过程就结束了。从中也可以看出gRPC官方提供的三个接口还是很灵活的,但也正因为灵活要实现稍微麻烦一些,而`Address`[官方代码位置](https://github.com/grpc/grpc-go/blob/master/resolver/resolver.go#L79)如果直接被业务拿来用于服务节点信息的描述结构则显得有些过于简单。 + +所以`warden`包装了gRPC的整个服务发现实现逻辑,代码分别位于`pkg/naming/naming.go`和`warden/resolver/resolver.go`,其中: + +* `naming.go`内定义了用于描述业务实例的`Instance`结构、用于服务注册的`Registry`接口、用于服务发现的`Resolver`接口 +* `resolver.go`内实现了gRPC官方的`resolver.Builder`和`resolver.Resolver`接口,但也暴露了`naming.go`内的`naming.Builder`和`naming.Resolver`接口 + +# warden Resolver + +接下来看`naming`内的接口如下: + +```go +// Resolver resolve naming service +type Resolver interface { + Fetch(context.Context) (*InstancesInfo, bool) + Watch() <-chan struct{} + Close() error +} + +// Builder resolver builder. +type Builder interface { + Build(id string) Resolver + Scheme() string +} +``` + +可以看到封装方式与gRPC官方的方法一样,通过`Builder`进行`Resolver`的初始化。不同的是通过封装将参数进行了简化: + +* `Build`只需要传对应的服务`id`即可:`warden/resolver/resolver.go`在gRPC进行调用后,会根据`Scheme`方法查询对应的`naming.Builder`实现并调用`Build`将`id`传入,而`naming.Resolver`的实现即可通过`id`去对应的服务发现中间件进行实例信息的查询 +* 而`Resolver`则对方法进行了扩展,除了简单进行`Fetch`操作外还多了`Watch`方法,用于监听服务发现中间件的节点变化情况,从而能够实时的进行服务实例信息的更新 + +在`naming/discovery`内实现了基于[discovery](https://github.com/bilibili/discovery)为中间件的服务注册与发现逻辑。如果要实现其他中间件如`etcd`|`zookeeper`等的逻辑,参考`naming/discovery/discovery.go`内的逻辑,将与`discovery`的交互逻辑替换掉即可(后续会默认将etcd/zk等实现,敬请期待)。 + +# 使用discovery + +因为`warden`内默认使用`direct`的方式,所以要使用[discovery](https://github.com/bilibili/discovery)需要在业务的`NewClient`前进行注册,代码如下: + +```go +package dao + +import ( + "context" + + "github.com/bilibili/kratos/pkg/naming/discovery" + "github.com/bilibili/kratos/pkg/net/rpc/warden" + "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" + + "google.golang.org/grpc" +) + +// AppID your appid, ensure unique. +const AppID = "demo.service" // NOTE: example + +// NewClient new member grpc client +func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) { + + // NOTE: 注意这段代码,表示要使用discovery进行服务发现 + // NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行 + // NOTE: !!!切记不要在一个进程内进行不同中间件的Register!!! + resolver.Register(discovery.Builder()) + + client := warden.NewClient(cfg, opts...) + conn, err := client.Dial(context.Background(), "discovery://default/"+AppID) + if err != nil { + return nil, err + } + // 注意替换这里: + // NewDemoClient方法是在"api"目录下代码生成的 + // 对应proto文件内自定义的service名字,请使用正确方法名替换 + return NewDemoClient(conn), nil +} +``` + +注意看传入`client.Dial`的`target`是`discovery://default/${appid}`,当gRPC进行解析后会得到`scheme`=`discovery`,`warden/resolver.Builder`会通过该`scheme`获取到`naming/discovery.Discovery`对象。而`naming/discovery.Discovery`对象基于`id`就知道要获取哪个服务的实例信息。 + +# 服务注册 + +客户端既然使用了[discovery](https://github.com/bilibili/discovery)进行服务发现,也就意味着服务端启动后必须将自己注册给[discovery](https://github.com/bilibili/discovery)知道。 + +相对服务发现来讲,服务注册则简单很多,看`naming/discovery/discovery.go`内的代码实现了`naming/naming.go`内的`Registry`接口,服务端启动时可以参考下面代码进行注册: + +```go +// 该代码可放在main.go,当warden server进行初始化之后 +// 省略... + +ip := "" // NOTE: 必须拿到您实例节点的真实IP, +port := "" // NOTE: 必须拿到您实例grpc监听的真实端口,warden默认监听9000 +hn, _ := os.Hostname() +dis := discovery.New(nil) +ins := &naming.Instance{ + Zone: env.Zone, + Env: env.DeployEnv, + AppID: "your app id", + Hostname: hn, + Addrs: []string{ + "grpc://" + ip + ":" + port, + }, +} +cancel, err := dis.Register(context.Background(), ins) +if err != nil { + panic(err) +} + +// 省略... + +// 特别注意!!! +// cancel必须在进程退出时执行!!! +cancel() + +``` + +# 扩展阅读 + +[warden快速开始](warden-quickstart.md) [warden拦截器](warden-mid.md) [warden基于pb生成](warden-pb.md) [warden负载均衡](warden-balancer.md) + +------------- + +[文档目录树](summary.md) diff --git a/doc/wiki-cn/warden.md b/doc/wiki-cn/warden.md index 7eaaded5b..9c2782564 100644 --- a/doc/wiki-cn/warden.md +++ b/doc/wiki-cn/warden.md @@ -21,16 +21,7 @@ gRPC暴露了两个拦截器接口,分别是: # 服务发现 -gRPC暴露了服务发现的接口`resolver.Resolver`,`warden/resolver/resolver.go`实现了该接口,并基于了`pkg/naming/naming.go`内的`Resolver`接口进行`Fetch``Watch`等操作。 - -`pkg/naming/discovery/discovery.go`内实现了`pkg/naming/naming.go`内的`Resolver`接口,使用[discovery](https://github.com/bilibili/discovery)来进行服务发现。 - -注意:`pkg/naming/naming.go`内的`Resolver`接口是`kratos`的一层封装,暴露的接口主要: - -* 相对原生`resolver.Resolver`内`ResolveNow`更友好的方法`Fetch``Watch` -* 统一应用的实例信息结构体`naming.Instance` - -想要用非[discovery](https://github.com/bilibili/discovery)的请参考下面文档进行开发。 +`warden`默认使用`direct`方式直连,正常线上都会使用第三方服务注册与发现中间件,`warden`内包含了[discovery](https://github.com/bilibili/discovery)的逻辑实现,想使用如`etcd`、`zookeeper`等也可以,都请看下面文档。 [warden服务发现](warden-resolver.md) diff --git a/pkg/net/rpc/warden/client.go b/pkg/net/rpc/warden/client.go index d7f308c75..f206ec99c 100644 --- a/pkg/net/rpc/warden/client.go +++ b/pkg/net/rpc/warden/client.go @@ -18,6 +18,8 @@ import ( "github.com/bilibili/kratos/pkg/net/netutil/breaker" "github.com/bilibili/kratos/pkg/net/rpc/warden/balancer/p2c" "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status" + "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" + "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct" "github.com/bilibili/kratos/pkg/net/trace" xtime "github.com/bilibili/kratos/pkg/time" @@ -49,6 +51,10 @@ func baseMetadata() metadata.MD { return gmd } +func init() { + resolver.Register(direct.New()) +} + // ClientConfig is rpc client conf. type ClientConfig struct { Dial xtime.Duration @@ -222,7 +228,7 @@ func (c *Client) UseOpt(opt ...grpc.DialOption) *Client { // Dial creates a client connection to the given target. // Target format is scheme://authority/endpoint?query_arg=value -// example: direct://default/192.168.1.1:8080,192.168.1.2:8081 +// example: direct://default/192.168.1.1:9000,192.168.1.2:9001 func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption) (conn *grpc.ClientConn, err error) { if !c.conf.NonBlock { c.opt = append(c.opt, grpc.WithBlock()) diff --git a/pkg/net/rpc/warden/internal/examples/client/client.go b/pkg/net/rpc/warden/internal/examples/client/client.go index 0ccc08428..bc22b3a2f 100644 --- a/pkg/net/rpc/warden/internal/examples/client/client.go +++ b/pkg/net/rpc/warden/internal/examples/client/client.go @@ -10,11 +10,11 @@ import ( pb "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/proto/testproto" ) -// usage: ./client -grpc.target=test.service=127.0.0.1:8080 +// usage: ./client -grpc.target=test.service=127.0.0.1:9000 func main() { log.Init(&log.Config{Stdout: true}) flag.Parse() - conn, err := warden.NewClient(nil).Dial(context.Background(), "direct://d/127.0.0.1:8081") + conn, err := warden.NewClient(nil).Dial(context.Background(), "direct://default/127.0.0.1:9000") if err != nil { panic(err) } diff --git a/pkg/net/rpc/warden/server.go b/pkg/net/rpc/warden/server.go index 0149732de..0caa86f5c 100644 --- a/pkg/net/rpc/warden/server.go +++ b/pkg/net/rpc/warden/server.go @@ -19,8 +19,6 @@ import ( //this package is for json format response _ "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/encoding/json" "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status" - "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" - "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct" "github.com/pkg/errors" "google.golang.org/grpc" @@ -137,7 +135,7 @@ func (s *Server) handle() grpc.UnaryServerInterceptor { func init() { addFlag(flag.CommandLine) - resolver.Register(direct.New()) + } func addFlag(fs *flag.FlagSet) {