From cc42f76d9d6fd25ffac06f91fe7bf1e1d9e6aaa1 Mon Sep 17 00:00:00 2001 From: Casper Date: Thu, 27 Jan 2022 20:10:39 +0800 Subject: [PATCH] feat(events):add PubSub/Events --- events/pubsub.go | 28 +++++++++++++++++++ events/receiver.go | 14 ++++++++++ events/sender.go | 8 ++++++ events/server.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 events/pubsub.go create mode 100644 events/receiver.go create mode 100644 events/sender.go create mode 100644 events/server.go diff --git a/events/pubsub.go b/events/pubsub.go new file mode 100644 index 000000000..e8c428250 --- /dev/null +++ b/events/pubsub.go @@ -0,0 +1,28 @@ +package events + +import ( + "context" + "github.com/go-kratos/kratos/v2/transport" +) + +type Handler interface { + Handle(ctx context.Context, msg Message) error +} + +type Message struct { + Topic string + Data []byte +} + +type PublishMetadata struct { + Topic string +} + +type SubRequest struct { + Topic string +} + +type Subscriber interface { + transport.Server + Subscribe(subReq SubRequest, handler Handler) error +} diff --git a/events/receiver.go b/events/receiver.go new file mode 100644 index 000000000..ff662db43 --- /dev/null +++ b/events/receiver.go @@ -0,0 +1,14 @@ +package events + +import "context" + +type Receiver interface { + Receive(ctx context.Context) (Message, error) + Ack(msg Message) error + Nack(msg Message) error + Close() error +} + +type ReceiverBuilder interface { + Build(subReq SubRequest) (Receiver, error) +} diff --git a/events/sender.go b/events/sender.go new file mode 100644 index 000000000..88f92a568 --- /dev/null +++ b/events/sender.go @@ -0,0 +1,8 @@ +package events + +import "context" + +type Sender interface { + Send(ctx context.Context, message Message) error + Close() error +} diff --git a/events/server.go b/events/server.go new file mode 100644 index 000000000..a939ed7d4 --- /dev/null +++ b/events/server.go @@ -0,0 +1,67 @@ +package events + +import ( + "context" + "log" +) + +type ServerOption func(*Server) + +func WithReceiverBuilder(builder ReceiverBuilder) ServerOption { + return func(s *Server) { + s.receiverBuilder = builder + } +} + +type Server struct { + receiverBuilder ReceiverBuilder + receiverMap map[Receiver]Handler +} + +func NewServer(opts ...ServerOption) *Server { + s := &Server{ + receiverMap: make(map[Receiver]Handler), + } + for _, opt := range opts { + opt(s) + } + return s +} + +func (s *Server) Start(ctx context.Context) error { + for receiver, handler := range s.receiverMap { + go func(receiver Receiver, handler Handler) { + for { + msg, err := receiver.Receive(ctx) + if err != nil { + log.Printf("receiver error: %v", err) + return + } + err = handler.Handle(context.Background(), msg) + if err != nil { + log.Printf("handler error: %v", err) + } + } + }(receiver, handler) + } + return nil +} + +func (s *Server) Stop(ctx context.Context) error { + for receiver := range s.receiverMap { + err := receiver.Close() + if err != nil { + log.Printf("Error closing receiver: %v", err) + } + } + return nil +} + +func (s *Server) Subscribe(subReq SubRequest, handler Handler) error { + receiver, err := s.receiverBuilder.Build(subReq) + if err != nil { + return err + } + s.receiverMap[receiver] = handler + return nil +}