Compare commits

...

2 Commits
main ... event

  1. 29
      events/pubsub.go
  2. 14
      events/receiver.go
  3. 8
      events/sender.go
  4. 67
      events/server.go

@ -0,0 +1,29 @@
package events
import (
"context"
"github.com/go-kratos/kratos/v2/transport"
)
type Handler interface {
Handle(ctx context.Context, msg Message) error
}
type Message struct {
Topic string
Data []byte
Metadata map[string]interface{}
}
type PublishMetadata struct {
Topic string
}
type SubRequest struct {
Topic string
}
type Subscriber interface {
transport.Server
Subscribe(subReq SubRequest, handler Handler) error
}

@ -0,0 +1,14 @@
package events
import "context"
type Receiver interface {
Receive(ctx context.Context) (Message, error)
Ack(msg Message) error
Nack(msg Message) error
Close() error
}
type ReceiverBuilder interface {
Build(subReq SubRequest) (Receiver, error)
}

@ -0,0 +1,8 @@
package events
import "context"
type Sender interface {
Send(ctx context.Context, message Message) error
Close() error
}

@ -0,0 +1,67 @@
package events
import (
"context"
"log"
)
type ServerOption func(*Server)
func WithReceiverBuilder(builder ReceiverBuilder) ServerOption {
return func(s *Server) {
s.receiverBuilder = builder
}
}
type Server struct {
receiverBuilder ReceiverBuilder
receiverMap map[Receiver]Handler
}
func NewServer(opts ...ServerOption) *Server {
s := &Server{
receiverMap: make(map[Receiver]Handler),
}
for _, opt := range opts {
opt(s)
}
return s
}
func (s *Server) Start(ctx context.Context) error {
for receiver, handler := range s.receiverMap {
go func(receiver Receiver, handler Handler) {
for {
msg, err := receiver.Receive(ctx)
if err != nil {
log.Printf("receiver error: %v", err)
return
}
err = handler.Handle(context.Background(), msg)
if err != nil {
log.Printf("handler error: %v", err)
}
}
}(receiver, handler)
}
return nil
}
func (s *Server) Stop(ctx context.Context) error {
for receiver := range s.receiverMap {
err := receiver.Close()
if err != nil {
log.Printf("Error closing receiver: %v", err)
}
}
return nil
}
func (s *Server) Subscribe(subReq SubRequest, handler Handler) error {
receiver, err := s.receiverBuilder.Build(subReq)
if err != nil {
return err
}
s.receiverMap[receiver] = handler
return nil
}
Loading…
Cancel
Save