parent
0f23c1c516
commit
cc42f76d9d
@ -0,0 +1,28 @@ |
|||||||
|
package events |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"github.com/go-kratos/kratos/v2/transport" |
||||||
|
) |
||||||
|
|
||||||
|
type Handler interface { |
||||||
|
Handle(ctx context.Context, msg Message) error |
||||||
|
} |
||||||
|
|
||||||
|
type Message struct { |
||||||
|
Topic string |
||||||
|
Data []byte |
||||||
|
} |
||||||
|
|
||||||
|
type PublishMetadata struct { |
||||||
|
Topic string |
||||||
|
} |
||||||
|
|
||||||
|
type SubRequest struct { |
||||||
|
Topic string |
||||||
|
} |
||||||
|
|
||||||
|
type Subscriber interface { |
||||||
|
transport.Server |
||||||
|
Subscribe(subReq SubRequest, handler Handler) error |
||||||
|
} |
@ -0,0 +1,14 @@ |
|||||||
|
package events |
||||||
|
|
||||||
|
import "context" |
||||||
|
|
||||||
|
type Receiver interface { |
||||||
|
Receive(ctx context.Context) (Message, error) |
||||||
|
Ack(msg Message) error |
||||||
|
Nack(msg Message) error |
||||||
|
Close() error |
||||||
|
} |
||||||
|
|
||||||
|
type ReceiverBuilder interface { |
||||||
|
Build(subReq SubRequest) (Receiver, error) |
||||||
|
} |
@ -0,0 +1,8 @@ |
|||||||
|
package events |
||||||
|
|
||||||
|
import "context" |
||||||
|
|
||||||
|
type Sender interface { |
||||||
|
Send(ctx context.Context, message Message) error |
||||||
|
Close() error |
||||||
|
} |
@ -0,0 +1,67 @@ |
|||||||
|
package events |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"log" |
||||||
|
) |
||||||
|
|
||||||
|
type ServerOption func(*Server) |
||||||
|
|
||||||
|
func WithReceiverBuilder(builder ReceiverBuilder) ServerOption { |
||||||
|
return func(s *Server) { |
||||||
|
s.receiverBuilder = builder |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
type Server struct { |
||||||
|
receiverBuilder ReceiverBuilder |
||||||
|
receiverMap map[Receiver]Handler |
||||||
|
} |
||||||
|
|
||||||
|
func NewServer(opts ...ServerOption) *Server { |
||||||
|
s := &Server{ |
||||||
|
receiverMap: make(map[Receiver]Handler), |
||||||
|
} |
||||||
|
for _, opt := range opts { |
||||||
|
opt(s) |
||||||
|
} |
||||||
|
return s |
||||||
|
} |
||||||
|
|
||||||
|
func (s *Server) Start(ctx context.Context) error { |
||||||
|
for receiver, handler := range s.receiverMap { |
||||||
|
go func(receiver Receiver, handler Handler) { |
||||||
|
for { |
||||||
|
msg, err := receiver.Receive(ctx) |
||||||
|
if err != nil { |
||||||
|
log.Printf("receiver error: %v", err) |
||||||
|
return |
||||||
|
} |
||||||
|
err = handler.Handle(context.Background(), msg) |
||||||
|
if err != nil { |
||||||
|
log.Printf("handler error: %v", err) |
||||||
|
} |
||||||
|
} |
||||||
|
}(receiver, handler) |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (s *Server) Stop(ctx context.Context) error { |
||||||
|
for receiver := range s.receiverMap { |
||||||
|
err := receiver.Close() |
||||||
|
if err != nil { |
||||||
|
log.Printf("Error closing receiver: %v", err) |
||||||
|
} |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (s *Server) Subscribe(subReq SubRequest, handler Handler) error { |
||||||
|
receiver, err := s.receiverBuilder.Build(subReq) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
s.receiverMap[receiver] = handler |
||||||
|
return nil |
||||||
|
} |
Loading…
Reference in new issue