Compare commits
2 Commits
Author | SHA1 | Date |
---|---|---|
Casper | ea59afe189 | 3 years ago |
Casper | cc42f76d9d | 3 years ago |
@ -0,0 +1,29 @@ |
||||
package events |
||||
|
||||
import ( |
||||
"context" |
||||
"github.com/go-kratos/kratos/v2/transport" |
||||
) |
||||
|
||||
type Handler interface { |
||||
Handle(ctx context.Context, msg Message) error |
||||
} |
||||
|
||||
type Message struct { |
||||
Topic string |
||||
Data []byte |
||||
Metadata map[string]interface{} |
||||
} |
||||
|
||||
type PublishMetadata struct { |
||||
Topic string |
||||
} |
||||
|
||||
type SubRequest struct { |
||||
Topic string |
||||
} |
||||
|
||||
type Subscriber interface { |
||||
transport.Server |
||||
Subscribe(subReq SubRequest, handler Handler) error |
||||
} |
@ -0,0 +1,14 @@ |
||||
package events |
||||
|
||||
import "context" |
||||
|
||||
type Receiver interface { |
||||
Receive(ctx context.Context) (Message, error) |
||||
Ack(msg Message) error |
||||
Nack(msg Message) error |
||||
Close() error |
||||
} |
||||
|
||||
type ReceiverBuilder interface { |
||||
Build(subReq SubRequest) (Receiver, error) |
||||
} |
@ -0,0 +1,8 @@ |
||||
package events |
||||
|
||||
import "context" |
||||
|
||||
type Sender interface { |
||||
Send(ctx context.Context, message Message) error |
||||
Close() error |
||||
} |
@ -0,0 +1,67 @@ |
||||
package events |
||||
|
||||
import ( |
||||
"context" |
||||
"log" |
||||
) |
||||
|
||||
type ServerOption func(*Server) |
||||
|
||||
func WithReceiverBuilder(builder ReceiverBuilder) ServerOption { |
||||
return func(s *Server) { |
||||
s.receiverBuilder = builder |
||||
} |
||||
} |
||||
|
||||
type Server struct { |
||||
receiverBuilder ReceiverBuilder |
||||
receiverMap map[Receiver]Handler |
||||
} |
||||
|
||||
func NewServer(opts ...ServerOption) *Server { |
||||
s := &Server{ |
||||
receiverMap: make(map[Receiver]Handler), |
||||
} |
||||
for _, opt := range opts { |
||||
opt(s) |
||||
} |
||||
return s |
||||
} |
||||
|
||||
func (s *Server) Start(ctx context.Context) error { |
||||
for receiver, handler := range s.receiverMap { |
||||
go func(receiver Receiver, handler Handler) { |
||||
for { |
||||
msg, err := receiver.Receive(ctx) |
||||
if err != nil { |
||||
log.Printf("receiver error: %v", err) |
||||
return |
||||
} |
||||
err = handler.Handle(context.Background(), msg) |
||||
if err != nil { |
||||
log.Printf("handler error: %v", err) |
||||
} |
||||
} |
||||
}(receiver, handler) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *Server) Stop(ctx context.Context) error { |
||||
for receiver := range s.receiverMap { |
||||
err := receiver.Close() |
||||
if err != nil { |
||||
log.Printf("Error closing receiver: %v", err) |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *Server) Subscribe(subReq SubRequest, handler Handler) error { |
||||
receiver, err := s.receiverBuilder.Build(subReq) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
s.receiverMap[receiver] = handler |
||||
return nil |
||||
} |
Loading…
Reference in new issue