chore(examples/event): add memory and rename Message to Event (#1282)

* chore(examples/event): add memory and rename Message to Event
pull/1420/head
包子 3 years ago committed by GitHub
parent de55281108
commit 1517321dd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      examples/event/event/event.go
  2. 47
      examples/event/kafka/kafka.go
  3. 84
      examples/event/memory/memory.go
  4. 33
      examples/event/memory/memory_test.go
  5. 4
      examples/event/receiver/main.go
  6. 7
      examples/event/sender/main.go

@ -2,16 +2,15 @@ package event
import "context" import "context"
type Message interface { type Event interface {
Key() string Key() string
Value() []byte Value() []byte
Header() map[string]string
} }
type Handler func(context.Context, Message) error type Handler func(context.Context, Event) error
type Sender interface { type Sender interface {
Send(ctx context.Context, msg Message) error Send(ctx context.Context, msg Event) error
Close() error Close() error
} }

@ -4,35 +4,30 @@ import (
"context" "context"
"github.com/go-kratos/kratos/examples/event/event" "github.com/go-kratos/kratos/examples/event/event"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/protocol"
"log" "log"
) )
var _ event.Sender = (*kafkaSender)(nil) var _ event.Sender = (*kafkaSender)(nil)
var _ event.Receiver = (*kafkaReceiver)(nil) var _ event.Receiver = (*kafkaReceiver)(nil)
var _ event.Message = (*Message)(nil) var _ event.Event = (*Message)(nil)
type Message struct { type Message struct {
key string key string
value []byte value []byte
header map[string]string
} }
func (m *Message) Key() string { func (m *Message) Key() string {
return m.key return m.key
} }
func (m *Message) Value() []byte { func (m *Message) Value() []byte {
return m.value return m.value
} }
func (m *Message) Header() map[string]string {
return m.header
}
func NewMessage(key string, value []byte, header map[string]string) event.Message { func NewMessage(key string, value []byte) event.Event {
return &Message{ return &Message{
key: key, key: key,
value: value, value: value,
header: header,
} }
} }
@ -41,20 +36,10 @@ type kafkaSender struct {
topic string topic string
} }
func (s *kafkaSender) Send(ctx context.Context, message event.Message) error { func (s *kafkaSender) Send(ctx context.Context, message event.Event) error {
var h []kafka.Header
if len(message.Header()) > 0 {
for k, v := range message.Header() {
h = append(h, protocol.Header{
Key: k,
Value: []byte(v),
})
}
}
err := s.writer.WriteMessages(ctx, kafka.Message{ err := s.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(message.Key()), Key: []byte(message.Key()),
Value: message.Value(), Value: message.Value(),
Headers: h,
}) })
if err != nil { if err != nil {
return err return err
@ -71,7 +56,6 @@ func (s *kafkaSender) Close() error {
} }
func NewKafkaSender(address []string, topic string) (event.Sender, error) { func NewKafkaSender(address []string, topic string) (event.Sender, error) {
w := &kafka.Writer{ w := &kafka.Writer{
Topic: topic, Topic: topic,
Addr: kafka.TCP(address...), Addr: kafka.TCP(address...),
@ -92,16 +76,9 @@ func (k *kafkaReceiver) Receive(ctx context.Context, handler event.Handler) erro
if err != nil { if err != nil {
break break
} }
h := make(map[string]string)
if len(m.Headers) > 0 {
for _, header := range m.Headers {
h[header.Key] = string(header.Value)
}
}
err = handler(context.Background(), &Message{ err = handler(context.Background(), &Message{
key: string(m.Key), key: string(m.Key),
value: m.Value, value: m.Value,
header: h,
}) })
if err != nil { if err != nil {
log.Fatal("message handling exception:", err) log.Fatal("message handling exception:", err)

@ -0,0 +1,84 @@
package memory
import (
"context"
"github.com/go-kratos/kratos/examples/event/event"
"log"
"sync"
)
var _ event.Sender = (*memorySender)(nil)
var _ event.Receiver = (*memoryReceiver)(nil)
var _ event.Event = (*Message)(nil)
var (
chanMap = struct {
sync.RWMutex
cm map[string]chan *Message
}{}
ChanSize = 256
)
func init() {
chanMap.cm = make(map[string]chan *Message)
}
type Message struct {
key string
value []byte
}
func (m *Message) Key() string {
return m.key
}
func (m *Message) Value() []byte {
return m.value
}
type memorySender struct {
topic string
}
func (m *memorySender) Send(ctx context.Context, msg event.Event) error {
chanMap.cm[m.topic] <- &Message{
key: msg.Key(),
value: msg.Value(),
}
return nil
}
func (m *memorySender) Close() error {
return nil
}
type memoryReceiver struct {
topic string
}
func (m *memoryReceiver) Receive(ctx context.Context, handler event.Handler) error {
go func() {
for msg := range chanMap.cm[m.topic] {
err := handler(context.Background(), msg)
if err != nil {
log.Fatal("message handling exception:", err)
}
}
}()
return nil
}
func (m *memoryReceiver) Close() error {
return nil
}
func NewMemory(topic string) (event.Sender, event.Receiver) {
chanMap.RLock()
if _, ok := chanMap.cm[topic]; !ok {
//chanMap.Lock()
chanMap.cm[topic] = make(chan *Message, ChanSize)
//chanMap.Unlock()
}
defer chanMap.RUnlock()
return &memorySender{topic: topic}, &memoryReceiver{topic: topic}
}

@ -0,0 +1,33 @@
package memory
import (
"context"
"fmt"
"github.com/go-kratos/kratos/examples/event/event"
"testing"
"time"
)
func TestSendAndReceive(t *testing.T) {
send, receive := NewMemory("test")
err := receive.Receive(context.Background(), func(ctx context.Context, event event.Event) error {
t.Log(fmt.Sprintf("key:%s, value:%s\n", event.Key(), event.Value()))
return nil
})
if err != nil {
t.Error(err)
}
for i := 0; i < 5; i++ {
err := send.Send(context.Background(), &Message{
key: "kratos",
value: []byte("hello world"),
})
if err != nil {
t.Error(err)
}
}
time.Sleep(5 * time.Second)
}

@ -27,8 +27,8 @@ func main() {
func receive(receiver event.Receiver) { func receive(receiver event.Receiver) {
fmt.Println("start receiver") fmt.Println("start receiver")
err := receiver.Receive(context.Background(), func(ctx context.Context, message event.Message) error { err := receiver.Receive(context.Background(), func(ctx context.Context, msg event.Event) error {
fmt.Printf("key:%s, value:%s, header:%s\n", message.Key(), message.Value(), message.Header()) fmt.Printf("key:%s, value:%s\n", msg.Key(), msg.Value())
return nil return nil
}) })
if err != nil { if err != nil {

@ -21,13 +21,10 @@ func main() {
} }
func send(sender event.Sender) { func send(sender event.Sender) {
msg := kafka.NewMessage("kratos", []byte("hello world"), map[string]string{ msg := kafka.NewMessage("kratos", []byte("hello world"))
"user": "kratos",
"phone": "123456",
})
err := sender.Send(context.Background(), msg) err := sender.Send(context.Background(), msg)
if err != nil { if err != nil {
panic(err) panic(err)
} }
fmt.Printf("key:%s, value:%s, header:%s\n", msg.Key(), msg.Value(), msg.Header()) fmt.Printf("key:%s, value:%s\n", msg.Key(), msg.Value())
} }

Loading…
Cancel
Save