You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
kratos/examples/event/kafka/kafka.go

134 lines
2.6 KiB

package kafka
import (
"context"
"github.com/go-kratos/kratos/examples/event/event"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/protocol"
"log"
)
// Compile-time checks that the types in this package satisfy the event
// interfaces they are meant to implement.
var (
	_ event.Sender   = (*kafkaSender)(nil)
	_ event.Receiver = (*kafkaReceiver)(nil)
	_ event.Message  = (*Message)(nil)
)
// Message is a keyed payload with optional string headers. Its fields are
// unexported and are read through the accessor methods below.
type Message struct {
	// key is the message key.
	key string
	// value is the raw message payload.
	value []byte
	// header holds the message headers as name/value pairs.
	header map[string]string
}

// Key returns the message key.
func (msg *Message) Key() string {
	return msg.key
}

// Value returns the message payload. The returned slice is the one stored in
// the message, not a copy.
func (msg *Message) Value() []byte {
	return msg.value
}

// Header returns the message headers. The returned map is the one stored in
// the message, not a copy.
func (msg *Message) Header() map[string]string {
	return msg.header
}
// NewMessage builds a Message from the given key, payload and headers and
// returns it as an event.Message. The value slice and header map are stored
// as passed in; they are not copied.
func NewMessage(key string, value []byte, header map[string]string) event.Message {
	msg := new(Message)
	msg.key = key
	msg.value = value
	msg.header = header
	return msg
}
// kafkaSender is the event.Sender implementation backed by a kafka-go writer.
type kafkaSender struct {
	// writer is the kafka-go writer that messages are sent through.
	writer *kafka.Writer
	// topic is the topic name the sender was created with.
	topic string
}
// Send converts message into a kafka.Message (key, value and headers) and
// writes it through the underlying writer, returning the writer's error, if
// any.
func (s *kafkaSender) Send(ctx context.Context, message event.Message) error {
	// headers stays nil when the message carries no headers.
	var headers []kafka.Header
	for name, val := range message.Header() {
		headers = append(headers, protocol.Header{Key: name, Value: []byte(val)})
	}

	record := kafka.Message{
		Key:     []byte(message.Key()),
		Value:   message.Value(),
		Headers: headers,
	}
	return s.writer.WriteMessages(ctx, record)
}
// Close closes the underlying kafka writer and returns its error, if any.
func (s *kafkaSender) Close() error {
	return s.writer.Close()
}
// NewKafkaSender returns an event.Sender that writes to topic on the brokers
// listed in address, using a least-bytes balancer. The returned error is
// always nil.
func NewKafkaSender(address []string, topic string) (event.Sender, error) {
	sender := &kafkaSender{
		writer: &kafka.Writer{
			Addr:     kafka.TCP(address...),
			Topic:    topic,
			Balancer: &kafka.LeastBytes{},
		},
		topic: topic,
	}
	return sender, nil
}
// kafkaReceiver is the event.Receiver implementation backed by a kafka-go
// reader.
type kafkaReceiver struct {
	// reader is the kafka-go reader that messages are fetched from.
	reader *kafka.Reader
	// topic is the topic name the receiver was created with.
	topic string
}
// Receive starts a background goroutine that fetches messages from the
// reader, passes each one to handler, and commits it once the handler
// succeeds. Receive itself returns immediately and always returns nil.
//
// ctx governs the whole lifetime of the goroutine: it is used for fetching,
// is handed to handler, and is used for committing. Cancelling ctx stops the
// loop without terminating the process. A message whose handler or commit was
// interrupted by cancellation is left uncommitted.
//
// The goroutine also stops when fetching fails for any other reason (for
// example after the reader is closed); that error is logged.
//
// NOTE: a handler or commit failure that is not caused by ctx being done is
// reported with log.Fatal, which exits the process.
func (k *kafkaReceiver) Receive(ctx context.Context, handler event.Handler) error {
	go func() {
		for {
			// Fetch with the caller's context so that cancelling it unblocks
			// the fetch and lets this goroutine exit.
			m, err := k.reader.FetchMessage(ctx)
			if err != nil {
				// Cancellation is an expected shutdown; anything else is
				// logged so the consumer does not stop silently.
				if ctx.Err() == nil {
					log.Printf("kafka receiver stopped fetching messages: %v", err)
				}
				return
			}

			h := make(map[string]string, len(m.Headers))
			for _, header := range m.Headers {
				h[header.Key] = string(header.Value)
			}

			err = handler(ctx, &Message{
				key:    string(m.Key),
				value:  m.Value,
				header: h,
			})
			if err != nil {
				// A failure while shutting down is not fatal; the message
				// simply stays uncommitted.
				if ctx.Err() != nil {
					return
				}
				log.Fatal("message handling exception:", err)
			}

			if err := k.reader.CommitMessages(ctx, m); err != nil {
				// Same as above: do not kill the process because the caller
				// cancelled ctx between handling and committing.
				if ctx.Err() != nil {
					return
				}
				log.Fatal("failed to commit messages:", err)
			}
		}
	}()
	return nil
}
// Close closes the underlying kafka reader and returns its error, if any.
func (k *kafkaReceiver) Close() error {
	return k.reader.Close()
}
// NewKafkaReceiver returns an event.Receiver that reads topic from the brokers
// listed in address as a member of the fixed consumer group "group-a". The
// returned error is always nil.
func NewKafkaReceiver(address []string, topic string) (event.Receiver, error) {
	cfg := kafka.ReaderConfig{
		Brokers:  address,
		GroupID:  "group-a",
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	}
	receiver := &kafkaReceiver{
		reader: kafka.NewReader(cfg),
		topic:  topic,
	}
	return receiver, nil
}