package memory import ( "context" "log" "sync" "github.com/go-kratos/kratos/examples/event/event" ) var ( _ event.Sender = (*memorySender)(nil) _ event.Receiver = (*memoryReceiver)(nil) _ event.Event = (*Message)(nil) ) var ( chanMap = struct { sync.RWMutex cm map[string]chan *Message }{} ChanSize = 256 ) func init() { chanMap.cm = make(map[string]chan *Message) } type Message struct { key string value []byte } func (m *Message) Key() string { return m.key } func (m *Message) Value() []byte { return m.value } type memorySender struct { topic string } func (m *memorySender) Send(ctx context.Context, msg event.Event) error { chanMap.cm[m.topic] <- &Message{ key: msg.Key(), value: msg.Value(), } return nil } func (m *memorySender) Close() error { return nil } type memoryReceiver struct { topic string } func (m *memoryReceiver) Receive(ctx context.Context, handler event.Handler) error { go func() { for msg := range chanMap.cm[m.topic] { err := handler(context.Background(), msg) if err != nil { log.Fatal("message handling exception:", err) } } }() return nil } func (m *memoryReceiver) Close() error { return nil } func NewMemory(topic string) (event.Sender, event.Receiver) { chanMap.RLock() if _, ok := chanMap.cm[topic]; !ok { // chanMap.Lock() chanMap.cm[topic] = make(chan *Message, ChanSize) // chanMap.Unlock() } defer chanMap.RUnlock() return &memorySender{topic: topic}, &memoryReceiver{topic: topic} }