chore(examples/event): examples of increasing use of event (#1228)
* chore(examples/event): examples of increasing use of eventpull/1231/head
parent
e1228d454a
commit
b6d005b21e
@ -0,0 +1,21 @@ |
||||
package event |
||||
|
||||
import "context" |
||||
|
||||
type Message interface { |
||||
Key() string |
||||
Value() []byte |
||||
Header() map[string]string |
||||
} |
||||
|
||||
type Handler func(context.Context, Message) error |
||||
|
||||
type Sender interface { |
||||
Send(ctx context.Context, msg Message) error |
||||
Close() error |
||||
} |
||||
|
||||
type Receiver interface { |
||||
Receive(ctx context.Context, handler Handler) error |
||||
Close() error |
||||
} |
@ -0,0 +1,134 @@ |
||||
package kafka |
||||
|
||||
import ( |
||||
"context" |
||||
"github.com/go-kratos/kratos/examples/event/event" |
||||
"github.com/segmentio/kafka-go" |
||||
"github.com/segmentio/kafka-go/protocol" |
||||
"log" |
||||
) |
||||
|
||||
var _ event.Sender = (*kafkaSender)(nil) |
||||
var _ event.Receiver = (*kafkaReceiver)(nil) |
||||
var _ event.Message = (*Message)(nil) |
||||
|
||||
type Message struct { |
||||
key string |
||||
value []byte |
||||
header map[string]string |
||||
} |
||||
|
||||
func (m *Message) Key() string { |
||||
return m.key |
||||
} |
||||
func (m *Message) Value() []byte { |
||||
return m.value |
||||
} |
||||
func (m *Message) Header() map[string]string { |
||||
return m.header |
||||
} |
||||
|
||||
func NewMessage(key string, value []byte, header map[string]string) event.Message { |
||||
return &Message{ |
||||
key: key, |
||||
value: value, |
||||
header: header, |
||||
} |
||||
} |
||||
|
||||
type kafkaSender struct { |
||||
writer *kafka.Writer |
||||
topic string |
||||
} |
||||
|
||||
func (s *kafkaSender) Send(ctx context.Context, message event.Message) error { |
||||
var h []kafka.Header |
||||
if len(message.Header()) > 0 { |
||||
for k, v := range message.Header() { |
||||
h = append(h, protocol.Header{ |
||||
Key: k, |
||||
Value: []byte(v), |
||||
}) |
||||
} |
||||
} |
||||
err := s.writer.WriteMessages(ctx, kafka.Message{ |
||||
Key: []byte(message.Key()), |
||||
Value: message.Value(), |
||||
Headers: h, |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *kafkaSender) Close() error { |
||||
err := s.writer.Close() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func NewKafkaSender(address []string, topic string) (event.Sender, error) { |
||||
|
||||
w := &kafka.Writer{ |
||||
Topic: topic, |
||||
Addr: kafka.TCP(address...), |
||||
Balancer: &kafka.LeastBytes{}, |
||||
} |
||||
return &kafkaSender{writer: w, topic: topic}, nil |
||||
} |
||||
|
||||
type kafkaReceiver struct { |
||||
reader *kafka.Reader |
||||
topic string |
||||
} |
||||
|
||||
func (k *kafkaReceiver) Receive(ctx context.Context, handler event.Handler) error { |
||||
go func() { |
||||
for { |
||||
m, err := k.reader.FetchMessage(context.Background()) |
||||
if err != nil { |
||||
break |
||||
} |
||||
h := make(map[string]string) |
||||
if len(m.Headers) > 0 { |
||||
for _, header := range m.Headers { |
||||
h[header.Key] = string(header.Value) |
||||
} |
||||
} |
||||
err = handler(context.Background(), &Message{ |
||||
key: string(m.Key), |
||||
value: m.Value, |
||||
header: h, |
||||
}) |
||||
if err != nil { |
||||
log.Fatal("message handling exception:", err) |
||||
} |
||||
if err := k.reader.CommitMessages(ctx, m); err != nil { |
||||
log.Fatal("failed to commit messages:", err) |
||||
} |
||||
} |
||||
}() |
||||
return nil |
||||
} |
||||
|
||||
func (k *kafkaReceiver) Close() error { |
||||
err := k.reader.Close() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func NewKafkaReceiver(address []string, topic string) (event.Receiver, error) { |
||||
r := kafka.NewReader(kafka.ReaderConfig{ |
||||
Brokers: address, |
||||
GroupID: "group-a", |
||||
Topic: topic, |
||||
MinBytes: 10e3, // 10KB
|
||||
MaxBytes: 10e6, // 10MB
|
||||
}) |
||||
return &kafkaReceiver{reader: r, topic: topic}, nil |
||||
} |
@ -0,0 +1,37 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"os" |
||||
"os/signal" |
||||
"syscall" |
||||
|
||||
"github.com/go-kratos/kratos/examples/event/event" |
||||
"github.com/go-kratos/kratos/examples/event/kafka" |
||||
) |
||||
|
||||
func main() { |
||||
sigs := make(chan os.Signal, 1) |
||||
signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
||||
receiver, err := kafka.NewKafkaReceiver([]string{"localhost:9092"}, "kratos") |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
receive(receiver) |
||||
select { |
||||
case <-sigs: |
||||
_ = receiver.Close() |
||||
} |
||||
} |
||||
|
||||
func receive(receiver event.Receiver) { |
||||
fmt.Println("start receiver") |
||||
err := receiver.Receive(context.Background(), func(ctx context.Context, message event.Message) error { |
||||
fmt.Printf("key:%s, value:%s, header:%s\n", message.Key(), message.Value(), message.Header()) |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return |
||||
} |
||||
} |
@ -0,0 +1,33 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"github.com/go-kratos/kratos/examples/event/event" |
||||
"github.com/go-kratos/kratos/examples/event/kafka" |
||||
) |
||||
|
||||
func main() { |
||||
sender, err := kafka.NewKafkaSender([]string{"localhost:9092"}, "kratos") |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
for i := 0; i < 50; i++ { |
||||
send(sender) |
||||
} |
||||
|
||||
_ = sender.Close() |
||||
} |
||||
|
||||
func send(sender event.Sender) { |
||||
msg := kafka.NewMessage("kratos", []byte("hello world"), map[string]string{ |
||||
"user": "kratos", |
||||
"phone": "123456", |
||||
}) |
||||
err := sender.Send(context.Background(), msg) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
fmt.Printf("key:%s, value:%s, header:%s\n", msg.Key(), msg.Value(), msg.Header()) |
||||
} |
Loading…
Reference in new issue