From 3cd4f8126f91da4f8d6f98a184c49c65f716f715 Mon Sep 17 00:00:00 2001 From: baozhecheng Date: Mon, 7 Nov 2022 14:18:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .fleet/settings.json | 0 cmd/proto-gen-go-broker/broker.go | 316 ++++++++++++++++++++ cmd/proto-gen-go-broker/broker/broker.pb.go | 113 +++++++ cmd/proto-gen-go-broker/broker/broker.proto | 17 ++ cmd/proto-gen-go-broker/broker_test.go | 87 ++++++ cmd/proto-gen-go-broker/go.mod | 8 + cmd/proto-gen-go-broker/main.go | 34 +++ cmd/proto-gen-go-broker/template.go | 140 +++++++++ cmd/proto-gen-go-broker/version.go | 4 + contrib/registry/consul/client.go | 31 ++ contrib/registry/etcd/registry.go | 19 ++ openapi.yaml | 10 + transport/broker/broker.go | 17 ++ transport/broker/transport.go | 37 +++ transport/grpc/balancer.go | 8 +- transport/transport.go | 5 +- 16 files changed, 840 insertions(+), 6 deletions(-) create mode 100644 .fleet/settings.json create mode 100644 cmd/proto-gen-go-broker/broker.go create mode 100644 cmd/proto-gen-go-broker/broker/broker.pb.go create mode 100644 cmd/proto-gen-go-broker/broker/broker.proto create mode 100644 cmd/proto-gen-go-broker/broker_test.go create mode 100644 cmd/proto-gen-go-broker/go.mod create mode 100644 cmd/proto-gen-go-broker/main.go create mode 100644 cmd/proto-gen-go-broker/template.go create mode 100644 cmd/proto-gen-go-broker/version.go create mode 100644 openapi.yaml create mode 100644 transport/broker/broker.go create mode 100644 transport/broker/transport.go diff --git a/.fleet/settings.json b/.fleet/settings.json new file mode 100644 index 000000000..e69de29bb diff --git a/cmd/proto-gen-go-broker/broker.go b/cmd/proto-gen-go-broker/broker.go new file mode 100644 index 000000000..9d000a59f --- /dev/null +++ b/cmd/proto-gen-go-broker/broker.go @@ -0,0 +1,316 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "regexp" + "strings" + + "google.golang.org/protobuf/reflect/protoreflect" + + "google.golang.org/genproto/googleapis/api/annotations" + "google.golang.org/protobuf/compiler/protogen" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" +) + +const ( + contextPackage = protogen.GoImportPath("context") + transportHTTPPackage = protogen.GoImportPath("github.com/go-kratos/kratos/v2/transport/http") + bindingPackage = protogen.GoImportPath("github.com/go-kratos/kratos/v2/transport/http/binding") +) + +var methodSets = make(map[string]int) + +// generateFile generates a _http.pb.go file containing kratos errors definitions. +func generateFile(gen *protogen.Plugin, file *protogen.File, omitempty bool) *protogen.GeneratedFile { + if len(file.Services) == 0 || (omitempty && !hasHTTPRule(file.Services)) { + return nil + } + filename := file.GeneratedFilenamePrefix + "_broker.pb.go" + g := gen.NewGeneratedFile(filename, file.GoImportPath) + g.P("// Code generated by protoc-gen-go-broker. DO NOT EDIT.") + g.P("// versions:") + g.P(fmt.Sprintf("// - protoc-gen-go-broker %s", release)) + g.P("// - protoc ", protocVersion(gen)) + if file.Proto.GetOptions().GetDeprecated() { + g.P("// ", file.Desc.Path(), " is a deprecated file.") + } else { + g.P("// source: ", file.Desc.Path()) + } + g.P() + g.P("package ", file.GoPackageName) + g.P() + generateFileContent(gen, file, g, omitempty) + return g +} + +// generateFileContent generates the kratos errors definitions, excluding the package statement. +func generateFileContent(gen *protogen.Plugin, file *protogen.File, g *protogen.GeneratedFile, omitempty bool) { + if len(file.Services) == 0 { + return + } + for _, service := range file.Services { + genService(gen, file, g, service, omitempty) + } +} + +func genService(gen *protogen.Plugin, file *protogen.File, g *protogen.GeneratedFile, service *protogen.Service, omitempty bool) { + if service.Desc.Options().(*descriptorpb.ServiceOptions).GetDeprecated() { + g.P("//") + g.P(deprecationComment) + } + // HTTP Server. + sd := &serviceDesc{ + ServiceType: service.GoName, + ServiceName: string(service.Desc.FullName()), + Metadata: file.Desc.Path(), + } + for _, method := range service.Methods { + if method.Desc.IsStreamingClient() || method.Desc.IsStreamingServer() { + continue + } + rule, ok := proto.GetExtension(method.Desc.Options(), annotations.E_Http).(*annotations.HttpRule) + if rule != nil && ok { + for _, bind := range rule.AdditionalBindings { + sd.Methods = append(sd.Methods, buildHTTPRule(g, method, bind)) + } + sd.Methods = append(sd.Methods, buildHTTPRule(g, method, rule)) + } else if !omitempty { + path := fmt.Sprintf("/%s/%s", service.Desc.FullName(), method.Desc.Name()) + sd.Methods = append(sd.Methods, buildMethodDesc(g, method, http.MethodPost, path)) + } + } + if len(sd.Methods) != 0 { + g.P(sd.execute()) + } +} + +func hasHTTPRule(services []*protogen.Service) bool { + for _, service := range services { + for _, method := range service.Methods { + if method.Desc.IsStreamingClient() || method.Desc.IsStreamingServer() { + continue + } + rule, ok := proto.GetExtension(method.Desc.Options(), annotations.E_Http).(*annotations.HttpRule) + if rule != nil && ok { + return true + } + } + } + return false +} + +func buildHTTPRule(g *protogen.GeneratedFile, m *protogen.Method, rule *annotations.HttpRule) *methodDesc { + var ( + path string + method string + body string + responseBody string + ) + + switch pattern := rule.Pattern.(type) { + case *annotations.HttpRule_Get: + path = pattern.Get + method = http.MethodGet + case *annotations.HttpRule_Put: + path = pattern.Put + method = http.MethodPut + case *annotations.HttpRule_Post: + path = pattern.Post + method = http.MethodPost + case *annotations.HttpRule_Delete: + path = pattern.Delete + method = http.MethodDelete + case *annotations.HttpRule_Patch: + path = pattern.Patch + method = http.MethodPatch + case *annotations.HttpRule_Custom: + path = pattern.Custom.Path + method = pattern.Custom.Kind + } + body = rule.Body + responseBody = rule.ResponseBody + md := buildMethodDesc(g, m, method, path) + if method == http.MethodGet || method == http.MethodDelete { + if body != "" { + _, _ = fmt.Fprintf(os.Stderr, "\u001B[31mWARN\u001B[m: %s %s body should not be declared.\n", method, path) + } + } else { + if body == "" { + _, _ = fmt.Fprintf(os.Stderr, "\u001B[31mWARN\u001B[m: %s %s does not declare a body.\n", method, path) + } + } + if body == "*" { + md.HasBody = true + md.Body = "" + } else if body != "" { + md.HasBody = true + md.Body = "." + camelCaseVars(body) + } else { + md.HasBody = false + } + if responseBody == "*" { + md.ResponseBody = "" + } else if responseBody != "" { + md.ResponseBody = "." + camelCaseVars(responseBody) + } + return md +} + +func buildMethodDesc(g *protogen.GeneratedFile, m *protogen.Method, method, path string) *methodDesc { + defer func() { methodSets[m.GoName]++ }() + + vars := buildPathVars(path) + + for v, s := range vars { + fields := m.Input.Desc.Fields() + + if s != nil { + path = replacePath(v, *s, path) + } + for _, field := range strings.Split(v, ".") { + if strings.TrimSpace(field) == "" { + continue + } + if strings.Contains(field, ":") { + field = strings.Split(field, ":")[0] + } + fd := fields.ByName(protoreflect.Name(field)) + if fd == nil { + fmt.Fprintf(os.Stderr, "\u001B[31mERROR\u001B[m: The corresponding field '%s' declaration in message could not be found in '%s'\n", v, path) + os.Exit(2) + } + if fd.IsMap() { + fmt.Fprintf(os.Stderr, "\u001B[31mWARN\u001B[m: The field in path:'%s' shouldn't be a map.\n", v) + } else if fd.IsList() { + fmt.Fprintf(os.Stderr, "\u001B[31mWARN\u001B[m: The field in path:'%s' shouldn't be a list.\n", v) + } else if fd.Kind() == protoreflect.MessageKind || fd.Kind() == protoreflect.GroupKind { + fields = fd.Message().Fields() + } + } + } + return &methodDesc{ + Name: m.GoName, + OriginalName: string(m.Desc.Name()), + Num: methodSets[m.GoName], + Request: g.QualifiedGoIdent(m.Input.GoIdent), + Reply: g.QualifiedGoIdent(m.Output.GoIdent), + Path: path, + Method: method, + HasVars: len(vars) > 0, + } +} + +func buildPathVars(path string) (res map[string]*string) { + if strings.HasSuffix(path, "/") { + fmt.Fprintf(os.Stderr, "\u001B[31mWARN\u001B[m: Path %s should not end with \"/\" \n", path) + } + pattern := regexp.MustCompile(`(?i){([a-z.0-9_\s]*)=?([^{}]*)}`) + matches := pattern.FindAllStringSubmatch(path, -1) + res = make(map[string]*string, len(matches)) + for _, m := range matches { + name := strings.TrimSpace(m[1]) + if len(name) > 1 && len(m[2]) > 0 { + res[name] = &m[2] + } else { + res[name] = nil + } + } + return +} + +func replacePath(name string, value string, path string) string { + pattern := regexp.MustCompile(fmt.Sprintf(`(?i){([\s]*%s[\s]*)=?([^{}]*)}`, name)) + idx := pattern.FindStringIndex(path) + if len(idx) > 0 { + path = fmt.Sprintf("%s{%s:%s}%s", + path[:idx[0]], // The start of the match + name, + strings.ReplaceAll(value, "*", ".*"), + path[idx[1]:], + ) + } + return path +} + +func camelCaseVars(s string) string { + subs := strings.Split(s, ".") + vars := make([]string, 0, len(subs)) + for _, sub := range subs { + vars = append(vars, camelCase(sub)) + } + return strings.Join(vars, ".") +} + +// camelCase returns the CamelCased name. +// If there is an interior underscore followed by a lower case letter, +// drop the underscore and convert the letter to upper case. +// There is a remote possibility of this rewrite causing a name collision, +// but it's so remote we're prepared to pretend it's nonexistent - since the +// C++ generator lowercase names, it's extremely unlikely to have two fields +// with different capitalization. +// In short, _my_field_name_2 becomes XMyFieldName_2. +func camelCase(s string) string { + if s == "" { + return "" + } + t := make([]byte, 0, 32) + i := 0 + if s[0] == '_' { + // Need a capital letter; drop the '_'. + t = append(t, 'X') + i++ + } + // Invariant: if the next letter is lower case, it must be converted + // to upper case. + // That is, we process a word at a time, where words are marked by _ or + // upper case letter. Digits are treated as words. + for ; i < len(s); i++ { + c := s[i] + if c == '_' && i+1 < len(s) && isASCIILower(s[i+1]) { + continue // Skip the underscore in s. + } + if isASCIIDigit(c) { + t = append(t, c) + continue + } + // Assume we have a letter now - if not, it's a bogus identifier. + // The next word is a sequence of characters that must start upper case. + if isASCIILower(c) { + c ^= ' ' // Make it a capital letter. + } + t = append(t, c) // Guaranteed not lower case. + // Accept lower case sequence that follows. + for i+1 < len(s) && isASCIILower(s[i+1]) { + i++ + t = append(t, s[i]) + } + } + return string(t) +} + +// Is c an ASCII lower-case letter? +func isASCIILower(c byte) bool { + return 'a' <= c && c <= 'z' +} + +// Is c an ASCII digit? +func isASCIIDigit(c byte) bool { + return '0' <= c && c <= '9' +} + +func protocVersion(gen *protogen.Plugin) string { + v := gen.Request.GetCompilerVersion() + if v == nil { + return "(unknown)" + } + var suffix string + if s := v.GetSuffix(); s != "" { + suffix = "-" + s + } + return fmt.Sprintf("v%d.%d.%d%s", v.GetMajor(), v.GetMinor(), v.GetPatch(), suffix) +} + +const deprecationComment = "// Deprecated: Do not use." diff --git a/cmd/proto-gen-go-broker/broker/broker.pb.go b/cmd/proto-gen-go-broker/broker/broker.pb.go new file mode 100644 index 000000000..74f4d3577 --- /dev/null +++ b/cmd/proto-gen-go-broker/broker/broker.pb.go @@ -0,0 +1,113 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.19.4 +// source: cmd/proto-gen-go-broker/broker/broker.proto + +package broker + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + descriptorpb "google.golang.org/protobuf/types/descriptorpb" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var file_cmd_proto_gen_go_broker_broker_broker_proto_extTypes = []protoimpl.ExtensionInfo{ + { + ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtensionType: (*string)(nil), + Field: 1001011, + Name: "errors.receive_topic", + Tag: "bytes,1001011,opt,name=receive_topic", + Filename: "cmd/proto-gen-go-broker/broker/broker.proto", + }, + { + ExtendedType: (*descriptorpb.MethodOptions)(nil), + ExtensionType: (*string)(nil), + Field: 1008611, + Name: "errors.output_topic", + Tag: "bytes,1008611,opt,name=output_topic", + Filename: "cmd/proto-gen-go-broker/broker/broker.proto", + }, +} + +// Extension fields to descriptorpb.MethodOptions. +var ( + // optional string receive_topic = 1001011; + E_ReceiveTopic = &file_cmd_proto_gen_go_broker_broker_broker_proto_extTypes[0] + // optional string output_topic = 1008611; + E_OutputTopic = &file_cmd_proto_gen_go_broker_broker_broker_proto_extTypes[1] +) + +var File_cmd_proto_gen_go_broker_broker_broker_proto protoreflect.FileDescriptor + +var file_cmd_proto_gen_go_broker_broker_broker_proto_rawDesc = []byte{ + 0x0a, 0x2b, 0x63, 0x6d, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x67, 0x65, 0x6e, 0x2d, + 0x67, 0x6f, 0x2d, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x2f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x73, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x6f, + 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3a, 0x45, 0x0a, 0x0d, 0x72, 0x65, 0x63, 0x65, 0x69, + 0x76, 0x65, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x1e, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, + 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xb3, 0x8c, 0x3d, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0c, 0x72, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x3a, 0x43, + 0x0a, 0x0c, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x1e, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xe3, + 0xc7, 0x3d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x42, 0x6a, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, + 0x01, 0x5a, 0x3d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, + 0x2d, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x76, + 0x32, 0x2f, 0x63, 0x6d, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x67, 0x65, 0x6e, 0x2d, + 0x67, 0x6f, 0x2d, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x3b, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0xa2, 0x02, 0x0c, 0x4b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_cmd_proto_gen_go_broker_broker_broker_proto_goTypes = []interface{}{ + (*descriptorpb.MethodOptions)(nil), // 0: google.protobuf.MethodOptions +} +var file_cmd_proto_gen_go_broker_broker_broker_proto_depIdxs = []int32{ + 0, // 0: errors.receive_topic:extendee -> google.protobuf.MethodOptions + 0, // 1: errors.output_topic:extendee -> google.protobuf.MethodOptions + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 0, // [0:2] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_cmd_proto_gen_go_broker_broker_broker_proto_init() } +func file_cmd_proto_gen_go_broker_broker_broker_proto_init() { + if File_cmd_proto_gen_go_broker_broker_broker_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cmd_proto_gen_go_broker_broker_broker_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 2, + NumServices: 0, + }, + GoTypes: file_cmd_proto_gen_go_broker_broker_broker_proto_goTypes, + DependencyIndexes: file_cmd_proto_gen_go_broker_broker_broker_proto_depIdxs, + ExtensionInfos: file_cmd_proto_gen_go_broker_broker_broker_proto_extTypes, + }.Build() + File_cmd_proto_gen_go_broker_broker_broker_proto = out.File + file_cmd_proto_gen_go_broker_broker_broker_proto_rawDesc = nil + file_cmd_proto_gen_go_broker_broker_broker_proto_goTypes = nil + file_cmd_proto_gen_go_broker_broker_broker_proto_depIdxs = nil +} diff --git a/cmd/proto-gen-go-broker/broker/broker.proto b/cmd/proto-gen-go-broker/broker/broker.proto new file mode 100644 index 000000000..4ff06c433 --- /dev/null +++ b/cmd/proto-gen-go-broker/broker/broker.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package errors; + +option go_package = "github.com/go-kratos/kratos/v2/cmd/proto-gen-go-broker;broker"; +option java_multiple_files = true; +option java_package = "com.github.kratos.Broker"; +option objc_class_prefix = "KratosBroker"; + +import "google/protobuf/descriptor.proto"; + + + +extend google.protobuf.MethodOptions { + string receive_topic = 1001011; + string output_topic = 1008611; +} \ No newline at end of file diff --git a/cmd/proto-gen-go-broker/broker_test.go b/cmd/proto-gen-go-broker/broker_test.go new file mode 100644 index 000000000..0070b2208 --- /dev/null +++ b/cmd/proto-gen-go-broker/broker_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "reflect" + "testing" +) + +func TestNoParameters(t *testing.T) { + path := "/test/noparams" + m := buildPathVars(path) + if !reflect.DeepEqual(m, map[string]*string{}) { + t.Fatalf("Map should be empty") + } +} + +func TestSingleParam(t *testing.T) { + path := "/test/{message.id}" + m := buildPathVars(path) + if !reflect.DeepEqual(len(m), 1) { + t.Fatalf("len(m) not is 1") + } + if m["message.id"] != nil { + t.Fatalf(`m["message.id"] should be empty`) + } +} + +func TestTwoParametersReplacement(t *testing.T) { + path := "/test/{message.id}/{message.name=messages/*}" + m := buildPathVars(path) + if len(m) != 2 { + t.Fatal("len(m) should be 2") + } + if m["message.id"] != nil { + t.Fatal(`m["message.id"] should be nil`) + } + if m["message.name"] == nil { + t.Fatal(`m["message.name"] should not be nil`) + } + if *m["message.name"] != "messages/*" { + t.Fatal(`m["message.name"] should be "messages/*"`) + } +} + +func TestNoReplacePath(t *testing.T) { + path := "/test/{message.id=test}" + if !reflect.DeepEqual(replacePath("message.id", "test", path), "/test/{message.id:test}") { + t.Fatal(`replacePath("message.id", "test", path) should be "/test/{message.id:test}"`) + } + path = "/test/{message.id=test/*}" + if !reflect.DeepEqual(replacePath("message.id", "test/*", path), "/test/{message.id:test/.*}") { + t.Fatal(`replacePath("message.id", "test/*", path) should be "/test/{message.id:test/.*}"`) + } +} + +func TestReplacePath(t *testing.T) { + path := "/test/{message.id}/{message.name=messages/*}" + newPath := replacePath("message.name", "messages/*", path) + if !reflect.DeepEqual("/test/{message.id}/{message.name:messages/.*}", newPath) { + t.Fatal(`replacePath("message.name", "messages/*", path) should be "/test/{message.id}/{message.name:messages/.*}"`) + } +} + +func TestIteration(t *testing.T) { + path := "/test/{message.id}/{message.name=messages/*}" + vars := buildPathVars(path) + for v, s := range vars { + if s != nil { + path = replacePath(v, *s, path) + } + } + if !reflect.DeepEqual("/test/{message.id}/{message.name:messages/.*}", path) { + t.Fatal(`replacePath("message.name", "messages/*", path) should be "/test/{message.id}/{message.name:messages/.*}"`) + } +} + +func TestIterationMiddle(t *testing.T) { + path := "/test/{message.name=messages/*}/books" + vars := buildPathVars(path) + for v, s := range vars { + if s != nil { + path = replacePath(v, *s, path) + } + } + if !reflect.DeepEqual("/test/{message.name:messages/.*}/books", path) { + t.Fatal(`replacePath("message.name", "messages/*", path) should be "/test/{message.name:messages/.*}/books"`) + } +} diff --git a/cmd/proto-gen-go-broker/go.mod b/cmd/proto-gen-go-broker/go.mod new file mode 100644 index 000000000..2e88494fd --- /dev/null +++ b/cmd/proto-gen-go-broker/go.mod @@ -0,0 +1,8 @@ +module github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2 + +go 1.16 + +require ( + google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd + google.golang.org/protobuf v1.28.0 +) diff --git a/cmd/proto-gen-go-broker/main.go b/cmd/proto-gen-go-broker/main.go new file mode 100644 index 000000000..a18fbddc4 --- /dev/null +++ b/cmd/proto-gen-go-broker/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "flag" + "fmt" + + "google.golang.org/protobuf/compiler/protogen" + "google.golang.org/protobuf/types/pluginpb" +) + +var ( + showVersion = flag.Bool("version", false, "print the version and exit") + omitempty = flag.Bool("omitempty", true, "omit if google.api is empty") +) + +func main() { + flag.Parse() + if *showVersion { + fmt.Printf("protoc-gen-go-http %v\n", release) + return + } + protogen.Options{ + ParamFunc: flag.CommandLine.Set, + }.Run(func(gen *protogen.Plugin) error { + gen.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL) + for _, f := range gen.Files { + if !f.Generate { + continue + } + generateFile(gen, f, *omitempty) + } + return nil + }) +} diff --git a/cmd/proto-gen-go-broker/template.go b/cmd/proto-gen-go-broker/template.go new file mode 100644 index 000000000..794210852 --- /dev/null +++ b/cmd/proto-gen-go-broker/template.go @@ -0,0 +1,140 @@ +package main + +import ( + "bytes" + "strings" + "text/template" +) + +var httpTemplate = ` +{{$svrType := .ServiceType}} +{{$svrName := .ServiceName}} + +{{- range .MethodSets}} +const Operation{{$svrType}}{{.OriginalName}} = "/{{$svrName}}/{{.OriginalName}}" +{{- end}} + +type {{.ServiceType}}HTTPServer interface { +{{- range .MethodSets}} + {{.Name}}(context.Context, *{{.Request}}) (*{{.Reply}}, error) +{{- end}} +} + +func Register{{.ServiceType}}HTTPServer(s *http.Server, srv {{.ServiceType}}HTTPServer) { + r := s.Route("/") + {{- range .Methods}} + r.{{.Method}}("{{.Path}}", _{{$svrType}}_{{.Name}}{{.Num}}_HTTP_Handler(srv)) + {{- end}} +} + +{{range .Methods}} +func _{{$svrType}}_{{.Name}}{{.Num}}_HTTP_Handler(srv {{$svrType}}HTTPServer) func(ctx http.Context) error { + return func(ctx http.Context) error { + var in {{.Request}} + {{- if .HasBody}} + if err := ctx.Bind(&in{{.Body}}); err != nil { + return err + } + + {{- if not (eq .Body "")}} + if err := ctx.BindQuery(&in); err != nil { + return err + } + {{- end}} + {{- else}} + if err := ctx.BindQuery(&in{{.Body}}); err != nil { + return err + } + {{- end}} + {{- if .HasVars}} + if err := ctx.BindVars(&in); err != nil { + return err + } + {{- end}} + http.SetOperation(ctx,Operation{{$svrType}}{{.OriginalName}}) + h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.{{.Name}}(ctx, req.(*{{.Request}})) + }) + out, err := h(ctx, &in) + if err != nil { + return err + } + reply := out.(*{{.Reply}}) + return ctx.Result(200, reply{{.ResponseBody}}) + } +} +{{end}} + +type {{.ServiceType}}HTTPClient interface { +{{- range .MethodSets}} + {{.Name}}(ctx context.Context, req *{{.Request}}, opts ...http.CallOption) (rsp *{{.Reply}}, err error) +{{- end}} +} + +type {{.ServiceType}}HTTPClientImpl struct{ + cc *http.Client +} + +func New{{.ServiceType}}HTTPClient (client *http.Client) {{.ServiceType}}HTTPClient { + return &{{.ServiceType}}HTTPClientImpl{client} +} + +{{range .MethodSets}} +func (c *{{$svrType}}HTTPClientImpl) {{.Name}}(ctx context.Context, in *{{.Request}}, opts ...http.CallOption) (*{{.Reply}}, error) { + var out {{.Reply}} + pattern := "{{.Path}}" + path := binding.EncodeURL(pattern, in, {{not .HasBody}}) + opts = append(opts, http.Operation(Operation{{$svrType}}{{.OriginalName}})) + opts = append(opts, http.PathTemplate(pattern)) + {{if .HasBody -}} + err := c.cc.Invoke(ctx, "{{.Method}}", path, in{{.Body}}, &out{{.ResponseBody}}, opts...) + {{else -}} + err := c.cc.Invoke(ctx, "{{.Method}}", path, nil, &out{{.ResponseBody}}, opts...) + {{end -}} + if err != nil { + return nil, err + } + return &out, err +} +{{end}} +` + +type serviceDesc struct { + ServiceType string // Greeter + ServiceName string // helloworld.Greeter + Metadata string // api/helloworld/helloworld.proto + Methods []*methodDesc + MethodSets map[string]*methodDesc +} + +type methodDesc struct { + // method + Name string + OriginalName string // The parsed original name + Num int + Request string + Reply string + // http_rule + Path string + Method string + HasVars bool + HasBody bool + Body string + ResponseBody string +} + +func (s *serviceDesc) execute() string { + s.MethodSets = make(map[string]*methodDesc) + for _, m := range s.Methods { + s.MethodSets[m.Name] = m + } + buf := new(bytes.Buffer) + tmpl, err := template.New("http").Parse(strings.TrimSpace(httpTemplate)) + if err != nil { + panic(err) + } + if err := tmpl.Execute(buf, s); err != nil { + panic(err) + } + return strings.Trim(buf.String(), "\r\n") +} diff --git a/cmd/proto-gen-go-broker/version.go b/cmd/proto-gen-go-broker/version.go new file mode 100644 index 000000000..7d929e189 --- /dev/null +++ b/cmd/proto-gen-go-broker/version.go @@ -0,0 +1,4 @@ +package main + +// release is the current protoc-gen-go-http version. +const release = "v2.5.2" diff --git a/contrib/registry/consul/client.go b/contrib/registry/consul/client.go index c1364bb3f..bf9cd381e 100644 --- a/contrib/registry/consul/client.go +++ b/contrib/registry/consul/client.go @@ -144,6 +144,37 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab } err := c.cli.Agent().ServiceRegister(asr) + /* + for _, name := range c.otherName { + // 执行注册多个服务名的流程 + addresses := make(map[string]api.ServiceAddress, len(svc.Endpoints)) + checkAddresses := make([]string, 0, len(svc.Endpoints)) + for _, endpoint := range svc.Endpoints { + raw, err := url.Parse(endpoint) + if err != nil { + return err + } + addr := raw.Hostname() + port, _ := strconv.ParseUint(raw.Port(), 10, 16) + + checkAddresses = append(checkAddresses, net.JoinHostPort(addr, strconv.FormatUint(port, 10))) + addresses[raw.Scheme] = api.ServiceAddress{Address: endpoint, Port: int(port)} + } + asr := &api.AgentServiceRegistration{ + ID: svc.ID, + Name: name, // 改成你要注册的其他服务名 + Meta: svc.Metadata, + Tags: []string{fmt.Sprintf("version=%s", svc.Version)}, + TaggedAddresses: addresses, + } + if len(checkAddresses) > 0 { + host, portRaw, _ := net.SplitHostPort(checkAddresses[0]) + port, _ := strconv.ParseInt(portRaw, 10, 32) + asr.Address = host + asr.Port = int(port) + } + } + */ if err != nil { return err } diff --git a/contrib/registry/etcd/registry.go b/contrib/registry/etcd/registry.go index ca858c993..8360f8419 100644 --- a/contrib/registry/etcd/registry.go +++ b/contrib/registry/etcd/registry.go @@ -88,6 +88,25 @@ func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstan } go r.heartBeat(r.opts.ctx, leaseID, key, value) + + // 注册额外服务名 + for _, name := range o.names { + key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID) + value, err := marshal(service) + if err != nil { + return err + } + if r.lease != nil { + r.lease.Close() + } + r.lease = clientv3.NewLease(r.client) + leaseID, err := r.registerWithKV(ctx, key, value) + if err != nil { + return err + } + + go r.heartBeat(r.opts.ctx, leaseID, key, value) + } return nil } diff --git a/openapi.yaml b/openapi.yaml new file mode 100644 index 000000000..f8f373526 --- /dev/null +++ b/openapi.yaml @@ -0,0 +1,10 @@ +# Generated with protoc-gen-openapi +# https://github.com/google/gnostic/tree/master/cmd/protoc-gen-openapi + +openapi: 3.0.3 +info: + title: "" + version: 0.0.1 +paths: {} +components: + schemas: {} diff --git a/transport/broker/broker.go b/transport/broker/broker.go new file mode 100644 index 000000000..0be8439e5 --- /dev/null +++ b/transport/broker/broker.go @@ -0,0 +1,17 @@ +package broker + +import "context" + +type Event interface { + Key() string + Header() map[string]string + Value() []byte +} + +type Handler func(context.Context, Event) error + +type Broker interface { + Receive(topic string, handler Handler) error + Send(ctx context.Context, msg Event) error + Close() error +} diff --git a/transport/broker/transport.go b/transport/broker/transport.go new file mode 100644 index 000000000..b8211fdf9 --- /dev/null +++ b/transport/broker/transport.go @@ -0,0 +1,37 @@ +package broker + +import ( + "context" + "github.com/go-kratos/kratos/v2/transport" +) + +type broker struct { + broker Broker + receives map[string]Handler +} + +type HandlerFunc func(context.Context, Event) error + +func (b *broker) Start(_ context.Context) error { + for topic, handler := range b.receives { + if err := b.broker.Receive(topic, func(ctx context.Context, event Event) error { + return handler(ctx, event) + }); err != nil { + return err + } + } + return nil +} + +func (b *broker) Stop(ctx context.Context) error { + return b.broker.Close() +} + +func NewBroker(b Broker) transport.Server { + return &broker{} +} + +// Receive registers a handler for the given topic. +func (b *broker) Receive(topic string, handler Handler) error { + return b.broker.Receive(topic, handler) +} diff --git a/transport/grpc/balancer.go b/transport/grpc/balancer.go index f42dc23bd..acd1424f7 100644 --- a/transport/grpc/balancer.go +++ b/transport/grpc/balancer.go @@ -19,6 +19,10 @@ var ( _ balancer.Picker = (*balancerPicker)(nil) ) +type balancerBuilder struct { + builder selector.Builder +} + func init() { b := base.NewBalancerBuilder( balancerName, @@ -30,10 +34,6 @@ func init() { balancer.Register(b) } -type balancerBuilder struct { - builder selector.Builder -} - // Build creates a grpc Picker. func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { if len(info.ReadySCs) == 0 { diff --git a/transport/transport.go b/transport/transport.go index 42510df30..f98d8af82 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -61,8 +61,9 @@ func (k Kind) String() string { return string(k) } // Defines a set of transport kind const ( - KindGRPC Kind = "grpc" - KindHTTP Kind = "http" + KindGRPC Kind = "grpc" + KindHTTP Kind = "http" + KindBroker Kind = "broker" ) type (